From 64512c81bae04bd64d923a639d342c595efd0130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 27 Jun 2022 11:25:18 +0200 Subject: [PATCH] Manager: implement OAPI operations to fetch blocklist & delete items --- internal/manager/api_impl/interfaces.go | 3 + internal/manager/api_impl/jobs.go | 62 +++++++++++++++++++ .../api_impl/mocks/api_impl_mock.gen.go | 29 +++++++++ .../manager/persistence/jobs_blocklist.go | 42 +++++++++++++ .../persistence/jobs_blocklist_test.go | 49 +++++++++++++++ 5 files changed, 185 insertions(+) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 3448515a..42ffd87e 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -56,6 +56,9 @@ type PersistenceService interface { // AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler. AddWorkerToJobBlocklist(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) error + FetchJobBlocklist(ctx context.Context, jobUUID string) ([]persistence.JobBlock, error) + RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error + // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error) // CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type. diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 71eed734..98e294c2 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -243,6 +243,68 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error { return e.String(http.StatusOK, tail) } +func (f *Flamenco) FetchJobBlocklist(e echo.Context, jobID string) error { + if !uuid.IsValid(jobID) { + return sendAPIError(e, http.StatusBadRequest, "job ID should be a UUID") + } + + logger := requestLogger(e).With().Str("job", jobID).Logger() + ctx := e.Request().Context() + + list, err := f.persist.FetchJobBlocklist(ctx, jobID) + if err != nil { + logger.Error().Err(err).Msg("error fetching job blocklist") + return sendAPIError(e, http.StatusInternalServerError, "error fetching job blocklist: %v", err) + } + + apiList := api.JobBlocklist{} + for _, item := range list { + apiList = append(apiList, api.JobBlocklistEntry{ + TaskType: item.TaskType, + WorkerId: item.Worker.UUID, + }) + } + + return e.JSON(http.StatusOK, apiList) +} + +func (f *Flamenco) RemoveJobBlocklist(e echo.Context, jobID string) error { + if !uuid.IsValid(jobID) { + return sendAPIError(e, http.StatusBadRequest, "job ID should be a UUID") + } + + logger := requestLogger(e).With().Str("job", jobID).Logger() + ctx := e.Request().Context() + + var job api.RemoveJobBlocklistJSONRequestBody + if err := e.Bind(&job); err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + + var lastErr error + for _, entry := range job { + sublogger := logger.With(). + Str("worker", entry.WorkerId). + Str("taskType", entry.TaskType). + Logger() + err := f.persist.RemoveFromJobBlocklist(ctx, jobID, entry.WorkerId, entry.TaskType) + if err != nil { + sublogger.Error().Err(err).Msg("error removing entry from job blocklist") + lastErr = err + continue + } + sublogger.Info().Msg("removed entry from job blocklist") + } + + if lastErr != nil { + return sendAPIError(e, http.StatusInternalServerError, + "error removing at least one entry from the blocklist: %v", lastErr) + } + + return e.NoContent(http.StatusNoContent) +} + func jobDBtoAPI(dbJob *persistence.Job) api.Job { apiJob := api.Job{ SubmittedJob: api.SubmittedJob{ diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index d6215974..79889d73 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -142,6 +142,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJob(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchJob), arg0, arg1) } +// FetchJobBlocklist mocks base method. +func (m *MockPersistenceService) FetchJobBlocklist(arg0 context.Context, arg1 string) ([]persistence.JobBlock, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchJobBlocklist", arg0, arg1) + ret0, _ := ret[0].([]persistence.JobBlock) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchJobBlocklist indicates an expected call of FetchJobBlocklist. +func (mr *MockPersistenceServiceMockRecorder) FetchJobBlocklist(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobBlocklist), arg0, arg1) +} + // FetchTask mocks base method. func (m *MockPersistenceService) FetchTask(arg0 context.Context, arg1 string) (*persistence.Task, error) { m.ctrl.T.Helper() @@ -232,6 +247,20 @@ func (mr *MockPersistenceServiceMockRecorder) QueryJobs(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryJobs", reflect.TypeOf((*MockPersistenceService)(nil).QueryJobs), arg0, arg1) } +// RemoveFromJobBlocklist mocks base method. +func (m *MockPersistenceService) RemoveFromJobBlocklist(arg0 context.Context, arg1, arg2, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveFromJobBlocklist", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveFromJobBlocklist indicates an expected call of RemoveFromJobBlocklist. +func (mr *MockPersistenceServiceMockRecorder) RemoveFromJobBlocklist(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveFromJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).RemoveFromJobBlocklist), arg0, arg1, arg2, arg3) +} + // SaveTask mocks base method. func (m *MockPersistenceService) SaveTask(arg0 context.Context, arg1 *persistence.Task) error { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index 8a11959f..8c7272d8 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -39,6 +39,48 @@ func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Wor return tx.Error } +func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) { + entries := []JobBlock{} + + tx := db.gormDB.WithContext(ctx). + Model(JobBlock{}). + Joins("inner join jobs on jobs.id = job_blocks.job_id"). + Joins("Worker"). + Where("jobs.uuid = ?", jobUUID). + Scan(&entries) + return entries, tx.Error +} + +func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error { + // Find the job ID. + job := Job{} + tx := db.gormDB.WithContext(ctx). + Select("id"). + Where("uuid = ?", jobUUID). + Find(&job) + if tx.Error != nil { + return jobError(tx.Error, "fetching job with uuid=%q", jobUUID) + } + + // Find the worker ID. + worker := Worker{} + tx = db.gormDB.WithContext(ctx). + Select("id"). + Where("uuid = ?", workerUUID). + Find(&worker) + if tx.Error != nil { + return workerError(tx.Error, "fetching worker with uuid=%q", workerUUID) + } + + // Remove the blocklist entry. + tx = db.gormDB.WithContext(ctx). + Where("job_id = ?", job.ID). + Where("worker_id = ?", worker.ID). + Where("task_type = ?", taskType). + Delete(JobBlock{}) + return tx.Error +} + // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. // // NOTE: this does NOT consider the task failure list, which blocks individual diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index e80cf7e3..78ca8a58 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -42,6 +42,55 @@ func TestAddWorkerToJobBlocklist(t *testing.T) { } } +func TestFetchJobBlocklist(t *testing.T) { + ctx, close, db, job, _ := jobTasksTestFixtures(t) + defer close() + + // Add a worker to the block list. + worker := createWorker(ctx, t, db) + err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender") + assert.NoError(t, err) + + list, err := db.FetchJobBlocklist(ctx, job.UUID) + assert.NoError(t, err) + + if assert.Len(t, list, 1) { + entry := list[0] + assert.Equal(t, entry.JobID, job.ID) + assert.Equal(t, entry.WorkerID, worker.ID) + assert.Equal(t, entry.TaskType, "blender") + + assert.Nil(t, entry.Job, "should NOT fetch the entire job") + assert.NotNil(t, entry.Worker, "SHOULD fetch the entire worker") + } +} + +func TestRemoveFromJobBlocklist(t *testing.T) { + ctx, close, db, job, _ := jobTasksTestFixtures(t) + defer close() + + // Add a worker and some entries to the block list. + worker := createWorker(ctx, t, db) + err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender") + assert.NoError(t, err) + err = db.AddWorkerToJobBlocklist(ctx, job, worker, "ffmpeg") + assert.NoError(t, err) + + // Remove an entry. + err = db.RemoveFromJobBlocklist(ctx, job.UUID, worker.UUID, "ffmpeg") + assert.NoError(t, err) + + // Check that the other entry is still there. + list, err := db.FetchJobBlocklist(ctx, job.UUID) + assert.NoError(t, err) + + if assert.Len(t, list, 1) { + entry := list[0] + assert.Equal(t, entry.JobID, job.ID) + assert.Equal(t, entry.WorkerID, worker.ID) + assert.Equal(t, entry.TaskType, "blender") + } +} func TestWorkersLeftToRun(t *testing.T) { ctx, close, db, job, _ := jobTasksTestFixtures(t) defer close()