diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 2b13d82d..63a999ba 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -75,6 +75,7 @@ type TaskStateMachine interface { JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, reason string) error RequeueActiveTasksOfWorker(ctx context.Context, worker *persistence.Worker, reason string) error + RequeueFailedTasksOfWorkerOfJob(ctx context.Context, worker *persistence.Worker, job *persistence.Job, reason string) error } // TaskStateMachine should be a subset of task_state_machine.StateMachine. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index cabc3193..9f84d763 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -679,6 +679,20 @@ func (mr *MockTaskStateMachineMockRecorder) RequeueActiveTasksOfWorker(arg0, arg return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueActiveTasksOfWorker", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueActiveTasksOfWorker), arg0, arg1, arg2) } +// RequeueFailedTasksOfWorkerOfJob mocks base method. +func (m *MockTaskStateMachine) RequeueFailedTasksOfWorkerOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 *persistence.Job, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequeueFailedTasksOfWorkerOfJob", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// RequeueFailedTasksOfWorkerOfJob indicates an expected call of RequeueFailedTasksOfWorkerOfJob. +func (mr *MockTaskStateMachineMockRecorder) RequeueFailedTasksOfWorkerOfJob(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueFailedTasksOfWorkerOfJob", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueFailedTasksOfWorkerOfJob), arg0, arg1, arg2, arg3) +} + // TaskStatusChange mocks base method. func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/test-flamenco-manager.yaml b/internal/manager/api_impl/test-flamenco-manager.yaml index 65a596d6..e37ce875 100644 --- a/internal/manager/api_impl/test-flamenco-manager.yaml +++ b/internal/manager/api_impl/test-flamenco-manager.yaml @@ -2,17 +2,17 @@ _meta: version: 3 mode: develop -listen: '[::0]:8083' +listen: "[::0]:8083" own_url: http://192.168.3.108:8083/ flamenco: http://localhost:51234/ manager_id: 5852bc5198377351f95d103e manager_secret: SRVwA7wAxPRfudvqTDOLXwPn1cDRIlADz5Ef9kHk7d52Us task_logs_path: /tmp/flamenco-unittests -blacklist_threshold: 3 +blocklist_threshold: 3 shaman: - enabled: false + enabled: false variables: blender: @@ -25,7 +25,7 @@ variables: platform: linux value: /opt/myblenderbuild/blender - platform: windows - value: 'c:/temp/blender.exe' + value: "c:/temp/blender.exe" - platform: darwin value: /opt/myblenderbuild/blender ffmpeg: diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index 4c280c89..1977baa5 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -161,15 +161,22 @@ func (f *Flamenco) onTaskFailed( } logger = logger.With().Str("taskType", task.Type).Logger() - shouldHardFail, err := f.maybeBlocklistWorker(ctx, logger, worker, task) + wasBlacklisted, shoudlFailJob, err := f.maybeBlocklistWorker(ctx, logger, worker, task) if err != nil { return fmt.Errorf("block-listing worker: %w", err) } - if shouldHardFail { - // Hard failure because of blocklisting should simply fail the entire job. - // There are no more workers left to finish it. + if shoudlFailJob { + // There are no more workers left to finish the job. return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task) } + if wasBlacklisted { + // Requeue all tasks of this job & task type that were hard-failed before by this worker. + reason := fmt.Sprintf("worker %s was blocked from tasks of type %q", worker.Name, task.Type) + err := f.stateMachine.RequeueFailedTasksOfWorkerOfJob(ctx, worker, task.Job, reason) + if err != nil { + return err + } + } // Determine whether this is soft or hard failure. threshold := f.config.Get().TaskFailAfterSoftFailCount @@ -186,22 +193,18 @@ func (f *Flamenco) onTaskFailed( // maybeBlocklistWorker potentially block-lists the Worker, and checks whether // there are any workers left to run tasks of this type. // -// Returns "must hard-fail". That is, returns `false` if there are still workers -// left to run tasks of this type, on this job. -// -// If the worker is NOT block-listed at this moment, always returns `false`. -// -// Returns `true` if ALL workers that can execute this task type are blocked -// from working on this job. +// Returns whether the worker was blacklisted, and whether the entire job should +// be failed (in case this was the last worker that could have worked on this +// task). func (f *Flamenco) maybeBlocklistWorker( ctx context.Context, logger zerolog.Logger, worker *persistence.Worker, task *persistence.Task, -) (bool, error) { +) (wasBlacklisted, shouldFailJob bool, err error) { numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type) if err != nil { - return false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err) + return false, false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err) } // The received task update hasn't been persisted in the database yet, // so we should count that too. @@ -213,17 +216,17 @@ func (f *Flamenco) maybeBlocklistWorker( // TODO: This might need special handling, as this worker will be blocked // from retrying this particular task. It could have been the last worker to // be allowed this task type; if that is the case, the job is now stuck. - return false, nil + return false, false, nil } // Blocklist the Worker. if err := f.blocklistWorker(ctx, logger, worker, task); err != nil { - return false, err + return true, false, err } // Return hard-failure if there are no workers left for this task type. numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task) - return numWorkers == 0, err + return true, numWorkers == 0, err } func (f *Flamenco) blocklistWorker( @@ -239,8 +242,6 @@ func (f *Flamenco) blocklistWorker( if err != nil { return fmt.Errorf("adding worker to block list: %w", err) } - - // TODO: requeue all tasks of this job & task type that were hard-failed by this worker. return nil } diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 04f0fdf9..0da77621 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -590,6 +590,12 @@ func TestBlockingAfterFailure(t *testing.T) { mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, "Task failed by 1 worker, Manager will mark it as soft failure. 2 more failures will cause hard failure.") + // Because the job didn't fail in its entirety, the tasks previously failed + // by the Worker should be requeued so they can be picked up by another. + mf.stateMachine.EXPECT().RequeueFailedTasksOfWorkerOfJob( + gomock.Any(), &worker, &mockJob, + "worker дрон was blocked from tasks of type \"misc\"") + // Do the call. echoCtx := mf.prepareMockedJSONRequest(taskUpdate) requestWorkerStore(echoCtx, &worker) @@ -619,6 +625,8 @@ func TestBlockingAfterFailure(t *testing.T) { mf.stateMachine.EXPECT(). JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"") + // Because the job failed, there is no need to re-queue any tasks previously failed by this worker. + // Do the call. echoCtx := mf.prepareMockedJSONRequest(taskUpdate) requestWorkerStore(echoCtx, &worker) @@ -652,6 +660,8 @@ func TestBlockingAfterFailure(t *testing.T) { mf.stateMachine.EXPECT(). JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"") + // Because the job failed, there is no need to re-queue any tasks previously failed by this worker. + // Do the call. echoCtx := mf.prepareMockedJSONRequest(taskUpdate) requestWorkerStore(echoCtx, &worker) @@ -743,5 +753,4 @@ func TestMayWorkerRun(t *testing.T) { StatusChangeRequested: true, }) } - } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index c2f2506b..3ff44c0b 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -287,6 +287,21 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta return result, nil } +func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worker, taskStatus api.TaskStatus, job *Job) ([]*Task, error) { + result := []*Task{} + tx := db.gormDB.WithContext(ctx). + Model(&Task{}). + Joins("Job"). + Where("tasks.worker_id = ?", worker.ID). + Where("tasks.status = ?", taskStatus). + Where("job.id = ?", job.ID). + Scan(&result) + if tx.Error != nil { + return nil, taskError(tx.Error, "finding tasks of worker %s in status %q and job %s", worker.UUID, taskStatus, job.UUID) + } + return result, nil +} + func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) { var numTasksInStatus int64 tx := db.gormDB.WithContext(ctx). diff --git a/internal/manager/task_state_machine/interfaces.go b/internal/manager/task_state_machine/interfaces.go index 6a6f25f6..39d1a954 100644 --- a/internal/manager/task_state_machine/interfaces.go +++ b/internal/manager/task_state_machine/interfaces.go @@ -34,6 +34,7 @@ type PersistenceService interface { FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error) FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error) + FetchTasksOfWorkerInStatusOfJob(context.Context, *persistence.Worker, api.TaskStatus, *persistence.Job) ([]*persistence.Task, error) } // PersistenceService should be a subset of persistence.DB diff --git a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go index 90eeddfd..92503fe4 100644 --- a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go +++ b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go @@ -93,6 +93,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatus), arg0, arg1, arg2) } +// FetchTasksOfWorkerInStatusOfJob mocks base method. +func (m *MockPersistenceService) FetchTasksOfWorkerInStatusOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus, arg3 *persistence.Job) ([]*persistence.Task, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatusOfJob", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].([]*persistence.Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTasksOfWorkerInStatusOfJob indicates an expected call of FetchTasksOfWorkerInStatusOfJob. +func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatusOfJob(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatusOfJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatusOfJob), arg0, arg1, arg2, arg3) +} + // JobHasTasksInStatus mocks base method. func (m *MockPersistenceService) JobHasTasksInStatus(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus) (bool, error) { m.ctrl.T.Helper() diff --git a/internal/manager/task_state_machine/worker_requeue.go b/internal/manager/task_state_machine/worker_requeue.go index 552f2ddf..00101e24 100644 --- a/internal/manager/task_state_machine/worker_requeue.go +++ b/internal/manager/task_state_machine/worker_requeue.go @@ -4,7 +4,6 @@ package task_state_machine import ( "context" - "fmt" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/pkg/api" @@ -18,17 +17,47 @@ func (sm *StateMachine) RequeueActiveTasksOfWorker( ctx context.Context, worker *persistence.Worker, reason string, +) error { + // Fetch the tasks to update. + tasks, err := sm.persist.FetchTasksOfWorkerInStatus( + ctx, worker, api.TaskStatusActive) + if err != nil { + return err + } + + return sm.requeueTasksOfWorker(ctx, tasks, worker, reason) +} + +// RequeueFailedTasksOfWorkerOfJob re-queues all failed tasks of this worker on this job. +// +// `reason`: a string that can be appended to text like "Task requeued because " +func (sm *StateMachine) RequeueFailedTasksOfWorkerOfJob( + ctx context.Context, + worker *persistence.Worker, + job *persistence.Job, + reason string, +) error { + // Fetch the tasks to update. + tasks, err := sm.persist.FetchTasksOfWorkerInStatusOfJob( + ctx, worker, api.TaskStatusFailed, job) + if err != nil { + return err + } + + return sm.requeueTasksOfWorker(ctx, tasks, worker, reason) +} + +func (sm *StateMachine) requeueTasksOfWorker( + ctx context.Context, + tasks []*persistence.Task, + worker *persistence.Worker, + reason string, ) error { logger := log.With(). Str("worker", worker.UUID). + Str("reason", reason). Logger() - // Fetch the tasks to update. - tasks, err := sm.persist.FetchTasksOfWorkerInStatus(ctx, worker, api.TaskStatusActive) - if err != nil { - return fmt.Errorf("fetching tasks of worker %s in status %q: %w", worker.UUID, api.TaskStatusActive, err) - } - // Run each task change through the task state machine. var lastErr error for _, task := range tasks {