diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index f7d5927a..b23eed89 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -58,6 +58,7 @@ type PersistenceService interface { AddWorkerToJobBlocklist(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) error FetchJobBlocklist(ctx context.Context, jobUUID string) ([]persistence.JobBlock, error) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error + ClearJobBlocklist(ctx context.Context, job *persistence.Job) error // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error) diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 7f501d0f..2f382867 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -148,6 +148,10 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error { logger.Error().Err(err).Msg("error clearing failure list") return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the job's tasks' failure list") } + if err := f.persist.ClearJobBlocklist(ctx, dbJob); err != nil { + logger.Error().Err(err).Msg("error clearing failure list") + return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the job's tasks' failure list") + } } return e.NoContent(http.StatusNoContent) diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index ab57101b..a1b7bcdf 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -217,6 +217,7 @@ func TestSetJobStatusFailedToRequeueing(t *testing.T) { mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil) mf.stateMachine.EXPECT().JobStatusChange(ctx, &dbJob, statusUpdate.Status, "someone pushed a button") mf.persistence.EXPECT().ClearFailureListOfJob(ctx, &dbJob) + mf.persistence.EXPECT().ClearJobBlocklist(ctx, &dbJob) // Do the call. err := mf.flamenco.SetJobStatus(echoCtx, jobID) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index a99a69c9..c7dde833 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -98,6 +98,20 @@ func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfTask(arg0, arg1 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfTask", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfTask), arg0, arg1) } +// ClearJobBlocklist mocks base method. +func (m *MockPersistenceService) ClearJobBlocklist(arg0 context.Context, arg1 *persistence.Job) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ClearJobBlocklist", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ClearJobBlocklist indicates an expected call of ClearJobBlocklist. +func (mr *MockPersistenceServiceMockRecorder) ClearJobBlocklist(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).ClearJobBlocklist), arg0, arg1) +} + // CountTaskFailuresOfWorker mocks base method. func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) (int, error) { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index 8c7272d8..1ac69ac3 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -51,6 +51,14 @@ func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock return entries, tx.Error } +// ClearJobBlocklist removes the entire blocklist of this job. +func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error { + tx := db.gormDB.WithContext(ctx). + Where("job_id = ?", job.ID). + Delete(JobBlock{}) + return tx.Error +} + func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error { // Find the job ID. job := Job{} diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index 78ca8a58..a87cb5d1 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -65,6 +65,27 @@ func TestFetchJobBlocklist(t *testing.T) { } } +func TestClearJobBlocklist(t *testing.T) { + ctx, close, db, job, _ := jobTasksTestFixtures(t) + defer close() + + // Add a worker and some entries to the block list. + worker := createWorker(ctx, t, db) + err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender") + assert.NoError(t, err) + err = db.AddWorkerToJobBlocklist(ctx, job, worker, "ffmpeg") + assert.NoError(t, err) + + // Clear the blocklist. + err = db.ClearJobBlocklist(ctx, job) + assert.NoError(t, err) + + // Check that it is indeed empty. + list, err := db.FetchJobBlocklist(ctx, job.UUID) + assert.NoError(t, err) + assert.Empty(t, list) +} + func TestRemoveFromJobBlocklist(t *testing.T) { ctx, close, db, job, _ := jobTasksTestFixtures(t) defer close() @@ -91,6 +112,7 @@ func TestRemoveFromJobBlocklist(t *testing.T) { assert.Equal(t, entry.TaskType, "blender") } } + func TestWorkersLeftToRun(t *testing.T) { ctx, close, db, job, _ := jobTasksTestFixtures(t) defer close()