Skip to content

Commit

Permalink
Fix preempted metrics (#3951)
Browse files Browse the repository at this point in the history
* Fix preempted metrics

Signed-off-by: Chris Martin <[email protected]>

* fix comment

Signed-off-by: Chris Martin <[email protected]>

* fix test

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
  • Loading branch information
d80tb7 committed Sep 21, 2024
1 parent 21d20ca commit 72f71dc
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 5 deletions.
5 changes: 3 additions & 2 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,10 +651,11 @@ func (job *Job) ValidateResourceRequests() error {

// WithNewRun creates a copy of the job with a new run on the given executor.
func (job *Job) WithNewRun(executor, nodeId, nodeName, pool string, scheduledAtPriority int32) *Job {
now := job.jobDb.clock.Now()
return job.WithUpdatedRun(job.jobDb.CreateRun(
job.jobDb.uuidProvider.New(),
job.Id(),
job.jobDb.clock.Now().UnixNano(),
now.UnixNano(),
executor,
nodeId,
nodeName,
Expand All @@ -668,7 +669,7 @@ func (job *Job) WithNewRun(executor, nodeId, nodeName, pool string, scheduledAtP
false,
false,
false,
nil,
&now,
nil,
nil,
nil,
Expand Down
7 changes: 7 additions & 0 deletions internal/scheduler/jobdb/job_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package jobdb

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
clock "k8s.io/utils/clock/testing"

"github.com/armadaproject/armada/internal/common/stringinterner"
"github.com/armadaproject/armada/internal/common/types"
Expand All @@ -30,6 +32,7 @@ var (
}
TestDefaultPriorityClass = PriorityClass3
SchedulingKeyGenerator = schedulerobjects.NewSchedulingKeyGeneratorWithKey(make([]byte, 32))
testClock = clock.NewFakeClock(time.Now())
jobDb = NewJobDbWithSchedulingKeyGenerator(
TestPriorityClasses,
TestDefaultPriorityClass,
Expand All @@ -41,6 +44,10 @@ var (
scheduledAtPriority = int32(5)
)

func init() {
jobDb.clock = testClock
}

var baseJobRun = jobDb.CreateRun(
uuid.New().String(),
uuid.NewString(),
Expand Down
4 changes: 3 additions & 1 deletion internal/scheduler/jobdb/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,20 @@ func TestJob_TestWithNewRun(t *testing.T) {
jobWithRun := baseJob.WithNewRun("test-executor", "test-nodeId", "nodeId", "pool", scheduledAtPriority)
assert.Equal(t, true, jobWithRun.HasRuns())
run := jobWithRun.LatestRun()
created := jobDb.clock.Now()
assert.NotNil(t, run)
assert.Equal(
t,
&JobRun{
id: run.id,
jobId: "test-job",
created: run.created,
created: created.UnixNano(),
executor: "test-executor",
nodeId: "test-nodeId",
nodeName: "nodeId",
pool: "pool",
scheduledAtPriority: &scheduledAtPriority,
leaseTime: &created,
},
run,
)
Expand Down
10 changes: 9 additions & 1 deletion internal/scheduler/metrics/state_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,22 @@ func (m *jobStateMetrics) collect(ch chan<- prometheus.Metric) {
}
}

// ReportJobLeased reports the job as being leasedJob. This has to be reported separately because the state transition
// ReportJobLeased reports the job as being leased. This has to be reported separately because the state transition
// logic does work for job leased!
func (m *jobStateMetrics) ReportJobLeased(job *jobdb.Job) {
run := job.LatestRun()
duration, priorState := stateDuration(job, run, run.LeaseTime())
m.updateStateDuration(job, leased, priorState, duration)
}

// ReportJobPreempted reports the job as being preempted. This has to be reported separately because the state transition
// logic does work for job preempted!
func (m *jobStateMetrics) ReportJobPreempted(job *jobdb.Job) {
run := job.LatestRun()
duration, priorState := stateDuration(job, run, run.PreemptedTime())
m.updateStateDuration(job, preempted, priorState, duration)
}

func (m *jobStateMetrics) ReportStateTransitions(
jsts []jobdb.JobStateTransitions,
jobRunErrorsByRunId map[string]*armadaevents.Error,
Expand Down
3 changes: 3 additions & 0 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
for _, jctx := range overallSchedulerResult.ScheduledJobs {
s.metrics.ReportJobLeased(jctx.Job)
}
for _, jctx := range overallSchedulerResult.PreemptedJobs {
s.metrics.ReportJobPreempted(jctx.Job)
}
}

return overallSchedulerResult, nil
Expand Down
3 changes: 2 additions & 1 deletion internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,9 @@ func (l *FairSchedulingAlgo) schedulePool(
}
for i, jctx := range result.PreemptedJobs {
jobDbJob := jctx.Job
now := l.clock.Now()
if run := jobDbJob.LatestRun(); run != nil {
jobDbJob = jobDbJob.WithUpdatedRun(run.WithFailed(true))
jobDbJob = jobDbJob.WithUpdatedRun(run.WithFailed(true).WithPreemptedTime(&now))
} else {
return nil, nil, errors.Errorf("attempting to preempt job %s with no associated runs", jobDbJob.Id())
}
Expand Down

0 comments on commit 72f71dc

Please sign in to comment.