Manager: clear task failure list on requeueing of jobs & tasks

When a job or task gets requeued from the web interface, its task failure
lists (i.e. the lists of workers that previously failed those tasks) are
cleared.

This clearing doesn't happen in other situations; for example, when a worker
signs off and its task gets requeued, the task's failure list remains as-is.
Sybren A. Stüvel 2022-06-17 11:05:34 +02:00
parent d8be9d95e8
commit 0b5140fc5f
6 changed files with 304 additions and 0 deletions
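For context, "requeueing from the web interface" ends up as a status-change request to the Manager's SetJobStatus handler shown in the diff below. A minimal sketch of such a request follows; the endpoint path, port, and exact status string are assumptions for illustration and are not taken from this commit:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Example job UUID, reused from the tests in this commit.
	jobID := "18a9b096-d77e-438c-9be2-74397038298b"

	// JSON shape follows api.JobStatusChange (status + reason) used below.
	body := bytes.NewBufferString(`{"status": "requeueing", "reason": "someone pushed a button"}`)

	// Assumed endpoint path and default Manager port.
	url := fmt.Sprintf("http://localhost:8080/api/v3/jobs/%s/setstatus", jobID)
	resp, err := http.Post(url, "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler in this commit responds with 204 No Content on success.
	fmt.Println(resp.Status)
}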


@@ -47,6 +47,10 @@ type PersistenceService interface {
	// If no task is available, (nil, nil) is returned, as this is not an error situation.
	ScheduleTask(ctx context.Context, w *persistence.Worker) (*persistence.Task, error)
	AddWorkerToTaskFailedList(context.Context, *persistence.Task, *persistence.Worker) (numFailed int, err error)
	// ClearFailureListOfTask clears the list of workers that failed this task.
	ClearFailureListOfTask(context.Context, *persistence.Task) error
	// ClearFailureListOfJob clears, en masse for all tasks of this job, the list of workers that failed those tasks.
	ClearFailureListOfJob(context.Context, *persistence.Job) error

	// Database queries.
	QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error)


@@ -131,6 +131,18 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error {
		logger.Error().Err(err).Msg("error changing job status")
		return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing job status")
	}

	// Only in this function, i.e. only when changing the job from the web
	// interface, does requeueing the job mean it should clear the failure list.
	// This is why this is implemented here, and not in the Task State Machine.
	switch statusChange.Status {
	case api.JobStatusRequeueing:
		if err := f.persist.ClearFailureListOfJob(ctx, dbJob); err != nil {
			logger.Error().Err(err).Msg("error clearing failure list")
			return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the job's tasks' failure list")
		}
	}

	return e.NoContent(http.StatusNoContent)
}
@@ -177,6 +189,18 @@ func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error {
		logger.Error().Err(err).Msg("error changing task status")
		return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing task status")
	}

	// Only in this function, i.e. only when changing the task from the web
	// interface, does requeueing the task mean it should clear the failure list.
	// This is why this is implemented here, and not in the Task State Machine.
	switch statusChange.Status {
	case api.TaskStatusQueued:
		if err := f.persist.ClearFailureListOfTask(ctx, dbTask); err != nil {
			logger.Error().Err(err).Msg("error clearing failure list")
			return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the task's failure list")
		}
	}

	return e.NoContent(http.StatusNoContent)
}


@@ -181,6 +181,8 @@ func TestSetJobStatus_happy(t *testing.T) {
	mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil)
	mf.stateMachine.EXPECT().JobStatusChange(ctx, &dbJob, statusUpdate.Status, "someone pushed a button")

	// Going to Cancel Requested should NOT clear the failure list.

	// Do the call.
	echoCtx := mf.prepareMockedJSONRequest(statusUpdate)
	err := mf.flamenco.SetJobStatus(echoCtx, jobID)
@@ -189,6 +191,85 @@ func TestSetJobStatus_happy(t *testing.T) {
	assertResponseEmpty(t, echoCtx)
}

func TestSetJobStatusFailedToRequeueing(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	mf := newMockedFlamenco(mockCtrl)

	jobID := "18a9b096-d77e-438c-9be2-74397038298b"
	statusUpdate := api.JobStatusChange{
		Status: api.JobStatusRequeueing,
		Reason: "someone pushed a button",
	}
	dbJob := persistence.Job{
		UUID:     jobID,
		Name:     "test job",
		Status:   api.JobStatusFailed,
		Settings: persistence.StringInterfaceMap{},
		Metadata: persistence.StringStringMap{},
	}

	// Set up expectations.
	echoCtx := mf.prepareMockedJSONRequest(statusUpdate)
	ctx := echoCtx.Request().Context()
	mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil)
	mf.stateMachine.EXPECT().JobStatusChange(ctx, &dbJob, statusUpdate.Status, "someone pushed a button")
	mf.persistence.EXPECT().ClearFailureListOfJob(ctx, &dbJob)

	// Do the call.
	err := mf.flamenco.SetJobStatus(echoCtx, jobID)
	assert.NoError(t, err)

	assertResponseEmpty(t, echoCtx)
}

func TestSetTaskStatusQueued(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	mf := newMockedFlamenco(mockCtrl)

	jobID := "18a9b096-d77e-438c-9be2-74397038298b"
	taskID := "22a2e6e6-13a3-40e7-befd-d4ec8d97049d"
	statusUpdate := api.TaskStatusChange{
		Status: api.TaskStatusQueued,
		Reason: "someone pushed a button",
	}
	dbJob := persistence.Job{
		Model:    persistence.Model{ID: 47},
		UUID:     jobID,
		Name:     "test job",
		Status:   api.JobStatusFailed,
		Settings: persistence.StringInterfaceMap{},
		Metadata: persistence.StringStringMap{},
	}
	dbTask := persistence.Task{
		UUID:   taskID,
		Name:   "test task",
		Status: api.TaskStatusFailed,
		Job:    &dbJob,
		JobID:  dbJob.ID,
	}

	// Set up expectations.
	echoCtx := mf.prepareMockedJSONRequest(statusUpdate)
	ctx := echoCtx.Request().Context()
	mf.persistence.EXPECT().FetchTask(ctx, taskID).Return(&dbTask, nil)
	mf.stateMachine.EXPECT().TaskStatusChange(ctx, &dbTask, statusUpdate.Status)
	mf.persistence.EXPECT().ClearFailureListOfTask(ctx, &dbTask)

	updatedTask := dbTask
	updatedTask.Activity = "someone pushed a button"
	mf.persistence.EXPECT().SaveTaskActivity(ctx, &updatedTask)

	// Do the call.
	err := mf.flamenco.SetTaskStatus(echoCtx, taskID)
	assert.NoError(t, err)

	assertResponseEmpty(t, echoCtx)
}

func TestFetchTaskLogTail(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()


@@ -55,6 +55,34 @@ func (mr *MockPersistenceServiceMockRecorder) AddWorkerToTaskFailedList(arg0, ar
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerToTaskFailedList", reflect.TypeOf((*MockPersistenceService)(nil).AddWorkerToTaskFailedList), arg0, arg1, arg2)
}

// ClearFailureListOfJob mocks base method.
func (m *MockPersistenceService) ClearFailureListOfJob(arg0 context.Context, arg1 *persistence.Job) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "ClearFailureListOfJob", arg0, arg1)
	ret0, _ := ret[0].(error)
	return ret0
}

// ClearFailureListOfJob indicates an expected call of ClearFailureListOfJob.
func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfJob(arg0, arg1 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfJob", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfJob), arg0, arg1)
}

// ClearFailureListOfTask mocks base method.
func (m *MockPersistenceService) ClearFailureListOfTask(arg0 context.Context, arg1 *persistence.Task) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "ClearFailureListOfTask", arg0, arg1)
	ret0, _ := ret[0].(error)
	return ret0
}

// ClearFailureListOfTask indicates an expected call of ClearFailureListOfTask.
func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfTask(arg0, arg1 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfTask", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfTask), arg0, arg1)
}

// CreateWorker mocks base method.
func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error {
	m.ctrl.T.Helper()


@@ -459,3 +459,26 @@ func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker)
	}
	return int(numFailed64), tx.Error
}

// ClearFailureListOfTask clears the list of workers that failed this task.
func (db *DB) ClearFailureListOfTask(ctx context.Context, t *Task) error {
	tx := db.gormDB.WithContext(ctx).
		Where("task_id = ?", t.ID).
		Delete(&TaskFailure{})
	return tx.Error
}

// ClearFailureListOfJob clears, en masse for all tasks of this job, the list of
// workers that failed those tasks.
func (db *DB) ClearFailureListOfJob(ctx context.Context, j *Job) error {
	// SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead.
	jobTasksQuery := db.gormDB.Model(&Task{}).
		Select("id").
		Where("job_id = ?", j.ID)

	tx := db.gormDB.WithContext(ctx).
		Where("task_id in (?)", jobTasksQuery).
		Delete(&TaskFailure{})
	return tx.Error
}
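For readers less familiar with GORM, here is a minimal sketch (not part of this commit) of the SQL that the two deletes above roughly translate to, assuming GORM's default table names "task_failures" and "tasks". It only spells out the sub-query workaround for SQLite's missing JOIN-in-DELETE:

package sketch

import "database/sql"

// clearFailureListOfTaskSQL mirrors ClearFailureListOfTask above.
func clearFailureListOfTaskSQL(db *sql.DB, taskID uint) error {
	_, err := db.Exec(`DELETE FROM task_failures WHERE task_id = ?`, taskID)
	return err
}

// clearFailureListOfJobSQL mirrors ClearFailureListOfJob above: the job's
// tasks are matched via a sub-select instead of a JOIN.
func clearFailureListOfJobSQL(db *sql.DB, jobID uint) error {
	_, err := db.Exec(
		`DELETE FROM task_failures WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`,
		jobID)
	return err
}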


@@ -4,6 +4,8 @@ package persistence
// SPDX-License-Identifier: GPL-3.0-or-later

import (
	"fmt"
	"math"
	"testing"
	"time"
@@ -11,6 +13,7 @@ import (
	"golang.org/x/net/context"

	"git.blender.org/flamenco/internal/manager/job_compilers"
	"git.blender.org/flamenco/internal/uuid"
	"git.blender.org/flamenco/pkg/api"
)
@@ -304,6 +307,78 @@ func TestAddWorkerToTaskFailedList(t *testing.T) {
	assert.Zero(t, num)
}

func TestClearFailureListOfTask(t *testing.T) {
	ctx, close, db, _, authoredJob := jobTasksTestFixtures(t)
	defer close()

	task1, _ := db.FetchTask(ctx, authoredJob.Tasks[1].UUID)
	task2, _ := db.FetchTask(ctx, authoredJob.Tasks[2].UUID)

	worker1 := createWorker(ctx, t, db)

	// Create another worker, using the 1st as template:
	newWorker := *worker1
	newWorker.ID = 0
	newWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85"
	newWorker.Name = "Worker 2"
	assert.NoError(t, db.SaveWorker(ctx, &newWorker))
	worker2, err := db.FetchWorker(ctx, newWorker.UUID)
	assert.NoError(t, err)

	// Store some failures for different tasks.
	_, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker1)
	_, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker2)
	_, _ = db.AddWorkerToTaskFailedList(ctx, task2, worker1)

	// Clearing should just update this one task.
	assert.NoError(t, db.ClearFailureListOfTask(ctx, task1))

	var failures = []TaskFailure{}
	tx := db.gormDB.Model(&TaskFailure{}).Scan(&failures)
	assert.NoError(t, tx.Error)
	if assert.Len(t, failures, 1) {
		assert.Equal(t, task2.ID, failures[0].TaskID)
		assert.Equal(t, worker1.ID, failures[0].WorkerID)
	}
}

func TestClearFailureListOfJob(t *testing.T) {
	ctx, close, db, dbJob1, authoredJob1 := jobTasksTestFixtures(t)
	defer close()

	// Construct a cloned version of the job.
	authoredJob2 := duplicateJobAndTasks(authoredJob1)
	persistAuthoredJob(t, ctx, db, authoredJob2)

	task1_1, _ := db.FetchTask(ctx, authoredJob1.Tasks[1].UUID)
	task1_2, _ := db.FetchTask(ctx, authoredJob1.Tasks[2].UUID)
	task2_1, _ := db.FetchTask(ctx, authoredJob2.Tasks[1].UUID)

	worker1 := createWorker(ctx, t, db)
	worker2 := createWorkerFrom(ctx, t, db, *worker1)

	// Store some failures for different tasks and jobs.
	_, _ = db.AddWorkerToTaskFailedList(ctx, task1_1, worker1)
	_, _ = db.AddWorkerToTaskFailedList(ctx, task1_1, worker2)
	_, _ = db.AddWorkerToTaskFailedList(ctx, task1_2, worker1)
	_, _ = db.AddWorkerToTaskFailedList(ctx, task2_1, worker1)
	_, _ = db.AddWorkerToTaskFailedList(ctx, task2_1, worker2)

	// Sanity check: there should be 5 failures registered now.
	assert.Equal(t, 5, countTaskFailures(db))

	// Clearing should be limited to the given job.
	assert.NoError(t, db.ClearFailureListOfJob(ctx, dbJob1))

	var failures = []TaskFailure{}
	tx := db.gormDB.Model(&TaskFailure{}).Scan(&failures)
	assert.NoError(t, tx.Error)
	if assert.Len(t, failures, 2) {
		assert.Equal(t, task2_1.ID, failures[0].TaskID)
		assert.Equal(t, worker1.ID, failures[0].WorkerID)
		assert.Equal(t, task2_1.ID, failures[1].TaskID)
		assert.Equal(t, worker2.ID, failures[1].WorkerID)
	}
}

func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
	task1 := job_compilers.AuthoredTask{
		Name: "render-1-3",
@@ -384,6 +459,43 @@ func persistAuthoredJob(t *testing.T, ctx context.Context, db *DB, authoredJob j
	return dbJob
}

// duplicateJobAndTasks constructs a copy of the given job and its tasks, ensuring new UUIDs.
// It does NOT copy settings, metadata, or commands; it exists just for testing with more than one job in the database.
func duplicateJobAndTasks(job job_compilers.AuthoredJob) job_compilers.AuthoredJob {
	// The call itself already copied the AuthoredJob, as it is passed by value.
	// This function just needs to make sure the tasks are duplicated, make UUIDs
	// unique, and ensure that task pointers point to the copies.

	// Duplicate the task array.
	tasks := job.Tasks
	job.Tasks = []job_compilers.AuthoredTask{}
	job.Tasks = append(job.Tasks, tasks...)

	// Construct a mapping from old UUID to pointer-to-new-task.
	taskPtrs := map[string]*job_compilers.AuthoredTask{}
	for idx := range job.Tasks {
		taskPtrs[job.Tasks[idx].UUID] = &job.Tasks[idx]
	}

	// Go over all task dependencies, as those are stored as pointers, and update them.
	for taskIdx := range job.Tasks {
		newDeps := make([]*job_compilers.AuthoredTask, len(job.Tasks[taskIdx].Dependencies))
		for depIdxs, oldTaskPtr := range job.Tasks[taskIdx].Dependencies {
			depUUID := oldTaskPtr.UUID
			newDeps[depIdxs] = taskPtrs[depUUID]
		}
		job.Tasks[taskIdx].Dependencies = newDeps
	}

	// Assign new UUIDs to the job & tasks.
	job.JobID = uuid.New()
	for idx := range job.Tasks {
		job.Tasks[idx].UUID = uuid.New()
	}

	return job
}

func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) {
	ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
@@ -420,3 +532,35 @@ func createWorker(ctx context.Context, t *testing.T, db *DB) *Worker {
	return fetchedWorker
}

// createWorkerFrom duplicates the given worker, ensuring a new UUID.
func createWorkerFrom(ctx context.Context, t *testing.T, db *DB, worker Worker) *Worker {
	worker.ID = 0
	worker.UUID = uuid.New()
	worker.Name += " (copy)"

	err := db.SaveWorker(ctx, &worker)
	if !assert.NoError(t, err) {
		t.FailNow()
	}

	dbWorker, err := db.FetchWorker(ctx, worker.UUID)
	if !assert.NoError(t, err) {
		t.FailNow()
	}

	return dbWorker
}

func countTaskFailures(db *DB) int {
	var numFailures int64
	tx := db.gormDB.Model(&TaskFailure{}).Count(&numFailures)
	if tx.Error != nil {
		panic(tx.Error)
	}
	if numFailures > math.MaxInt {
		panic(fmt.Sprintf("too many failures: %v", numFailures))
	}
	return int(numFailures)
}