Manager: clear job's blocklist when requeueing the job
Requeueing a job means that the issues that caused workers to get blocked might be resolved, so it should be run with a clean slate.
This commit is contained in:
parent
77516a64fd
commit
6b5f9317cb
@ -58,6 +58,7 @@ type PersistenceService interface {
|
|||||||
AddWorkerToJobBlocklist(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) error
|
AddWorkerToJobBlocklist(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) error
|
||||||
FetchJobBlocklist(ctx context.Context, jobUUID string) ([]persistence.JobBlock, error)
|
FetchJobBlocklist(ctx context.Context, jobUUID string) ([]persistence.JobBlock, error)
|
||||||
RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error
|
RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error
|
||||||
|
ClearJobBlocklist(ctx context.Context, job *persistence.Job) error
|
||||||
|
|
||||||
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.
|
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.
|
||||||
WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error)
|
WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error)
|
||||||
|
@ -148,6 +148,10 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error {
|
|||||||
logger.Error().Err(err).Msg("error clearing failure list")
|
logger.Error().Err(err).Msg("error clearing failure list")
|
||||||
return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the job's tasks' failure list")
|
return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the job's tasks' failure list")
|
||||||
}
|
}
|
||||||
|
if err := f.persist.ClearJobBlocklist(ctx, dbJob); err != nil {
|
||||||
|
logger.Error().Err(err).Msg("error clearing failure list")
|
||||||
|
return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the job's tasks' failure list")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.NoContent(http.StatusNoContent)
|
return e.NoContent(http.StatusNoContent)
|
||||||
|
@ -217,6 +217,7 @@ func TestSetJobStatusFailedToRequeueing(t *testing.T) {
|
|||||||
mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil)
|
mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil)
|
||||||
mf.stateMachine.EXPECT().JobStatusChange(ctx, &dbJob, statusUpdate.Status, "someone pushed a button")
|
mf.stateMachine.EXPECT().JobStatusChange(ctx, &dbJob, statusUpdate.Status, "someone pushed a button")
|
||||||
mf.persistence.EXPECT().ClearFailureListOfJob(ctx, &dbJob)
|
mf.persistence.EXPECT().ClearFailureListOfJob(ctx, &dbJob)
|
||||||
|
mf.persistence.EXPECT().ClearJobBlocklist(ctx, &dbJob)
|
||||||
|
|
||||||
// Do the call.
|
// Do the call.
|
||||||
err := mf.flamenco.SetJobStatus(echoCtx, jobID)
|
err := mf.flamenco.SetJobStatus(echoCtx, jobID)
|
||||||
|
@ -98,6 +98,20 @@ func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfTask(arg0, arg1
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfTask", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfTask), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfTask", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfTask), arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClearJobBlocklist mocks base method.
|
||||||
|
func (m *MockPersistenceService) ClearJobBlocklist(arg0 context.Context, arg1 *persistence.Job) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "ClearJobBlocklist", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearJobBlocklist indicates an expected call of ClearJobBlocklist.
|
||||||
|
func (mr *MockPersistenceServiceMockRecorder) ClearJobBlocklist(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).ClearJobBlocklist), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
// CountTaskFailuresOfWorker mocks base method.
|
// CountTaskFailuresOfWorker mocks base method.
|
||||||
func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) (int, error) {
|
func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) (int, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
@ -51,6 +51,14 @@ func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock
|
|||||||
return entries, tx.Error
|
return entries, tx.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClearJobBlocklist removes the entire blocklist of this job.
|
||||||
|
func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error {
|
||||||
|
tx := db.gormDB.WithContext(ctx).
|
||||||
|
Where("job_id = ?", job.ID).
|
||||||
|
Delete(JobBlock{})
|
||||||
|
return tx.Error
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error {
|
func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error {
|
||||||
// Find the job ID.
|
// Find the job ID.
|
||||||
job := Job{}
|
job := Job{}
|
||||||
|
@ -65,6 +65,27 @@ func TestFetchJobBlocklist(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClearJobBlocklist(t *testing.T) {
|
||||||
|
ctx, close, db, job, _ := jobTasksTestFixtures(t)
|
||||||
|
defer close()
|
||||||
|
|
||||||
|
// Add a worker and some entries to the block list.
|
||||||
|
worker := createWorker(ctx, t, db)
|
||||||
|
err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = db.AddWorkerToJobBlocklist(ctx, job, worker, "ffmpeg")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Clear the blocklist.
|
||||||
|
err = db.ClearJobBlocklist(ctx, job)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Check that it is indeed empty.
|
||||||
|
list, err := db.FetchJobBlocklist(ctx, job.UUID)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, list)
|
||||||
|
}
|
||||||
|
|
||||||
func TestRemoveFromJobBlocklist(t *testing.T) {
|
func TestRemoveFromJobBlocklist(t *testing.T) {
|
||||||
ctx, close, db, job, _ := jobTasksTestFixtures(t)
|
ctx, close, db, job, _ := jobTasksTestFixtures(t)
|
||||||
defer close()
|
defer close()
|
||||||
@ -91,6 +112,7 @@ func TestRemoveFromJobBlocklist(t *testing.T) {
|
|||||||
assert.Equal(t, entry.TaskType, "blender")
|
assert.Equal(t, entry.TaskType, "blender")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWorkersLeftToRun(t *testing.T) {
|
func TestWorkersLeftToRun(t *testing.T) {
|
||||||
ctx, close, db, job, _ := jobTasksTestFixtures(t)
|
ctx, close, db, job, _ := jobTasksTestFixtures(t)
|
||||||
defer close()
|
defer close()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user