
Commit

Remove poolUtils as all job runs and nodes have a pool that can be accessed by Pool() and GetPool() respectively. (#223) (#3912)

Co-authored-by: Eleanor Pratt <[email protected]>
eleanorpratt and Eleanor Pratt committed Sep 6, 2024
1 parent af31592 commit 9fa77a0
Showing 7 changed files with 9 additions and 129 deletions.
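
The refactor replaces the deleted poolutils helpers with accessors that already exist on the scheduler's objects: GetNodePool(node, executor) becomes node.GetPool(), and GetRunPool(run, node, executor) becomes run.Pool(). Below is a minimal sketch of the shape of that change, using stand-in types rather than the real schedulerobjects/jobdb types; the fallback behaviour attributed to the old helpers is inferred from the deleted tests and is not shown in this diff.

package main

import "fmt"

// Stand-ins for schedulerobjects.Node and jobdb.JobRun; the real types carry
// many more fields. Both now store their pool directly.
type Node struct{ pool string }

func (n *Node) GetPool() string { return n.pool }

type Run struct{ pool string }

func (r *Run) Pool() string { return r.pool }

func main() {
	node := &Node{pool: "cpu"}
	run := &Run{pool: "cpu"}

	// Before: pool := GetNodePool(node, executor) / GetRunPool(run, node, executor),
	// helpers that needed the node and executor so they could fall back when no
	// pool was set on the object itself.
	// After: the pool is read straight off the node or run.
	fmt.Println(node.GetPool(), run.Pool())
}
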
2 changes: 1 addition & 1 deletion internal/scheduler/metrics.go
@@ -279,7 +279,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
txn := c.jobDb.ReadTxn()
for _, executor := range executors {
for _, node := range executor.Nodes {
- pool := GetNodePool(node, executor)
+ pool := node.GetPool()
clusterKey := clusterMetricKey{
cluster: executor.Id,
pool: pool,
2 changes: 1 addition & 1 deletion internal/scheduler/pool_assigner.go
@@ -51,7 +51,7 @@ func (p *DefaultPoolAssigner) Refresh(ctx *armadacontext.Context) error {
func (p *DefaultPoolAssigner) AssignPools(j *jobdb.Job) ([]string, error) {
// If Job has an active run then use the pool associated with the executor it was assigned to
if !j.Queued() && j.HasRuns() {
- pool := GetRunPool(j.LatestRun(), p.nodeById[j.LatestRun().NodeId()], p.executorById[j.LatestRun().Executor()])
+ pool := j.LatestRun().Pool()
return []string{pool}, nil
}
// otherwise use the pools associated with the job
13 changes: 2 additions & 11 deletions internal/scheduler/pool_assigner_test.go
@@ -26,10 +26,6 @@ func TestPoolAssigner_AssignPools(t *testing.T) {
WithQueued(false).
WithNewRun(cpuExecutor.Id, testfixtures.TestCluster()[0].Id, testfixtures.TestCluster()[0].Name, "", 0)

- runningJobWithoutPoolSetOnLatestRunOrExistingAssociatedNode := queuedJob.
- 	WithQueued(false).
- 	WithNewRun(cpuExecutor.Id, "unknownNode", "unknownNode", "", 0)

tests := map[string]struct {
executorTimout time.Duration
executors []*schedulerobjects.Executor
@@ -49,15 +45,10 @@ func TestPoolAssigner_AssignPools(t *testing.T) {
job: runningJob,
expectedPools: []string{"cpu"},
},
"running job without pool set returns pool of associated node": {
"running job without pool set returns empty string pool": {
executors: []*schedulerobjects.Executor{cpuExecutor},
job: runningJobWithoutPoolSetOnLatestRun,
- expectedPools: []string{testfixtures.TestPool},
- },
- "running job without pool set returns pool of associated cluster if associated node does not exist": {
- executors: []*schedulerobjects.Executor{cpuExecutor},
- job: runningJobWithoutPoolSetOnLatestRunOrExistingAssociatedNode,
- expectedPools: []string{cpuExecutor.Pool},
+ expectedPools: []string{""},
},
}
for name, tc := range tests {
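
The test changes above capture the behavioural consequence of deleting the fallback logic: a run whose pool was never populated now reports the empty string rather than inheriting a pool from its node or executor. A self-contained sketch of the new expectation follows, using a stand-in Run type rather than the real jobdb.JobRun.

// run_pool_sketch_test.go
package main

import "testing"

// Stand-in for jobdb.JobRun; only the pool field is modelled here.
type Run struct{ pool string }

func (r *Run) Pool() string { return r.pool }

// With the poolutils fallback gone, an unset pool is simply the empty string.
func TestRunWithoutPoolReturnsEmptyString(t *testing.T) {
	run := &Run{}
	if got := run.Pool(); got != "" {
		t.Errorf("expected empty pool, got %q", got)
	}
}
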
39 changes: 0 additions & 39 deletions internal/scheduler/poolutils.go

This file was deleted.

72 changes: 0 additions & 72 deletions internal/scheduler/poolutils_test.go

This file was deleted.

8 changes: 4 additions & 4 deletions internal/scheduler/scheduling_algo.go
@@ -239,7 +239,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
executorById[executor.Id] = executor
for _, node := range executor.Nodes {
nodeById[node.GetId()] = node
- pool := GetNodePool(node, executor)
+ pool := node.GetPool()
allKnownPools[pool] = true

if _, present := nodesByPoolAndExecutor[pool]; !present {
@@ -273,7 +273,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
totalCapacityByPool := make(schedulerobjects.QuantityByTAndResourceType[string])
for _, executor := range executors {
for _, node := range executor.Nodes {
- totalCapacityByPool.AddResourceList(GetNodePool(node, executor), node.TotalResources)
+ totalCapacityByPool.AddResourceList(node.GetPool(), node.TotalResources)
}
}

@@ -300,7 +300,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
pools := job.Pools()

if !job.Queued() && job.LatestRun() != nil {
- pool := GetRunPool(job.LatestRun(), nodeById[job.LatestRun().NodeId()], executorById[job.LatestRun().Executor()])
+ pool := job.LatestRun().Pool()
pools = []string{pool}
} else if len(pools) < 1 {
// This occurs if we haven't assigned a job to a pool. Right now this can occur if a user
@@ -349,7 +349,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
if nodeName := run.NodeName(); nodeName == "" {
return nil, errors.Errorf("run %s of job %s is not queued but has no nodeName associated with it", run.Id(), job.Id())
}
- pool := GetRunPool(job.LatestRun(), nodeById[job.LatestRun().NodeId()], executorById[job.LatestRun().Executor()])
+ pool := job.LatestRun().Pool()
if _, present := jobsByPoolAndExecutor[pool]; !present {
jobsByPoolAndExecutor[pool] = map[string][]*jobdb.Job{}
}
2 changes: 1 addition & 1 deletion internal/scheduler/submitcheck.go
@@ -95,7 +95,7 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
for _, ex := range executors {
nodes := ex.GetNodes()
nodesByPool := armadaslices.GroupByFunc(nodes, func(n *schedulerobjects.Node) string {
- return GetNodePool(n, ex)
+ return n.GetPool()
})
for pool, nodes := range nodesByPool {
nodeDb, err := srv.constructNodeDb(nodes)
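
For context on the submitcheck.go hunk: armadaslices.GroupByFunc buckets the executor's nodes by the key returned from the callback, so after this change nodes are grouped by node.GetPool() rather than by the old helper's result. The generic sketch below mirrors that grouping shape; it is my own stand-in, not the Armada implementation.

package main

import "fmt"

// groupByFunc mirrors the call shape of armadaslices.GroupByFunc as used above:
// it buckets a slice by a key function. Stand-in code, not Armada's.
func groupByFunc[T any, K comparable](items []T, key func(T) K) map[K][]T {
	out := make(map[K][]T, len(items))
	for _, item := range items {
		k := key(item)
		out[k] = append(out[k], item)
	}
	return out
}

type node struct{ pool string }

func (n *node) GetPool() string { return n.pool }

func main() {
	nodes := []*node{{pool: "cpu"}, {pool: "gpu"}, {pool: "cpu"}}
	nodesByPool := groupByFunc(nodes, func(n *node) string { return n.GetPool() })
	fmt.Println(len(nodesByPool["cpu"]), len(nodesByPool["gpu"])) // 2 1
}
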

