From 3b3adaebb40d01cf1e5305046a1d4c24ed0cfc1b Mon Sep 17 00:00:00 2001
From: Albin Severinson
Date: Fri, 23 Jun 2023 15:08:20 +0100
Subject: [PATCH 1/5] Cleanup api.go

---
 internal/scheduler/api.go          | 128 +++++++++++++++--------
 internal/scheduler/api_test.go     |   4 +-
 internal/scheduler/schedulerapp.go |   4 +-
 3 files changed, 71 insertions(+), 65 deletions(-)

diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go
index d0ded087588..363b22a18e1 100644
--- a/internal/scheduler/api.go
+++ b/internal/scheduler/api.go
@@ -24,16 +24,25 @@ import (
     "github.com/armadaproject/armada/pkg/executorapi"
 )
 
-// ExecutorApi is a gRPC service that exposes functionality required by the armada executors
+// ExecutorApi is the gRPC service executors use to synchronise their state with that of the scheduler.
 type ExecutorApi struct {
-    producer                  pulsar.Producer
-    jobRepository             database.JobRepository
-    executorRepository        database.ExecutorRepository
-    legacyExecutorRepository  database.ExecutorRepository
-    allowedPriorities         []int32 // allowed priority classes
-    maxJobsPerCall            uint    // maximum number of jobs that will be leased in a single call
-    maxPulsarMessageSize      uint    // maximum sizer of pulsar messages produced
-    nodeIdLabel               string
+    // Used to send Pulsar messages when, e.g., executors report a job has finished.
+    producer pulsar.Producer
+    // Interface to the component storing job information, such as which jobs are leased to a particular executor.
+    jobRepository database.JobRepository
+    // Interface to the component storing executor information, such as when we last heard from an executor.
+    executorRepository database.ExecutorRepository
+    // Like executorRepository.
+    legacyExecutorRepository database.ExecutorRepository
+    // Allowed priority class priorities.
+    allowedPriorities []int32
+    // Max number of job leases sent per call to LeaseJobRuns.
+    maxJobsPerCall uint
+    // Max size of Pulsar messages produced.
+    maxPulsarMessageSizeBytes uint
+    // See scheduling config.
+    nodeIdLabel string
+    // See scheduling config.
     priorityClassNameOverride *string
     clock                     clock.Clock
 }
@@ -46,6 +55,7 @@ func NewExecutorApi(producer pulsar.Producer,
     maxJobsPerCall uint,
     nodeIdLabel string,
     priorityClassNameOverride *string,
+    maxPulsarMessageSizeBytes uint,
 ) (*ExecutorApi, error) {
     if len(allowedPriorities) == 0 {
         return nil, errors.New("allowedPriorities cannot be empty")
@@ -60,60 +70,57 @@ func NewExecutorApi(producer pulsar.Producer,
         legacyExecutorRepository: legacyExecutorRepository,
         allowedPriorities:        allowedPriorities,
         maxJobsPerCall:           maxJobsPerCall,
-        maxPulsarMessageSize:     1024 * 1024 * 2,
+        maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes,
         nodeIdLabel:              nodeIdLabel,
         priorityClassNameOverride: priorityClassNameOverride,
         clock:                    clock.RealClock{},
     }, nil
 }
 
-// LeaseJobRuns performs the following actions:
-// - Stores the request in postgres so that the scheduler can use the job + capacity information in the next scheduling round
-// - Determines if any of the job runs in the request are no longer active and should be cancelled
-// - Determines if any new job runs should be leased to the executor
+// LeaseJobRuns reconciles the state of the executor with that of the scheduler. Specifically it:
+// 1. Stores job and capacity information received from the executor to make it available to the scheduler.
+// 2. Notifies the executor if any of its jobs are no longer active, e.g., due to being preempted by the scheduler.
+// 3. Transfers any jobs scheduled on this executor cluster that the executor doesn't already have.
 func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRunsServer) error {
-    ctx := stream.Context()
-    log := ctxlogrus.Extract(ctx)
     // Receive once to get info necessary to get jobs to lease.
     req, err := stream.Recv()
     if err != nil {
         return errors.WithStack(err)
     }
-    log.Infof("Handling lease request for executor %s", req.ExecutorId)
+    ctx := stream.Context()
+    log := ctxlogrus.Extract(ctx)
+    log = log.WithField("executor", req.ExecutorId)
 
-    // store the executor state for use by the scheduler
-    executorState := srv.createExecutorState(ctx, req)
-    if err = srv.executorRepository.StoreExecutor(stream.Context(), executorState); err != nil {
+    executor := srv.executorFromLeaseRequest(ctx, req)
+    if err := srv.executorRepository.StoreExecutor(ctx, executor); err != nil {
         return err
     }
-
-    // store the executor state for the legacy executor to use
-    if err = srv.legacyExecutorRepository.StoreExecutor(stream.Context(), executorState); err != nil {
+    if err = srv.legacyExecutorRepository.StoreExecutor(ctx, executor); err != nil {
         return err
     }
 
-    requestRuns, err := extractRunIds(req)
+    requestRuns, err := runIdsFromLeaseRequest(req)
     if err != nil {
         return err
     }
-    log.Debugf("Executor is currently aware of %d job runs", len(requestRuns))
+    log.Infof("executor is aware of %d job runs", len(requestRuns))
 
-    runsToCancel, err := srv.jobRepository.FindInactiveRuns(stream.Context(), requestRuns)
+    runsToCancel, err := srv.jobRepository.FindInactiveRuns(ctx, requestRuns)
     if err != nil {
         return err
     }
-    log.Debugf("Detected %d runs that need cancelling", len(runsToCancel))
+    // TODO: Print a combined diff later.
+    log.Infof("%d runs that need cancelling", len(runsToCancel))
 
-    // Fetch new leases from the db
-    leases, err := srv.jobRepository.FetchJobRunLeases(stream.Context(), req.ExecutorId, srv.maxJobsPerCall, requestRuns)
+    leases, err := srv.jobRepository.FetchJobRunLeases(ctx, req.ExecutorId, srv.maxJobsPerCall, requestRuns)
     if err != nil {
         return err
     }
 
-    // if necessary send a list of runs to cancel
+    // Send any runs that should be cancelled..
     if len(runsToCancel) > 0 {
-        err = stream.Send(&executorapi.LeaseStreamMessage{
+        if err := stream.Send(&executorapi.LeaseStreamMessage{
             Event: &executorapi.LeaseStreamMessage_CancelRuns{
                 CancelRuns: &executorapi.CancelRuns{
                     JobRunIdsToCancel: util.Map(runsToCancel, func(x uuid.UUID) *armadaevents.Uuid {
@@ -121,25 +128,22 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
                     }),
                 },
             },
-        })
-
-        if err != nil {
+        }); err != nil {
             return errors.WithStack(err)
         }
     }
 
-    // Now send any leases
+    // Send any scheduled jobs the executor doesn't already have.
     decompressor := compress.NewZlibDecompressor()
     for _, lease := range leases {
         submitMsg := &armadaevents.SubmitJob{}
-        err = decompressAndMarshall(lease.SubmitMessage, decompressor, submitMsg)
-        if err != nil {
+        if err := unmarshalFromCompressedBytes(lease.SubmitMessage, decompressor, submitMsg); err != nil {
             return err
         }
         if srv.priorityClassNameOverride != nil {
             srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride)
         }
-        srv.addNodeSelector(submitMsg, lease.Node)
+        srv.addNodeIdSelector(submitMsg, lease.Node)
 
         var groups []string
         if len(lease.Groups) > 0 {
@@ -148,7 +152,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
                 return err
             }
         }
-        err = stream.Send(&executorapi.LeaseStreamMessage{
+        err := stream.Send(&executorapi.LeaseStreamMessage{
             Event: &executorapi.LeaseStreamMessage_Lease{
                 Lease: &executorapi.JobRunLease{
                     JobRunId: armadaevents.ProtoUuidFromUuid(lease.RunID),
@@ -189,11 +193,10 @@ func (srv *ExecutorApi) setPriorityClassName(job *armadaevents.SubmitJob, priori
     }
 }
 
-func (srv *ExecutorApi) addNodeSelector(job *armadaevents.SubmitJob, nodeId string) {
+func (srv *ExecutorApi) addNodeIdSelector(job *armadaevents.SubmitJob, nodeId string) {
     if job == nil || nodeId == "" {
         return
     }
-
     if job.MainObject != nil {
         switch typed := job.MainObject.Object.(type) {
         case *armadaevents.KubernetesMainObject_PodSpec:
@@ -207,9 +210,10 @@ func addNodeSelector(podSpec *armadaevents.PodSpecWithAvoidList, key string, val
         return
     }
     if podSpec.PodSpec.NodeSelector == nil {
-        podSpec.PodSpec.NodeSelector = make(map[string]string, 1)
+        podSpec.PodSpec.NodeSelector = map[string]string{key: value}
+    } else {
+        podSpec.PodSpec.NodeSelector[key] = value
     }
-    podSpec.PodSpec.NodeSelector[key] = value
 }
 
 func setPriorityClassName(podSpec *armadaevents.PodSpecWithAvoidList, priorityClassName string) {
@@ -219,19 +223,19 @@ func setPriorityClassName(podSpec *armadaevents.PodSpecWithAvoidList, priorityCl
     podSpec.PodSpec.PriorityClassName = priorityClassName
 }
 
-// ReportEvents publishes all events to pulsar. The events are compacted for more efficient publishing
+// ReportEvents publishes all events to Pulsar. The events are compacted for more efficient publishing.
 func (srv *ExecutorApi) ReportEvents(ctx context.Context, list *executorapi.EventList) (*types.Empty, error) {
-    err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxPulsarMessageSize, schedulers.Pulsar)
+    err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxPulsarMessageSizeBytes, schedulers.Pulsar)
     return &types.Empty{}, err
 }
 
-// createExecutorState extracts a schedulerobjects.Executor from the requesrt
-func (srv *ExecutorApi) createExecutorState(ctx context.Context, req *executorapi.LeaseRequest) *schedulerobjects.Executor {
+// executorFromLeaseRequest extracts a schedulerobjects.Executor from the request.
+func (srv *ExecutorApi) executorFromLeaseRequest(ctx context.Context, req *executorapi.LeaseRequest) *schedulerobjects.Executor {
     log := ctxlogrus.Extract(ctx)
     nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))
+    now := srv.clock.Now().UTC()
     for _, nodeInfo := range req.Nodes {
-        node, err := api.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, srv.clock.Now().UTC())
-        if err != nil {
+        if node, err := api.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, now); err != nil {
             logging.WithStacktrace(log, err).Warnf(
                 "skipping node %s from executor %s", nodeInfo.GetName(), req.GetExecutorId(),
             )
@@ -244,37 +248,35 @@ func (srv *ExecutorApi) createExecutorState(ctx context.Context, req *executorap
         Pool:           req.Pool,
         Nodes:          nodes,
         MinimumJobSize: schedulerobjects.ResourceList{Resources: req.MinimumJobSize},
-        LastUpdateTime: srv.clock.Now().UTC(),
-        UnassignedJobRuns: util.Map(req.UnassignedJobRunIds, func(x armadaevents.Uuid) string {
-            return strings.ToLower(armadaevents.UuidFromProtoUuid(&x).String())
+        LastUpdateTime: now,
+        UnassignedJobRuns: util.Map(req.UnassignedJobRunIds, func(jobId armadaevents.Uuid) string {
+            return strings.ToLower(armadaevents.UuidFromProtoUuid(&jobId).String())
         }),
     }
 }
 
-// extractRunIds extracts all the job runs contained in the executor request
-func extractRunIds(req *executorapi.LeaseRequest) ([]uuid.UUID, error) {
-    runIds := make([]uuid.UUID, 0)
-    // add all runids from nodes
+// runIdsFromLeaseRequest returns the ids of all runs in a lease request, including any not yet assigned to a node.
+func runIdsFromLeaseRequest(req *executorapi.LeaseRequest) ([]uuid.UUID, error) {
+    runIds := make([]uuid.UUID, 0, 256)
     for _, node := range req.Nodes {
         for runIdStr := range node.RunIdsByState {
-            runId, err := uuid.Parse(runIdStr)
-            if err != nil {
+            if runId, err := uuid.Parse(runIdStr); err != nil {
                 return nil, errors.WithStack(err)
+            } else {
+                runIds = append(runIds, runId)
             }
-            runIds = append(runIds, runId)
         }
     }
-    // add all unassigned runids
     for _, runId := range req.UnassignedJobRunIds {
         runIds = append(runIds, armadaevents.UuidFromProtoUuid(&runId))
     }
     return runIds, nil
 }
 
-func decompressAndMarshall(b []byte, decompressor compress.Decompressor, msg proto.Message) error {
-    decompressed, err := decompressor.Decompress(b)
+func unmarshalFromCompressedBytes(bytes []byte, decompressor compress.Decompressor, msg proto.Message) error {
+    decompressedBytes, err := decompressor.Decompress(bytes)
     if err != nil {
         return err
     }
-    return proto.Unmarshal(decompressed, msg)
+    return proto.Unmarshal(decompressedBytes, msg)
 }
diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go
index e0e30371755..5587c8cfb96 100644
--- a/internal/scheduler/api_test.go
+++ b/internal/scheduler/api_test.go
@@ -171,7 +171,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
             mockLegacyExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
             mockStream := schedulermocks.NewMockExecutorApi_LeaseJobRunsServer(ctrl)
 
-            runIds, err := extractRunIds(tc.request)
+            runIds, err := runIdsFromLeaseRequest(tc.request)
             require.NoError(t, err)
 
             // set up mocks
@@ -204,6 +204,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
                 maxJobsPerCall,
                 "kubernetes.io/hostname",
                 nil,
+                4*1024*1024,
             )
             require.NoError(t, err)
             server.clock = testClock
@@ -331,6 +332,7 @@ func TestExecutorApi_Publish(t *testing.T) {
                 100,
                 "kubernetes.io/hostname",
                 nil,
+                4*1024*1024,
             )
             require.NoError(t, err)
 
diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go
index fd1b8ade516..981d7759c58 100644
--- a/internal/scheduler/schedulerapp.go
+++ b/internal/scheduler/schedulerapp.go
@@ -115,7 +115,7 @@ func Run(config schedulerconfig.Configuration) error {
     defer grpcServer.GracefulStop()
     lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Grpc.Port))
     if err != nil {
-        return errors.WithMessage(err, "error setting up grpc server")
+        return errors.WithMessage(err, "error setting up gRPC server")
     }
     allowedPcs := config.Scheduling.Preemption.AllowedPriorities()
     executorServer, err := NewExecutorApi(
@@ -127,7 +127,9 @@ func Run(config schedulerconfig.Configuration) error {
         config.Scheduling.MaximumJobsToSchedule,
         config.Scheduling.Preemption.NodeIdLabel,
         config.Scheduling.Preemption.PriorityClassNameOverride,
+        config.Pulsar.MaxAllowedMessageSize,
     )
+
     if err != nil {
         return errors.WithMessage(err, "error creating executorApi")
     }

From 3fb65fd40a1bafc528b5b294452543eac6b79c43 Mon Sep 17 00:00:00 2001
From: Albin Severinson
Date: Mon, 26 Jun 2023 11:29:13 +0100
Subject: [PATCH 2/5] Ingest annotations, cleanup

---
 internal/scheduler/api.go                  | 15 +++++++--------
 internal/scheduler/jobdb/job.go            | 12 ++++++------
 internal/scheduler/scheduler.go            | 12 +++++-------
 internal/scheduleringester/instructions.go | 18 ++++++++++++------
 4 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go
index 363b22a18e1..2e869782731 100644
--- a/internal/scheduler/api.go
+++ b/internal/scheduler/api.go
@@ -104,21 +104,20 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
     if err != nil {
         return err
     }
-    log.Infof("executor is aware of %d job runs", len(requestRuns))
-
     runsToCancel, err := srv.jobRepository.FindInactiveRuns(ctx, requestRuns)
     if err != nil {
         return err
     }
-    // TODO: Print a combined diff later.
-    log.Infof("%d runs that need cancelling", len(runsToCancel))
-
-    leases, err := srv.jobRepository.FetchJobRunLeases(ctx, req.ExecutorId, srv.maxJobsPerCall, requestRuns)
+    newRuns, err := srv.jobRepository.FetchJobRunLeases(ctx, req.ExecutorId, srv.maxJobsPerCall, requestRuns)
     if err != nil {
         return err
     }
+    log.Infof(
+        "executor currently has %d job runs; sending %d cancellations and %d new runs",
+        len(requestRuns), len(runsToCancel), len(newRuns),
+    )
 
-    // Send any runs that should be cancelled..
+    // Send any runs that should be cancelled.
     if len(runsToCancel) > 0 {
         if err := stream.Send(&executorapi.LeaseStreamMessage{
             Event: &executorapi.LeaseStreamMessage_CancelRuns{
@@ -135,7 +134,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
 
     // Send any scheduled jobs the executor doesn't already have.
     decompressor := compress.NewZlibDecompressor()
-    for _, lease := range leases {
+    for _, lease := range newRuns {
         submitMsg := &armadaevents.SubmitJob{}
         if err := unmarshalFromCompressedBytes(lease.SubmitMessage, decompressor, submitMsg); err != nil {
             return err
diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go
index aae2e08be0f..11d69aca285 100644
--- a/internal/scheduler/jobdb/job.go
+++ b/internal/scheduler/jobdb/job.go
@@ -13,25 +13,25 @@ import (
 
 // Job is the scheduler-internal representation of a job.
 type Job struct {
-    // String representation of the job id
+    // String representation of the job id.
     id string
     // Name of the queue this job belongs to.
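
A note on the node-pinning step in the patch above: addNodeIdSelector constrains each leased pod to the node the scheduler chose for it by writing a node selector keyed by nodeIdLabel (the tests use kubernetes.io/hostname). The standalone Go sketch below illustrates that idea on a plain Kubernetes PodSpec; the function and node name are illustrative only and are not part of the patch series.

package example

import (
    v1 "k8s.io/api/core/v1"
)

// pinPodToNode mirrors the effect of addNodeIdSelector: it constrains a pod to
// the node the scheduler picked by setting a node selector keyed by nodeIdLabel.
func pinPodToNode(podSpec *v1.PodSpec, nodeIdLabel, nodeId string) {
    if podSpec == nil || nodeId == "" {
        return
    }
    if podSpec.NodeSelector == nil {
        podSpec.NodeSelector = map[string]string{nodeIdLabel: nodeId}
    } else {
        podSpec.NodeSelector[nodeIdLabel] = nodeId
    }
}

// Example: pinPodToNode(podSpec, "kubernetes.io/hostname", "worker-1") leaves the
// pod schedulable only on worker-1, so Kubernetes enforces the scheduler's placement.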
     queue string
-    // Jobset the job belongs to
-    // We store this as it's needed for sending job event messages
+    // Jobset the job belongs to.
+    // We store this as it's needed for sending job event messages.
     jobset string
     // Per-queue priority of this job.
     priority uint32
     // Requested per queue priority of this job.
-    // This is used when syncing the postgres database with the scheduler-internal database
+    // This is used when syncing the postgres database with the scheduler-internal database.
     requestedPriority uint32
     // Logical timestamp indicating the order in which jobs are submitted.
     // Jobs with identical Queue and Priority are sorted by this.
     created int64
     // True if the job is currently queued.
-    // If this is set then the job will not be considered for scheduling
+    // If this is set then the job will not be considered for scheduling.
     queued bool
-    // The current version of the queued state
+    // The current version of the queued state.
     queuedVersion int32
     // Scheduling requirements of this job.
     jobSchedulingInfo *schedulerobjects.JobSchedulingInfo
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 24a44336a99..b3d5423af5f 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -209,8 +209,8 @@ func (s *Scheduler) cycle(ctx context.Context, updateAll bool, leaderToken Leade
     }
     events = append(events, expirationEvents...)
 
+    // Schedule jobs.
     if s.clock.Now().Sub(s.previousSchedulingRoundEnd) > s.schedulePeriod {
-        // Schedule jobs.
         overallSchedulerResult, err := s.schedulingAlgo.Schedule(ctx, txn, s.jobDb)
         if err != nil {
             return err
@@ -222,8 +222,6 @@ func (s *Scheduler) cycle(ctx context.Context, updateAll bool, leaderToken Leade
         }
         events = append(events, resultEvents...)
         s.previousSchedulingRoundEnd = s.clock.Now()
-    } else {
-        log.Infof("skipping scheduling new jobs this cycle as a scheduling round ran less than %s ago", s.schedulePeriod)
     }
 
     // Publish to Pulsar.
@@ -264,7 +262,7 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*jobdb.Job, error) {
         // Try and retrieve the job from the jobDb. If it doesn't exist then create it.
         job := s.jobDb.GetById(txn, dbJob.JobID)
         if job == nil {
-            job, err = s.createSchedulerJob(&dbJob)
+            job, err = s.schedulerJobFromDatabaseJob(&dbJob)
             if err != nil {
                 return nil, err
             }
@@ -817,8 +815,8 @@ func (s *Scheduler) ensureDbUpToDate(ctx context.Context, pollInterval time.Dura
     }
 }
 
-// createSchedulerJob creates a new scheduler job from a database job.
-func (s *Scheduler) createSchedulerJob(dbJob *database.Job) (*jobdb.Job, error) {
+// schedulerJobFromDatabaseJob creates a new scheduler job from a database job.
+func (s *Scheduler) schedulerJobFromDatabaseJob(dbJob *database.Job) (*jobdb.Job, error) {
     schedulingInfo := &schedulerobjects.JobSchedulingInfo{}
     err := proto.Unmarshal(dbJob.SchedulingInfo, schedulingInfo)
     if err != nil {
@@ -892,7 +890,7 @@ func updateSchedulerRun(run *jobdb.JobRun, dbRun *database.Run) *jobdb.JobRun {
     return run
 }
 
-// updateSchedulerJob updates the scheduler job (in-place) to match the database job
+// updateSchedulerJob updates the scheduler job in-place to match the database job.
 func updateSchedulerJob(job *jobdb.Job, dbJob *database.Job) (*jobdb.Job, error) {
     if dbJob.CancelRequested && !job.CancelRequested() {
         job = job.WithCancelRequested(true)
diff --git a/internal/scheduleringester/instructions.go b/internal/scheduleringester/instructions.go
index 3ff86553121..10702d12062 100644
--- a/internal/scheduleringester/instructions.go
+++ b/internal/scheduleringester/instructions.go
@@ -148,7 +148,7 @@ func (c *InstructionConverter) handleSubmitJob(job *armadaevents.SubmitJob, subm
 
     // Produce a minimal representation of the job for the scheduler.
     // To avoid the scheduler needing to load the entire job spec.
-    schedulingInfo, err := c.schedulingInfoFromSubmitJob(job)
+    schedulingInfo, err := c.schedulingInfoFromSubmitJob(job, submitTime)
     if err != nil {
         return nil, err
     }
@@ -357,13 +357,15 @@ func (c *InstructionConverter) handlePartitionMarker(pm *armadaevents.PartitionM
 
 // schedulingInfoFromSubmitJob returns a minimal representation of a job
 // containing only the info needed by the scheduler.
-func (c *InstructionConverter) schedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob) (*schedulerobjects.JobSchedulingInfo, error) {
+func (c *InstructionConverter) schedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time) (*schedulerobjects.JobSchedulingInfo, error) {
     // Component common to all jobs.
     schedulingInfo := &schedulerobjects.JobSchedulingInfo{
         Lifetime:        submitJob.Lifetime,
         AtMostOnce:      submitJob.AtMostOnce,
         Preemptible:     submitJob.Preemptible,
         ConcurrencySafe: submitJob.ConcurrencySafe,
+        SubmitTime:      submitTime,
+        Priority:        submitJob.Priority,
         Version:         0,
     }
 
@@ -371,12 +373,16 @@ func (c *InstructionConverter) schedulingInfoFromSubmitJob(submitJob *armadaeven
     switch object := submitJob.MainObject.Object.(type) {
     case *armadaevents.KubernetesMainObject_PodSpec:
         podSpec := object.PodSpec.PodSpec
-        requirements := &schedulerobjects.ObjectRequirements_PodRequirements{
-            PodRequirements: adapters.PodRequirementsFromPodSpec(podSpec, c.priorityClasses),
-        }
+        schedulingInfo.PriorityClassName = podSpec.PriorityClassName
+        podRequirements := adapters.PodRequirementsFromPodSpec(podSpec, c.priorityClasses)
+        podRequirements.Annotations = submitJob.ObjectMeta.Annotations
         schedulingInfo.ObjectRequirements = append(
             schedulingInfo.ObjectRequirements,
-            &schedulerobjects.ObjectRequirements{Requirements: requirements},
+            &schedulerobjects.ObjectRequirements{
+                Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
+                    PodRequirements: podRequirements,
+                },
+            },
         )
     default:
         return nil, errors.Errorf("unsupported object type %T", object)

From 68049ae4678d2022ac8f8adcc5e3a8e02c4e9788 Mon Sep 17 00:00:00 2001
From: Albin Severinson
Date: Mon, 26 Jun 2023 12:01:26 +0100
Subject: [PATCH 3/5] Cleanup

---
 internal/scheduleringester/dbops.go        |  2 +-
 internal/scheduleringester/instructions.go |  7 ++--
 .../scheduleringester/instructions_test.go  |  2 +-
 internal/scheduleringester/schedulerdb.go  | 39 ++++++++++---------
 4 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/internal/scheduleringester/dbops.go b/internal/scheduleringester/dbops.go
index f4b448dec47..b4300cc868a 100644
--- a/internal/scheduleringester/dbops.go
+++ b/internal/scheduleringester/dbops.go
@@ -99,7 +99,7 @@ func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation {
             break
         }
     }
-    return discardNilOps(ops) // TODO: Can be made more efficient.
+    return discardNilOps(ops)
 }
 
 func discardNilOps(ops []DbOperation) []DbOperation {
diff --git a/internal/scheduleringester/instructions.go b/internal/scheduleringester/instructions.go
index 10702d12062..fa0e70c09a9 100644
--- a/internal/scheduleringester/instructions.go
+++ b/internal/scheduleringester/instructions.go
@@ -48,7 +48,7 @@ func NewInstructionConverter(
 func (c *InstructionConverter) Convert(_ context.Context, sequencesWithIds *ingest.EventSequencesWithIds) *DbOperationsWithMessageIds {
     operations := make([]DbOperation, 0)
     for _, es := range sequencesWithIds.EventSequences {
-        for _, op := range c.convertSequence(es) {
+        for _, op := range c.dbOperationsFromEventSequence(es) {
             operations = AppendDbOperation(operations, op)
         }
     }
@@ -58,14 +58,13 @@ func (c *InstructionConverter) Convert(_ context.Context, sequencesWithIds *inge
     }
 }
 
-func (c *InstructionConverter) convertSequence(es *armadaevents.EventSequence) []DbOperation {
+func (c *InstructionConverter) dbOperationsFromEventSequence(es *armadaevents.EventSequence) []DbOperation {
     meta := eventSequenceCommon{
         queue:  es.Queue,
         jobset: es.JobSetName,
         user:   es.UserId,
         groups: es.Groups,
     }
-
     operations := make([]DbOperation, 0, len(es.Events))
     for idx, event := range es.Events {
         eventTime := time.Now().UTC()
@@ -117,7 +116,7 @@ func (c *InstructionConverter) convertSequence(es *armadaevents.EventSequence) [
         }
         if err != nil {
             c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing)
-            log.WithError(err).Warnf("Could not convert event at index %d.", idx)
+            log.WithError(err).Errorf("Could not convert event at index %d.", idx)
         } else {
             operations = append(operations, operationsFromEvent...)
         }
diff --git a/internal/scheduleringester/instructions_test.go b/internal/scheduleringester/instructions_test.go
index 45debe5c30d..6cf5665254f 100644
--- a/internal/scheduleringester/instructions_test.go
+++ b/internal/scheduleringester/instructions_test.go
@@ -201,7 +201,7 @@ func TestConvertSequence(t *testing.T) {
         t.Run(name, func(t *testing.T) {
             converter := InstructionConverter{m, f.PriorityClasses, compressor}
             es := f.NewEventSequence(tc.events...)
-            results := converter.convertSequence(es)
+            results := converter.dbOperationsFromEventSequence(es)
             assertOperationsEqual(t, tc.expected, results)
         })
     }
diff --git a/internal/scheduleringester/schedulerdb.go b/internal/scheduleringester/schedulerdb.go
index ad34ada01c9..9b944836c2e 100644
--- a/internal/scheduleringester/schedulerdb.go
+++ b/internal/scheduleringester/schedulerdb.go
@@ -42,10 +42,9 @@ func NewSchedulerDb(
     }
 }
 
-// Store persists all operations in the database. Note that:
-// - this function will retry until it either succeeds or a terminal error is encountered
-// - this function will take out a postgres lock to ensure that other ingesters are not writing to the database
-// at the same time (for details, see acquireLock())
+// Store persists all operations in the database.
+// This function retries until it either succeeds or encounters a terminal error.
+// This function takes a postgres advisory lock to avoid write conflicts; see acquireLock() for details.
 func (s *SchedulerDb) Store(ctx context.Context, instructions *DbOperationsWithMessageIds) error {
     return ingest.WithRetry(func() (bool, error) {
         err := s.db.BeginTxFunc(ctx, pgx.TxOptions{
@@ -53,36 +52,38 @@ func (s *SchedulerDb) Store(ctx context.Context, instructions *DbOperationsWithM
             AccessMode:     pgx.ReadWrite,
             DeferrableMode: pgx.Deferrable,
         }, func(tx pgx.Tx) error {
-            // First acquire the write lock
             lockCtx, cancel := context.WithTimeout(ctx, s.lockTimeout)
             defer cancel()
-            err := s.acquireLock(lockCtx, tx)
-            if err != nil {
+            // The lock is released automatically on transaction rollback/commit.
+            if err := s.acquireLock(lockCtx, tx); err != nil {
                 return err
             }
-            // Now insert the ops
             for _, dbOp := range instructions.Ops {
-                err := s.WriteDbOp(ctx, tx, dbOp)
-                if err != nil {
+                if err := s.WriteDbOp(ctx, tx, dbOp); err != nil {
                     return err
                 }
             }
-            return err
+            return nil
         })
         return true, err
     }, s.initialBackOff, s.maxBackOff)
 }
 
-// acquireLock acquires the armada_scheduleringester_lock, which prevents two ingesters writing to the db at the same
-// time. This is necessary because:
-// - when rows are inserted into the database they are stamped with a sequence number
-// - the scheduler relies on this sequence number increasing to ensure it has fetched all updated rows
-// - concurrent transactions will result in sequence numbers being interleaved across transactions.
-// - the interleaved sequences may result in the scheduler seeing sequence numbers that do not strictly increase over time.
+// acquireLock acquires a postgres advisory lock, thus preventing concurrent writes.
+// This is necessary to ensure sequence numbers assigned to each inserted row are monotonically increasing.
+// Such a sequence number is assigned to each inserted row by a postgres function.
+//
+// Hence, if rows are inserted across multiple transactions concurrently,
+// sequence numbers may be interleaved between transactions and the slower transaction may insert
+// rows with sequence numbers smaller than those already written.
+//
+// The scheduler relies on these sequence numbers to only fetch new or updated rows in each update cycle.
 func (s *SchedulerDb) acquireLock(ctx context.Context, tx pgx.Tx) error {
     const lockId = 8741339439634283896
-    _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockId)
-    return errors.Wrapf(err, "Could not obtain lock")
+    if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockId); err != nil {
+        return errors.Wrapf(err, "could not obtain lock")
+    }
+    return nil
 }
 
 func (s *SchedulerDb) WriteDbOp(ctx context.Context, tx pgx.Tx, op DbOperation) error {

From ebdeacf3aaaf7abe38c0db2da77e8a6cd22e7abc Mon Sep 17 00:00:00 2001
From: Albin Severinson
Date: Mon, 26 Jun 2023 12:02:23 +0100
Subject: [PATCH 4/5] Lint

---
 internal/scheduler/schedulerapp.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go
index 981d7759c58..625998ef443 100644
--- a/internal/scheduler/schedulerapp.go
+++ b/internal/scheduler/schedulerapp.go
@@ -129,7 +129,6 @@ func Run(config schedulerconfig.Configuration) error {
         config.Scheduling.Preemption.PriorityClassNameOverride,
         config.Pulsar.MaxAllowedMessageSize,
     )
-
     if err != nil {
         return errors.WithMessage(err, "error creating executorApi")
     }

From a97fc23d36c3f813e7a4df20582ac3b098353baa Mon Sep 17 00:00:00 2001
From: Albin Severinson
Date: Mon, 26 Jun 2023 12:19:14 +0100
Subject: [PATCH 5/5] Fix test

---
 internal/scheduleringester/instructions_test.go | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/internal/scheduleringester/instructions_test.go b/internal/scheduleringester/instructions_test.go
index 6cf5665254f..99ecde22f1a 100644
--- a/internal/scheduleringester/instructions_test.go
+++ b/internal/scheduleringester/instructions_test.go
@@ -272,11 +272,14 @@ func assertErrorMessagesEqual(t *testing.T, expectedBytes []byte, actualBytes []
 
 func getExpectedSubmitMessageSchedulingInfo(t *testing.T) *schedulerobjects.JobSchedulingInfo {
     expectedSubmitSchedulingInfo := &schedulerobjects.JobSchedulingInfo{
-        Lifetime:        0,
-        AtMostOnce:      true,
-        Preemptible:     true,
-        ConcurrencySafe: true,
-        Version:         0,
+        Lifetime:          0,
+        AtMostOnce:        true,
+        Preemptible:       true,
+        ConcurrencySafe:   true,
+        Version:           0,
+        PriorityClassName: "test-priority",
+        Priority:          3,
+        SubmitTime:        f.BaseTime,
         ObjectRequirements: []*schedulerobjects.ObjectRequirements{
             {
                 Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
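
The acquireLock comment in the patch above explains the design: each inserted row is stamped with a sequence number, the scheduler assumes those numbers only ever grow, and concurrent writers could therefore make it skip rows. The standalone sketch below shows the same transaction-scoped advisory-lock pattern in isolation, assuming a pgx v4 connection pool and a made-up events table; it illustrates the technique and is not part of the patch series.

package example

import (
    "context"

    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

// Any constant agreed on by all writers works as the advisory-lock key.
const writerLockId = 8741339439634283896

// storeSerialised inserts rows inside a transaction that first takes a
// transaction-scoped advisory lock, so writes from concurrent ingesters cannot
// interleave. The lock is released automatically on commit or rollback.
func storeSerialised(ctx context.Context, db *pgxpool.Pool, payloads [][]byte) error {
    return db.BeginTxFunc(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted}, func(tx pgx.Tx) error {
        if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", writerLockId); err != nil {
            return err
        }
        for _, payload := range payloads {
            // The events table and its payload column are placeholders for illustration.
            if _, err := tx.Exec(ctx, "INSERT INTO events (payload) VALUES ($1)", payload); err != nil {
                return err
            }
        }
        return nil
    })
}

Because pg_advisory_xact_lock is tied to the transaction, there is no separate unlock step; a second writer simply blocks at the SELECT until the first transaction commits or rolls back.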