From fd31a85bcd0e87b1078e47c7612a2e0ae8628474 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 17 Jun 2022 15:03:15 +0200 Subject: [PATCH] Manager: add blocking of workers when they fail certain tasks too much When a worker fails too many tasks, of the same task type, on the same job, it'll get blocked from doing those. --- internal/manager/api_impl/interfaces.go | 7 + .../api_impl/mocks/api_impl_mock.gen.go | 44 +++++ .../manager/api_impl/worker_task_updates.go | 126 +++++++++++++- internal/manager/api_impl/workers_test.go | 154 +++++++++++++++++- internal/manager/config/config.go | 2 +- internal/manager/config/defaults.go | 2 +- internal/manager/config/settings_test.go | 2 + internal/manager/persistence/db_migration.go | 8 +- .../manager/persistence/jobs_blocklist.go | 92 +++++++++++ .../persistence/jobs_blocklist_test.go | 137 ++++++++++++++++ 10 files changed, 566 insertions(+), 8 deletions(-) create mode 100644 internal/manager/persistence/jobs_blocklist.go create mode 100644 internal/manager/persistence/jobs_blocklist_test.go diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 8c67ba5f..26d59c35 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -53,6 +53,13 @@ type PersistenceService interface { // ClearFailureListOfJob en-mass, for all tasks of this job, clears the list of workers that failed those tasks. ClearFailureListOfJob(context.Context, *persistence.Job) error + // AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler. + AddWorkerToJobBlocklist(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) error + // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. + WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error) + // CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type. + CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error) + // Database queries. QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 247f3ec7..a6355db4 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -40,6 +40,20 @@ func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { return m.recorder } +// AddWorkerToJobBlocklist mocks base method. +func (m *MockPersistenceService) AddWorkerToJobBlocklist(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddWorkerToJobBlocklist", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddWorkerToJobBlocklist indicates an expected call of AddWorkerToJobBlocklist. 
+func (mr *MockPersistenceServiceMockRecorder) AddWorkerToJobBlocklist(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerToJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).AddWorkerToJobBlocklist), arg0, arg1, arg2, arg3) +} + // AddWorkerToTaskFailedList mocks base method. func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *persistence.Worker) (int, error) { m.ctrl.T.Helper() @@ -83,6 +97,21 @@ func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfTask(arg0, arg1 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfTask", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfTask), arg0, arg1) } +// CountTaskFailuresOfWorker mocks base method. +func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountTaskFailuresOfWorker", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountTaskFailuresOfWorker indicates an expected call of CountTaskFailuresOfWorker. +func (mr *MockPersistenceServiceMockRecorder) CountTaskFailuresOfWorker(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountTaskFailuresOfWorker", reflect.TypeOf((*MockPersistenceService)(nil).CountTaskFailuresOfWorker), arg0, arg1, arg2, arg3) +} + // CreateWorker mocks base method. func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error { m.ctrl.T.Helper() @@ -315,6 +344,21 @@ func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSeen", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSeen), arg0, arg1) } +// WorkersLeftToRun mocks base method. +func (m *MockPersistenceService) WorkersLeftToRun(arg0 context.Context, arg1 *persistence.Job, arg2 string) (map[string]bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WorkersLeftToRun", arg0, arg1, arg2) + ret0, _ := ret[0].(map[string]bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WorkersLeftToRun indicates an expected call of WorkersLeftToRun. +func (mr *MockPersistenceServiceMockRecorder) WorkersLeftToRun(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkersLeftToRun", reflect.TypeOf((*MockPersistenceService)(nil).WorkersLeftToRun), arg0, arg1, arg2) +} + // MockChangeBroadcaster is a mock of ChangeBroadcaster interface. 
type MockChangeBroadcaster struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index ae6bb52f..4c280c89 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -159,7 +159,17 @@ func (f *Flamenco) onTaskFailed( if err != nil { return fmt.Errorf("adding worker to failure list of task: %w", err) } - // f.maybeBlocklistWorker(ctx, logger, worker, task) + + logger = logger.With().Str("taskType", task.Type).Logger() + shouldHardFail, err := f.maybeBlocklistWorker(ctx, logger, worker, task) + if err != nil { + return fmt.Errorf("block-listing worker: %w", err) + } + if shouldHardFail { + // Hard failure because of blocklisting should simply fail the entire job. + // There are no more workers left to finish it. + return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task) + } // Determine whether this is soft or hard failure. threshold := f.config.Get().TaskFailAfterSoftFailCount @@ -173,6 +183,120 @@ func (f *Flamenco) onTaskFailed( return f.hardFailTask(ctx, logger, worker, task, numFailed) } +// maybeBlocklistWorker potentially block-lists the Worker, and checks whether +// there are any workers left to run tasks of this type. +// +// Returns "must hard-fail". That is, returns `false` if there are still workers +// left to run tasks of this type, on this job. +// +// If the worker is NOT block-listed at this moment, always returns `false`. +// +// Returns `true` if ALL workers that can execute this task type are blocked +// from working on this job. +func (f *Flamenco) maybeBlocklistWorker( + ctx context.Context, + logger zerolog.Logger, + worker *persistence.Worker, + task *persistence.Task, +) (bool, error) { + numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type) + if err != nil { + return false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err) + } + // The received task update hasn't been persisted in the database yet, + // so we should count that too. + numFailures++ + + threshold := f.config.Get().BlocklistThreshold + if numFailures < threshold { + logger.Info().Int("numFailedTasks", numFailures).Msg("not enough failed tasks to blocklist worker") + // TODO: This might need special handling, as this worker will be blocked + // from retrying this particular task. It could have been the last worker to + // be allowed this task type; if that is the case, the job is now stuck. + return false, nil + } + + // Blocklist the Worker. + if err := f.blocklistWorker(ctx, logger, worker, task); err != nil { + return false, err + } + + // Return hard-failure if there are no workers left for this task type. + numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task) + return numWorkers == 0, err +} + +func (f *Flamenco) blocklistWorker( + ctx context.Context, + logger zerolog.Logger, + worker *persistence.Worker, + task *persistence.Task, +) error { + logger.Warn(). + Str("job", task.Job.UUID). + Msg("block-listing worker") + err := f.persist.AddWorkerToJobBlocklist(ctx, task.Job, worker, task.Type) + if err != nil { + return fmt.Errorf("adding worker to block list: %w", err) + } + + // TODO: requeue all tasks of this job & task type that were hard-failed by this worker. 
+ return nil +} + +func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) { + // See which workers are left to run tasks of this type, on this job, + workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type) + if err != nil { + return 0, fmt.Errorf("fetching workers available to run tasks of type %q on job %q: %w", + task.Job.UUID, task.Type, err) + } + + // Remove (from the list of available workers) those who failed this task before. + failers, err := f.persist.FetchTaskFailureList(ctx, task) + if err != nil { + return 0, fmt.Errorf("fetching failure list of task %q: %w", task.UUID, err) + } + for _, failure := range failers { + delete(workersLeft, failure.UUID) + } + + return len(workersLeft), nil +} + +// failJobAfterCatastroficTaskFailure fails the entire job. +// This function is meant to be called when a task is failed, causing a block of +// the worker, and leaving no workers any more to do tasks of this type. +func (f *Flamenco) failJobAfterCatastroficTaskFailure( + ctx context.Context, + logger zerolog.Logger, + worker *persistence.Worker, + task *persistence.Task, +) error { + taskLog := fmt.Sprintf( + "Task failed by worker %s, Manager will fail the entire job as there are no more workers left for tasks of type %q.", + worker.Identifier(), task.Type, + ) + if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil { + logger.Error().Err(err).Msg("error writing failure notice to task log") + } + + if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil { + logger.Error(). + Err(err). + Str("newStatus", string(api.TaskStatusFailed)). + Msg("error changing task status") + } + + newJobStatus := api.JobStatusFailed + logger.Info(). + Str("job", task.Job.UUID). + Str("newJobStatus", string(newJobStatus)). + Msg("no more workers left to run tasks of this type, failing the entire job") + reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type) + return f.stateMachine.JobStatusChange(ctx, task.Job, newJobStatus, reason) +} + func (f *Flamenco) hardFailTask( ctx context.Context, logger zerolog.Logger, diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 2a7b601a..7cc29b25 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -464,24 +464,30 @@ func TestTaskUpdateFailed(t *testing.T) { WorkerID: &worker.ID, Job: &mockJob, Activity: "pre-update activity", + Type: "misc", } conf := config.Conf{ Base: config.Base{ TaskFailAfterSoftFailCount: 3, + BlocklistThreshold: 65535, // This test doesn't cover blocklisting. 
}, } mf.config.EXPECT().Get().Return(&conf).AnyTimes() + const numSubTests = 2 // Expect the task to be fetched for each sub-test: - mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil).Times(2) + mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil).Times(numSubTests) // Expect a 'touch' of the task for each sub-test: - mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask).Times(2) - mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(2) + mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask).Times(numSubTests) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(numSubTests) + + // Mimick that this is always first failure of this worker/job/tasktype combo: + mf.persistence.EXPECT().CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc").Return(0, nil).Times(numSubTests) { - // Expect the Worker to be added to the list of workers. + // Expect the Worker to be added to the list of failed workers. // This returns 1, which is less than the failure threshold -> soft failure expected. mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) @@ -515,6 +521,146 @@ func TestTaskUpdateFailed(t *testing.T) { } } +func TestBlockingAfterFailure(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + // Construct the JSON request object. + taskUpdate := api.TaskUpdateJSONRequestBody{ + TaskStatus: ptr(api.TaskStatusFailed), + } + + // Construct the task that's supposed to be updated. + taskID := "181eab68-1123-4790-93b1-94309a899411" + jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5" + mockJob := persistence.Job{UUID: jobID} + mockTask := persistence.Task{ + UUID: taskID, + Worker: &worker, + WorkerID: &worker.ID, + Job: &mockJob, + Activity: "pre-update activity", + Type: "misc", + } + + conf := config.Conf{ + Base: config.Base{ + TaskFailAfterSoftFailCount: 3, + BlocklistThreshold: 3, + }, + } + mf.config.EXPECT().Get().Return(&conf).AnyTimes() + + const numSubTests = 3 + // Expect the task to be fetched for each sub-test: + mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil).Times(numSubTests) + + // Expect a 'touch' of the task for each sub-test: + mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask).Times(numSubTests) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(numSubTests) + + // Mimick that this is the 3rd of this worker/job/tasktype combo, and thus should trigger a block. + // Returns 2 because there have been 2 previous failures. + mf.persistence.EXPECT(). + CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc"). + Return(2, nil). + Times(numSubTests) + + // Expect the worker to be blocked. + mf.persistence.EXPECT(). + AddWorkerToJobBlocklist(gomock.Any(), &mockJob, &worker, "misc"). + Times(numSubTests) + + { + // Mimick that there is another worker to work on this task, so the job should continue happily. + mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc"). + Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil) + mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask). + Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil) + + // Expect the Worker to be added to the list of failed workers for this task. 
+ // This returns 1, which is less than the failure threshold -> soft failure. + mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) + + // Expect soft failure of the task. + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed) + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, + "Task failed by 1 worker, Manager will mark it as soft failure. 2 more failures will cause hard failure.") + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(taskUpdate) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.TaskUpdate(echoCtx, taskID) + assert.NoError(t, err) + assertResponseEmpty(t, echoCtx) + } + + { + // Test without any workers left to run these tasks on this job due to blocklisting. This should fail the entire job. + mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc"). + Return(map[string]bool{}, nil) + mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask). + Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil) + + // Expect the Worker to be added to the list of failed workers for this task. + // This returns 1, which is less than the failure threshold -> soft failure if it were only based on this metric. + mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) + + // Expect hard failure of the task, because there are no workers left to perfom it. + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed) + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, + "Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+ + "as there are no more workers left for tasks of type \"misc\".") + + // Expect failure of the job. + mf.stateMachine.EXPECT(). + JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"") + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(taskUpdate) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.TaskUpdate(echoCtx, taskID) + assert.NoError(t, err) + assertResponseEmpty(t, echoCtx) + } + + { + // Test that no worker has been blocklisted, but the one available one did fail this task. + // This also makes the task impossible to run, and should just fail the entire job. + theOtherFailingWorker := persistence.Worker{ + UUID: "ce312357-29cd-4389-81ab-4d43e30945f8", + } + mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc"). + Return(map[string]bool{theOtherFailingWorker.UUID: true}, nil) + mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask). + Return([]*persistence.Worker{&theOtherFailingWorker}, nil) + + // Expect the Worker to be added to the list of failed workers for this task. + // This returns 1, which is less than the failure threshold -> soft failure if it were only based on this metric. + mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) + + // Expect hard failure of the task, because there are no workers left to perfom it. 
+ mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed) + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, + "Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+ + "as there are no more workers left for tasks of type \"misc\".") + + // Expect failure of the job. + mf.stateMachine.EXPECT(). + JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"") + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(taskUpdate) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.TaskUpdate(echoCtx, taskID) + assert.NoError(t, err) + assertResponseEmpty(t, echoCtx) + } +} + func TestMayWorkerRun(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/config/config.go b/internal/manager/config/config.go index 87767bd4..34d792d1 100644 --- a/internal/manager/config/config.go +++ b/internal/manager/config/config.go @@ -92,7 +92,7 @@ type Base struct { /* This many failures (on a given job+task type combination) will ban a worker * from that task type on that job. */ - // BlocklistThreshold int `yaml:"blocklist_threshold"` + BlocklistThreshold int `yaml:"blocklist_threshold"` // When this many workers have tried the task and failed, it will be hard-failed // (even when there are workers left that could technically retry the task). diff --git a/internal/manager/config/defaults.go b/internal/manager/config/defaults.go index ac542f9d..176b2d4f 100644 --- a/internal/manager/config/defaults.go +++ b/internal/manager/config/defaults.go @@ -39,7 +39,7 @@ var defaultConfig = Conf{ // // be accurate enough for this type of cleanup. // TaskCleanupMaxAge: 14 * 24 * time.Hour, - // BlocklistThreshold: 3, + BlocklistThreshold: 3, TaskFailAfterSoftFailCount: 3, // WorkerCleanupStatus: []string{string(api.WorkerStatusOffline)}, diff --git a/internal/manager/config/settings_test.go b/internal/manager/config/settings_test.go index 08f634c8..50eee65a 100644 --- a/internal/manager/config/settings_test.go +++ b/internal/manager/config/settings_test.go @@ -20,6 +20,8 @@ func TestDefaultSettings(t *testing.T) { assert.Equal(t, false, config.Variables["ffmpeg"].IsTwoWay) assert.Equal(t, "ffmpeg", config.Variables["ffmpeg"].Values[0].Value) assert.Equal(t, VariablePlatformLinux, config.Variables["ffmpeg"].Values[0].Platform) + + assert.Greater(t, config.BlocklistThreshold, 0) } func TestVariableValidation(t *testing.T) { diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index cafd3f5e..e4fc0d2d 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -7,7 +7,13 @@ import ( ) func (db *DB) migrate() error { - err := db.gormDB.AutoMigrate(&Job{}, &Task{}, &TaskFailure{}, &Worker{}) + err := db.gormDB.AutoMigrate( + &Job{}, + &JobBlock{}, + &Task{}, + &TaskFailure{}, + &Worker{}, + ) if err != nil { return fmt.Errorf("failed to automigrate database: %v", err) } diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go new file mode 100644 index 00000000..8a11959f --- /dev/null +++ b/internal/manager/persistence/jobs_blocklist.go @@ -0,0 +1,92 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "math" + "time" + + "gorm.io/gorm/clause" +) + +// JobBlock keeps track of which Worker is not allowed to run which 
task type on which job. +type JobBlock struct { + // Don't include the standard Gorm UpdatedAt or DeletedAt fields, as they're useless here. + // Entries will never be updated, and should never be soft-deleted but just purged from existence. + ID uint + CreatedAt time.Time + + JobID uint `gorm:"default:0;uniqueIndex:job_worker_tasktype"` + Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"` + + WorkerID uint `gorm:"default:0;uniqueIndex:job_worker_tasktype"` + Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"` + + TaskType string `gorm:"uniqueIndex:job_worker_tasktype"` +} + +// AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler. +func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error { + entry := JobBlock{ + Job: job, + Worker: worker, + TaskType: taskType, + } + tx := db.gormDB.WithContext(ctx). + Clauses(clause.OnConflict{DoNothing: true}). + Create(&entry) + return tx.Error +} + +// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. +// +// NOTE: this does NOT consider the task failure list, which blocks individual +// workers from individual tasks. This is ONLY concerning the job blocklist. +func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) (map[string]bool, error) { + // Find the IDs of the workers blocked on this job + tasktype combo. + blockedWorkers := db.gormDB. + Table("workers as blocked_workers"). + Select("blocked_workers.id"). + Joins("inner join job_blocks JB on blocked_workers.id = JB.worker_id"). + Where("JB.job_id = ?", job.ID). + Where("JB.task_type = ?", taskType) + + // Find the workers NOT blocked. + workers := []*Worker{} + tx := db.gormDB.WithContext(ctx). + Model(&Worker{}). + Select("uuid"). + Where("id not in (?)", blockedWorkers). + Scan(&workers) + if tx.Error != nil { + return nil, tx.Error + } + + // From the list of workers, construct the map of UUIDs. + uuidMap := map[string]bool{} + for _, worker := range workers { + uuidMap[worker.UUID] = true + } + + return uuidMap, nil +} + +// CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type. +func (db *DB) CountTaskFailuresOfWorker(ctx context.Context, job *Job, worker *Worker, taskType string) (int, error) { + var numFailures int64 + + tx := db.gormDB.WithContext(ctx). + Model(&TaskFailure{}). + Joins("inner join tasks T on task_failures.task_id = T.id"). + Where("task_failures.worker_id = ?", worker.ID). + Where("T.job_id = ?", job.ID). + Where("T.type = ?", taskType). + Count(&numFailures) + + if numFailures > math.MaxInt { + panic("overflow error in number of failures") + } + + return int(numFailures), tx.Error +} diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go new file mode 100644 index 00000000..e80cf7e3 --- /dev/null +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -0,0 +1,137 @@ +package persistence + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// SPDX-License-Identifier: GPL-3.0-or-later + +func TestAddWorkerToJobBlocklist(t *testing.T) { + ctx, close, db, job, _ := jobTasksTestFixtures(t) + defer close() + + worker := createWorker(ctx, t, db) + + { + // Add a worker to the block list. 
+ err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender") + assert.NoError(t, err) + + list := []JobBlock{} + tx := db.gormDB.Model(&JobBlock{}).Scan(&list) + assert.NoError(t, tx.Error) + if assert.Len(t, list, 1) { + entry := list[0] + assert.Equal(t, entry.JobID, job.ID) + assert.Equal(t, entry.WorkerID, worker.ID) + assert.Equal(t, entry.TaskType, "blender") + } + } + + { + // Adding the same worker again should be a no-op. + err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender") + assert.NoError(t, err) + + list := []JobBlock{} + tx := db.gormDB.Model(&JobBlock{}).Scan(&list) + assert.NoError(t, tx.Error) + assert.Len(t, list, 1, "No new entry should have been created") + } +} + +func TestWorkersLeftToRun(t *testing.T) { + ctx, close, db, job, _ := jobTasksTestFixtures(t) + defer close() + + // No workers. + left, err := db.WorkersLeftToRun(ctx, job, "blender") + assert.NoError(t, err) + assert.Empty(t, left) + + worker1 := createWorker(ctx, t, db) + worker2 := createWorkerFrom(ctx, t, db, *worker1) + + uuidMap := func(workers ...*Worker) map[string]bool { + theMap := map[string]bool{} + for _, worker := range workers { + theMap[worker.UUID] = true + } + return theMap + } + + // Two workers, no blocklist. + left, err = db.WorkersLeftToRun(ctx, job, "blender") + if assert.NoError(t, err) { + assert.Equal(t, uuidMap(worker1, worker2), left) + } + + // Two workers, one blocked. + _ = db.AddWorkerToJobBlocklist(ctx, job, worker1, "blender") + left, err = db.WorkersLeftToRun(ctx, job, "blender") + if assert.NoError(t, err) { + assert.Equal(t, uuidMap(worker2), left) + } + + // Two workers, both blocked. + _ = db.AddWorkerToJobBlocklist(ctx, job, worker2, "blender") + left, err = db.WorkersLeftToRun(ctx, job, "blender") + assert.NoError(t, err) + assert.Empty(t, left) + + // Two workers, unknown job. + fakeJob := Job{Model: Model{ID: 327}} + left, err = db.WorkersLeftToRun(ctx, &fakeJob, "blender") + if assert.NoError(t, err) { + assert.Equal(t, uuidMap(worker1, worker2), left) + } +} + +func TestCountTaskFailuresOfWorker(t *testing.T) { + ctx, close, db, dbJob, authoredJob := jobTasksTestFixtures(t) + defer close() + + task0, _ := db.FetchTask(ctx, authoredJob.Tasks[0].UUID) + task1, _ := db.FetchTask(ctx, authoredJob.Tasks[1].UUID) + task2, _ := db.FetchTask(ctx, authoredJob.Tasks[2].UUID) + + // Sanity check on the test data. + assert.Equal(t, "blender", task0.Type) + assert.Equal(t, "blender", task1.Type) + assert.Equal(t, "ffmpeg", task2.Type) + + worker1 := createWorker(ctx, t, db) + worker2 := createWorkerFrom(ctx, t, db, *worker1) + + // Store some failures for different tasks + _, _ = db.AddWorkerToTaskFailedList(ctx, task0, worker1) + _, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker1) + _, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker2) + _, _ = db.AddWorkerToTaskFailedList(ctx, task2, worker1) + + // Multiple failures. + numBlender1, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker1, "blender") + if assert.NoError(t, err) { + assert.Equal(t, 2, numBlender1) + } + + // Single failure, but multiple tasks exist of this type. + numBlender2, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker2, "blender") + if assert.NoError(t, err) { + assert.Equal(t, 1, numBlender2) + } + + // Single failure, only one task of this type exists. + numFFMpeg1, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker1, "ffmpeg") + if assert.NoError(t, err) { + assert.Equal(t, 1, numFFMpeg1) + } + + // No failure. 
+ numFFMpeg2, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker2, "ffmpeg") + if assert.NoError(t, err) { + assert.Equal(t, 0, numFFMpeg2) + } +}
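
---

Reviewer note: the blocking decision implemented by `maybeBlocklistWorker` and `failJobAfterCatastroficTaskFailure` in this patch condenses to the short standalone sketch below. The function and parameter names (`blockDecision`, `prevFailures`, `workersLeftAfterBlock`) are illustrative only and do not exist in the patch; the real code obtains the failure count and the remaining workers through `PersistenceService`, and additionally removes workers that already failed this particular task before counting what is left.

```go
package main

import "fmt"

// blockDecision mirrors the logic of maybeBlocklistWorker in this patch:
// the worker is blocklisted once its failure count (including the failure
// currently being handled, which is not yet persisted) reaches the
// configured threshold, and the job is hard-failed when no capable
// workers remain after the block.
//
// Illustrative sketch only; the real implementation queries the
// persistence layer instead of taking plain integers.
func blockDecision(prevFailures, blocklistThreshold, workersLeftAfterBlock int) (blockWorker, failJob bool) {
	numFailures := prevFailures + 1 // count the failure being handled right now
	if numFailures < blocklistThreshold {
		return false, false
	}
	blockWorker = true
	failJob = workersLeftAfterBlock == 0
	return blockWorker, failJob
}

func main() {
	// Third failure of the same worker/job/task-type combination, with one
	// other capable worker remaining: block the worker, keep the job going.
	block, fail := blockDecision(2, 3, 1)
	fmt.Println(block, fail) // true false

	// Third failure and no capable workers left: block the worker and fail
	// the entire job.
	block, fail = blockDecision(2, 3, 0)
	fmt.Println(block, fail) // true true
}
```

With the default `blocklist_threshold` of 3 (see the change to `defaults.go` above), the third failure of the same worker/job/task-type combination blocks the worker from that task type on that job; the job as a whole only fails when that worker was the last one able to run tasks of that type.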