Manager: implement OAPI operations to fetch blocklist & delete items

This commit is contained in:
Sybren A. Stüvel 2022-06-27 11:25:18 +02:00
parent 1353d1df0f
commit 64512c81ba
5 changed files with 185 additions and 0 deletions

View File

@ -56,6 +56,9 @@ type PersistenceService interface {
// AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler.
AddWorkerToJobBlocklist(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) error
FetchJobBlocklist(ctx context.Context, jobUUID string) ([]persistence.JobBlock, error)
RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.
WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error)
// CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type.

View File

@ -243,6 +243,68 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error {
return e.String(http.StatusOK, tail)
}
func (f *Flamenco) FetchJobBlocklist(e echo.Context, jobID string) error {
if !uuid.IsValid(jobID) {
return sendAPIError(e, http.StatusBadRequest, "job ID should be a UUID")
}
logger := requestLogger(e).With().Str("job", jobID).Logger()
ctx := e.Request().Context()
list, err := f.persist.FetchJobBlocklist(ctx, jobID)
if err != nil {
logger.Error().Err(err).Msg("error fetching job blocklist")
return sendAPIError(e, http.StatusInternalServerError, "error fetching job blocklist: %v", err)
}
apiList := api.JobBlocklist{}
for _, item := range list {
apiList = append(apiList, api.JobBlocklistEntry{
TaskType: item.TaskType,
WorkerId: item.Worker.UUID,
})
}
return e.JSON(http.StatusOK, apiList)
}
func (f *Flamenco) RemoveJobBlocklist(e echo.Context, jobID string) error {
if !uuid.IsValid(jobID) {
return sendAPIError(e, http.StatusBadRequest, "job ID should be a UUID")
}
logger := requestLogger(e).With().Str("job", jobID).Logger()
ctx := e.Request().Context()
var job api.RemoveJobBlocklistJSONRequestBody
if err := e.Bind(&job); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
var lastErr error
for _, entry := range job {
sublogger := logger.With().
Str("worker", entry.WorkerId).
Str("taskType", entry.TaskType).
Logger()
err := f.persist.RemoveFromJobBlocklist(ctx, jobID, entry.WorkerId, entry.TaskType)
if err != nil {
sublogger.Error().Err(err).Msg("error removing entry from job blocklist")
lastErr = err
continue
}
sublogger.Info().Msg("removed entry from job blocklist")
}
if lastErr != nil {
return sendAPIError(e, http.StatusInternalServerError,
"error removing at least one entry from the blocklist: %v", lastErr)
}
return e.NoContent(http.StatusNoContent)
}
func jobDBtoAPI(dbJob *persistence.Job) api.Job {
apiJob := api.Job{
SubmittedJob: api.SubmittedJob{

View File

@ -142,6 +142,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJob(arg0, arg1 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchJob), arg0, arg1)
}
// FetchJobBlocklist mocks base method.
func (m *MockPersistenceService) FetchJobBlocklist(arg0 context.Context, arg1 string) ([]persistence.JobBlock, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchJobBlocklist", arg0, arg1)
ret0, _ := ret[0].([]persistence.JobBlock)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchJobBlocklist indicates an expected call of FetchJobBlocklist.
func (mr *MockPersistenceServiceMockRecorder) FetchJobBlocklist(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobBlocklist), arg0, arg1)
}
// FetchTask mocks base method.
func (m *MockPersistenceService) FetchTask(arg0 context.Context, arg1 string) (*persistence.Task, error) {
m.ctrl.T.Helper()
@ -232,6 +247,20 @@ func (mr *MockPersistenceServiceMockRecorder) QueryJobs(arg0, arg1 interface{})
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryJobs", reflect.TypeOf((*MockPersistenceService)(nil).QueryJobs), arg0, arg1)
}
// RemoveFromJobBlocklist mocks base method.
func (m *MockPersistenceService) RemoveFromJobBlocklist(arg0 context.Context, arg1, arg2, arg3 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RemoveFromJobBlocklist", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(error)
return ret0
}
// RemoveFromJobBlocklist indicates an expected call of RemoveFromJobBlocklist.
func (mr *MockPersistenceServiceMockRecorder) RemoveFromJobBlocklist(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveFromJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).RemoveFromJobBlocklist), arg0, arg1, arg2, arg3)
}
// SaveTask mocks base method.
func (m *MockPersistenceService) SaveTask(arg0 context.Context, arg1 *persistence.Task) error {
m.ctrl.T.Helper()

View File

@ -39,6 +39,48 @@ func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Wor
return tx.Error
}
func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) {
entries := []JobBlock{}
tx := db.gormDB.WithContext(ctx).
Model(JobBlock{}).
Joins("inner join jobs on jobs.id = job_blocks.job_id").
Joins("Worker").
Where("jobs.uuid = ?", jobUUID).
Scan(&entries)
return entries, tx.Error
}
func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error {
// Find the job ID.
job := Job{}
tx := db.gormDB.WithContext(ctx).
Select("id").
Where("uuid = ?", jobUUID).
Find(&job)
if tx.Error != nil {
return jobError(tx.Error, "fetching job with uuid=%q", jobUUID)
}
// Find the worker ID.
worker := Worker{}
tx = db.gormDB.WithContext(ctx).
Select("id").
Where("uuid = ?", workerUUID).
Find(&worker)
if tx.Error != nil {
return workerError(tx.Error, "fetching worker with uuid=%q", workerUUID)
}
// Remove the blocklist entry.
tx = db.gormDB.WithContext(ctx).
Where("job_id = ?", job.ID).
Where("worker_id = ?", worker.ID).
Where("task_type = ?", taskType).
Delete(JobBlock{})
return tx.Error
}
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.
//
// NOTE: this does NOT consider the task failure list, which blocks individual

View File

@ -42,6 +42,55 @@ func TestAddWorkerToJobBlocklist(t *testing.T) {
}
}
func TestFetchJobBlocklist(t *testing.T) {
ctx, close, db, job, _ := jobTasksTestFixtures(t)
defer close()
// Add a worker to the block list.
worker := createWorker(ctx, t, db)
err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender")
assert.NoError(t, err)
list, err := db.FetchJobBlocklist(ctx, job.UUID)
assert.NoError(t, err)
if assert.Len(t, list, 1) {
entry := list[0]
assert.Equal(t, entry.JobID, job.ID)
assert.Equal(t, entry.WorkerID, worker.ID)
assert.Equal(t, entry.TaskType, "blender")
assert.Nil(t, entry.Job, "should NOT fetch the entire job")
assert.NotNil(t, entry.Worker, "SHOULD fetch the entire worker")
}
}
func TestRemoveFromJobBlocklist(t *testing.T) {
ctx, close, db, job, _ := jobTasksTestFixtures(t)
defer close()
// Add a worker and some entries to the block list.
worker := createWorker(ctx, t, db)
err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender")
assert.NoError(t, err)
err = db.AddWorkerToJobBlocklist(ctx, job, worker, "ffmpeg")
assert.NoError(t, err)
// Remove an entry.
err = db.RemoveFromJobBlocklist(ctx, job.UUID, worker.UUID, "ffmpeg")
assert.NoError(t, err)
// Check that the other entry is still there.
list, err := db.FetchJobBlocklist(ctx, job.UUID)
assert.NoError(t, err)
if assert.Len(t, list, 1) {
entry := list[0]
assert.Equal(t, entry.JobID, job.ID)
assert.Equal(t, entry.WorkerID, worker.ID)
assert.Equal(t, entry.TaskType, "blender")
}
}
func TestWorkersLeftToRun(t *testing.T) {
ctx, close, db, job, _ := jobTasksTestFixtures(t)
defer close()