diff --git a/FEATURES.md b/FEATURES.md index 8e8183ec..7ee28a9e 100644 --- a/FEATURES.md +++ b/FEATURES.md @@ -46,6 +46,9 @@ Note that list is **not** in any specific order. Example: jobs in statuses `cancel-requested`, `requeueing`, etc. - [x] Task timeout monitoring - [ ] Worker blocklisting & failed task requeueing + - [x] Keep track of which worker failed which task. + - [ ] Clear task failure list when the task gets (re)queued. + - [ ] Keep track of a blocklist as `(worker ID, job ID, task type)` tuple in the database. - [x] Worker timeout monitoring - [ ] Last rendered image display diff --git a/go.mod b/go.mod index 1079e1f2..fc525020 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect + github.com/gertd/go-pluralize v0.2.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/swag v0.19.5 // indirect diff --git a/go.sum b/go.sum index c2fb6574..de6c7293 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e h1:cG4ivpkHpkmWTaaLrgekDVR0xAr87V697T2c+WnUdiY= github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e/go.mod h1:7xQpS/YtlWo38XfIqje9GgtlPuBRatYcL23GlYBtgWM= +github.com/gertd/go-pluralize v0.2.1 h1:M3uASbVjMnTsPb0PNqg+E/24Vwigyo/tvyMTtAlLgiA= +github.com/gertd/go-pluralize v0.2.1/go.mod h1:rbYaKDbsXxmRfr8uygAEKhOWsjyrrqrkHVpZvoOp8zk= github.com/getkin/kin-openapi v0.80.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg= github.com/getkin/kin-openapi v0.88.0 h1:BjJ2JERWJbYE1o1RGEj/5LmR5qw7ecfl3O3su4ImR+0= github.com/getkin/kin-openapi v0.88.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg= diff 
--git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 11f9378b..3fb2746d 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -13,6 +13,7 @@ import ( "github.com/benbjohnson/clock" "github.com/rs/zerolog" + "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/job_compilers" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/task_state_machine" @@ -45,6 +46,7 @@ type PersistenceService interface { // ScheduleTask finds a task to execute by the given worker, and assigns it to that worker. // If no task is available, (nil, nil) is returned, as this is not an error situation. ScheduleTask(ctx context.Context, w *persistence.Worker) (*persistence.Task, error) + AddWorkerToTaskFailedList(context.Context, *persistence.Task, *persistence.Worker) (numFailed int, err error) // Database queries. QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) @@ -102,6 +104,8 @@ type LogStorage interface { type ConfigService interface { VariableReplacer + Get() *config.Conf + // EffectiveStoragePath returns the job storage path used by Flamenco. It's // basically the configured storage path, but can be influenced by other // options (like Shaman). diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 3a569cd4..19a2104f 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -40,6 +40,21 @@ func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { return m.recorder } +// AddWorkerToTaskFailedList mocks base method. 
+func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *persistence.Worker) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddWorkerToTaskFailedList", arg0, arg1, arg2) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AddWorkerToTaskFailedList indicates an expected call of AddWorkerToTaskFailedList. +func (mr *MockPersistenceServiceMockRecorder) AddWorkerToTaskFailedList(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerToTaskFailedList", reflect.TypeOf((*MockPersistenceService)(nil).AddWorkerToTaskFailedList), arg0, arg1, arg2) +} + // CreateWorker mocks base method. func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error { m.ctrl.T.Helper() @@ -512,6 +527,20 @@ func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2) } +// Get mocks base method. +func (m *MockConfigService) Get() *config.Conf { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get") + ret0, _ := ret[0].(*config.Conf) + return ret0 +} + +// Get indicates an expected call of Get. +func (mr *MockConfigServiceMockRecorder) Get() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockConfigService)(nil).Get)) +} + // MockTaskStateMachine is a mock of TaskStateMachine interface. 
type MockTaskStateMachine struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index e375bdf9..cd62f134 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/gertd/go-pluralize" "github.com/labstack/echo/v4" "github.com/rs/zerolog" "golang.org/x/crypto/bcrypt" @@ -360,7 +361,7 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { } // Decode the request body. - var taskUpdate api.TaskUpdateJSONRequestBody + var taskUpdate api.TaskUpdate if err := e.Bind(&taskUpdate); err != nil { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") @@ -375,7 +376,15 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) } - // TODO: check whether this task may undergo the requested status change. + // Status 'soft-failed' should never be sent by a Worker. Distinguishing + // between soft and hard failures is up to the Manager. + // Workers should always just send 'failed' instead. + if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed { + logger.Warn().Str("status", string(*taskUpdate.TaskStatus)). + Msg("worker trying to update task to not-allowed status") + return sendAPIError(e, http.StatusBadRequest, + "task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus) + } taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate) workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask) @@ -394,18 +403,19 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { return e.NoContent(http.StatusNoContent) } +// doTaskUpdate actually updates the task and its log file. 
func (f *Flamenco) doTaskUpdate( ctx context.Context, logger zerolog.Logger, w *persistence.Worker, dbTask *persistence.Task, - update api.TaskUpdateJSONRequestBody, + update api.TaskUpdate, ) error { if dbTask.Job == nil { logger.Panic().Msg("dbTask.Job is nil, unable to continue") } - var dbErrActivity, dbErrStatus error + var dbErrActivity error if update.Activity != nil { dbTask.Activity = *update.Activity @@ -422,25 +432,92 @@ func (f *Flamenco) doTaskUpdate( _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log) } - if update.TaskStatus != nil { - oldTaskStatus := dbTask.Status - err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus) - if err != nil { - logger.Error().Err(err). - Str("newTaskStatus", string(*update.TaskStatus)). - Str("oldTaskStatus", string(oldTaskStatus)). - Msg("error changing task status") - dbErrStatus = fmt.Errorf("changing status of task %s to %q: %w", - dbTask.UUID, *update.TaskStatus, err) - } + if update.TaskStatus == nil { + return dbErrActivity } - // Any error updating the status is more important than an error updating the - // activity. - if dbErrStatus != nil { - return dbErrStatus + oldTaskStatus := dbTask.Status + var err error + if *update.TaskStatus == api.TaskStatusFailed { + // Failure is more complex than just going to the failed state. + err = f.onTaskFailed(ctx, logger, w, dbTask, update) + } else { + // Just go to the given state. + err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus) } - return dbErrActivity + + if err != nil { + logger.Error().Err(err). + Str("newTaskStatus", string(*update.TaskStatus)). + Str("oldTaskStatus", string(oldTaskStatus)). + Msg("error changing task status") + return fmt.Errorf("changing status of task %s to %q: %w", + dbTask.UUID, *update.TaskStatus, err) + } + + return dbErrActivity // Status change succeeded; still surface any earlier activity-update error. +} + +// onTaskFailed decides whether a task is soft- or hard-failed.
Note that this +// means that the task may NOT go to the status mentioned in the `update` +// parameter, but go to `soft-failed` instead. +func (f *Flamenco) onTaskFailed( + ctx context.Context, + logger zerolog.Logger, + worker *persistence.Worker, + task *persistence.Task, + update api.TaskUpdate, +) error { + // Sanity check. + if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed { + panic("onTaskFailed should only be called with a task update that indicates task failure") + } + + // Bookkeeping of failure. + numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker) + if err != nil { + return fmt.Errorf("adding worker to failure list of task: %w", err) + } + // TODO: blocklist the worker once implemented: f.maybeBlocklistWorker(ctx, worker, task) + + threshold := f.config.Get().TaskFailAfterSoftFailCount + logger = logger.With(). + Int("failedByWorkerCount", numFailed). + Int("threshold", threshold). + Logger() + + var ( + newStatus api.TaskStatus + localLog, taskLog string + ) + pluralizer := pluralize.NewClient() + if numFailed >= threshold { + newStatus = api.TaskStatusFailed + + localLog = "too many workers failed this task, hard-failing it" + taskLog = fmt.Sprintf( + "Task failed by %s, Manager will mark it as hard failure", + pluralizer.Pluralize("worker", numFailed, true), + ) + } else { + newStatus = api.TaskStatusSoftFailed + + localLog = "worker failed this task, soft-failing to give another worker a try" + failsToThreshold := threshold - numFailed + taskLog = fmt.Sprintf( + "Task failed by %s, Manager will mark it as soft failure. 
%d more %s will cause hard failure.", + pluralizer.Pluralize("worker", numFailed, true), + failsToThreshold, + pluralizer.Pluralize("failure", failsToThreshold, false), + ) + } + + if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil { + logger.Error().Err(err).Msg("error writing failure notice to task log") + } + + logger.Info().Str("newTaskStatus", string(newStatus)).Msg(localLog) + return f.stateMachine.TaskStatusChange(ctx, task, newStatus) } func (f *Flamenco) workerPingedTask( diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index eae3bb76..409fe564 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -11,6 +11,7 @@ import ( "github.com/labstack/echo/v4" "github.com/stretchr/testify/assert" + "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/pkg/api" ) @@ -328,7 +329,7 @@ func TestTaskUpdate(t *testing.T) { taskUpdate := api.TaskUpdateJSONRequestBody{ Activity: ptr("testing"), Log: ptr("line1\nline2\n"), - TaskStatus: ptr(api.TaskStatusFailed), + TaskStatus: ptr(api.TaskStatusCompleted), } // Construct the task that's supposed to be updated. @@ -348,7 +349,7 @@ func TestTaskUpdate(t *testing.T) { // Expect the task status change to be handed to the state machine. var statusChangedtask persistence.Task - mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed). + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusCompleted). 
DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error { statusChangedtask = *task return nil @@ -388,6 +389,79 @@ func TestTaskUpdate(t *testing.T) { assert.Equal(t, "testing", actUpdatedTask.Activity) } +func TestTaskUpdateFailed(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + // Construct the JSON request object. + taskUpdate := api.TaskUpdateJSONRequestBody{ + TaskStatus: ptr(api.TaskStatusFailed), + } + + // Construct the task that's supposed to be updated. + taskID := "181eab68-1123-4790-93b1-94309a899411" + jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5" + mockJob := persistence.Job{UUID: jobID} + mockTask := persistence.Task{ + UUID: taskID, + Worker: &worker, + WorkerID: &worker.ID, + Job: &mockJob, + Activity: "pre-update activity", + } + + conf := config.Conf{ + Base: config.Base{ + TaskFailAfterSoftFailCount: 3, + }, + } + mf.config.EXPECT().Get().Return(&conf).AnyTimes() + + // Expect the task to be fetched for each sub-test: + mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil).Times(2) + + // Expect a 'touch' of the task for each sub-test: + mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask).Times(2) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(2) + + { + // Expect the Worker to be added to the task's list of failed workers. + // This returns 1, which is less than the failure threshold -> soft failure expected. + mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) + + // Expect soft failure. + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed) + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, + "Task failed by 1 worker, Manager will mark it as soft failure. 2 more failures will cause hard failure.") + + // Do the call. 
+ echoCtx := mf.prepareMockedJSONRequest(taskUpdate) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.TaskUpdate(echoCtx, taskID) + assert.NoError(t, err) + assertResponseEmpty(t, echoCtx) + } + + { + // Test with more (mocked) failures in the past, bringing the failure count up to the threshold -> hard failure expected. + mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker). + Return(conf.TaskFailAfterSoftFailCount, nil) + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed) + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, + "Task failed by 3 workers, Manager will mark it as hard failure") + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(taskUpdate) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.TaskUpdate(echoCtx, taskID) + assert.NoError(t, err) + assertResponseEmpty(t, echoCtx) + } +} + func TestMayWorkerRun(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/config/config.go b/internal/manager/config/config.go index 08532f54..601a9cd7 100644 --- a/internal/manager/config/config.go +++ b/internal/manager/config/config.go @@ -96,7 +96,7 @@ type Base struct { // When this many workers have tried the task and failed, it will be hard-failed // (even when there are workers left that could technically retry the task). 
- // TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"` + TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"` // TestTasks TestTasks `yaml:"test_tasks"` diff --git a/internal/manager/config/defaults.go b/internal/manager/config/defaults.go index 031444e4..7bfa71fd 100644 --- a/internal/manager/config/defaults.go +++ b/internal/manager/config/defaults.go @@ -40,7 +40,7 @@ var defaultConfig = Conf{ // TaskCleanupMaxAge: 14 * 24 * time.Hour, // BlacklistThreshold: 3, - // TaskFailAfterSoftFailCount: 3, + TaskFailAfterSoftFailCount: 3, // WorkerCleanupStatus: []string{string(api.WorkerStatusOffline)},