Manager: keep track of which worker failed which task

When a Worker indicates a task failed, mark it as `soft-failed` until
enough workers have tried & failed at the same task.

This is the first step in a blocklisting system, where tasks of an
often-failing worker will be requeued to be retried by others.

NOTE: currently the failure list of a task is NOT reset whenever it is
requeued! This will be implemented in a future commit, and is tracked in
`FEATURES.md`.
This commit is contained in:
Sybren A. Stüvel 2022-06-13 18:38:35 +02:00
parent c5debdeb70
commit 6e12a2fb25
9 changed files with 214 additions and 24 deletions

View File

@ -46,6 +46,9 @@ Note that list is **not** in any specific order.
Example: jobs in statuses `cancel-requested`, `requeueing`, etc.
- [x] Task timeout monitoring
- [ ] Worker blocklisting & failed task requeueing
- [x] Keep track of which worker failed which task.
- [ ] Clear task failure list when the task gets (re)queued.
- [ ] Keep track of a blocklist as `(worker ID, job ID, task type)` tuple in the database.
- [x] Worker timeout monitoring
- [ ] Last rendered image display

1
go.mod
View File

@ -30,6 +30,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
github.com/gertd/go-pluralize v0.2.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/swag v0.19.5 // indirect

2
go.sum
View File

@ -21,6 +21,8 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e h1:cG4ivpkHpkmWTaaLrgekDVR0xAr87V697T2c+WnUdiY=
github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e/go.mod h1:7xQpS/YtlWo38XfIqje9GgtlPuBRatYcL23GlYBtgWM=
github.com/gertd/go-pluralize v0.2.1 h1:M3uASbVjMnTsPb0PNqg+E/24Vwigyo/tvyMTtAlLgiA=
github.com/gertd/go-pluralize v0.2.1/go.mod h1:rbYaKDbsXxmRfr8uygAEKhOWsjyrrqrkHVpZvoOp8zk=
github.com/getkin/kin-openapi v0.80.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
github.com/getkin/kin-openapi v0.88.0 h1:BjJ2JERWJbYE1o1RGEj/5LmR5qw7ecfl3O3su4ImR+0=
github.com/getkin/kin-openapi v0.88.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=

View File

@ -13,6 +13,7 @@ import (
"github.com/benbjohnson/clock"
"github.com/rs/zerolog"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/task_state_machine"
@ -45,6 +46,7 @@ type PersistenceService interface {
// ScheduleTask finds a task to execute by the given worker, and assigns it to that worker.
// If no task is available, (nil, nil) is returned, as this is not an error situation.
ScheduleTask(ctx context.Context, w *persistence.Worker) (*persistence.Task, error)
AddWorkerToTaskFailedList(context.Context, *persistence.Task, *persistence.Worker) (numFailed int, err error)
// Database queries.
QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error)
@ -102,6 +104,8 @@ type LogStorage interface {
type ConfigService interface {
VariableReplacer
Get() *config.Conf
// EffectiveStoragePath returns the job storage path used by Flamenco. It's
// basically the configured storage path, but can be influenced by other
// options (like Shaman).

View File

@ -40,6 +40,21 @@ func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder {
return m.recorder
}
// AddWorkerToTaskFailedList mocks base method.
// AddWorkerToTaskFailedList mocks base method.
func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *persistence.Worker) (int, error) {
	m.ctrl.T.Helper()
	// Record the call and fetch the canned return values set up via EXPECT().
	results := m.ctrl.Call(m, "AddWorkerToTaskFailedList", arg0, arg1, arg2)
	numFailed, _ := results[0].(int)
	callErr, _ := results[1].(error)
	return numFailed, callErr
}
// AddWorkerToTaskFailedList indicates an expected call of AddWorkerToTaskFailedList.
// AddWorkerToTaskFailedList indicates an expected call of AddWorkerToTaskFailedList.
func (mr *MockPersistenceServiceMockRecorder) AddWorkerToTaskFailedList(arg0, arg1, arg2 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	// Register the expectation against the concrete method's type.
	method := reflect.TypeOf((*MockPersistenceService)(nil).AddWorkerToTaskFailedList)
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerToTaskFailedList", method, arg0, arg1, arg2)
}
// CreateWorker mocks base method.
func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error {
m.ctrl.T.Helper()
@ -512,6 +527,20 @@ func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2 interf
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2)
}
// Get mocks base method.
// Get mocks base method.
func (m *MockConfigService) Get() *config.Conf {
	m.ctrl.T.Helper()
	// Record the call and hand back the configured *config.Conf (nil if unset).
	results := m.ctrl.Call(m, "Get")
	conf, _ := results[0].(*config.Conf)
	return conf
}
// Get indicates an expected call of Get.
// Get indicates an expected call of Get.
func (mr *MockConfigServiceMockRecorder) Get() *gomock.Call {
	mr.mock.ctrl.T.Helper()
	// Register the expectation against the concrete method's type.
	method := reflect.TypeOf((*MockConfigService)(nil).Get)
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", method)
}
// MockTaskStateMachine is a mock of TaskStateMachine interface.
type MockTaskStateMachine struct {
ctrl *gomock.Controller

View File

@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/gertd/go-pluralize"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"golang.org/x/crypto/bcrypt"
@ -360,7 +361,7 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
}
// Decode the request body.
var taskUpdate api.TaskUpdateJSONRequestBody
var taskUpdate api.TaskUpdate
if err := e.Bind(&taskUpdate); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
@ -375,7 +376,15 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
}
// TODO: check whether this task may undergo the requested status change.
// Status 'soft-failed' should never be sent by a Worker. Distinguishing
// between soft and hard failures is up to the Manager.
// Workers should always just send 'failed' instead.
if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed {
logger.Warn().Str("status", string(*taskUpdate.TaskStatus)).
Msg("worker trying to update task to not-allowed status")
return sendAPIError(e, http.StatusBadRequest,
"task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
}
taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate)
workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask)
@ -394,18 +403,19 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
return e.NoContent(http.StatusNoContent)
}
// doTaskUpdate actually updates the task and its log file.
func (f *Flamenco) doTaskUpdate(
ctx context.Context,
logger zerolog.Logger,
w *persistence.Worker,
dbTask *persistence.Task,
update api.TaskUpdateJSONRequestBody,
update api.TaskUpdate,
) error {
if dbTask.Job == nil {
logger.Panic().Msg("dbTask.Job is nil, unable to continue")
}
var dbErrActivity, dbErrStatus error
var dbErrActivity error
if update.Activity != nil {
dbTask.Activity = *update.Activity
@ -422,25 +432,92 @@ func (f *Flamenco) doTaskUpdate(
_ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
}
if update.TaskStatus != nil {
if update.TaskStatus == nil {
return dbErrActivity
}
oldTaskStatus := dbTask.Status
err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
var err error
if *update.TaskStatus == api.TaskStatusFailed {
// Failure is more complex than just going to the failed state.
err = f.onTaskFailed(ctx, logger, w, dbTask, update)
} else {
// Just go to the given state.
err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
}
if err != nil {
logger.Error().Err(err).
Str("newTaskStatus", string(*update.TaskStatus)).
Str("oldTaskStatus", string(oldTaskStatus)).
Msg("error changing task status")
dbErrStatus = fmt.Errorf("changing status of task %s to %q: %w",
return fmt.Errorf("changing status of task %s to %q: %w",
dbTask.UUID, *update.TaskStatus, err)
}
return nil
}
// onTaskFailed decides whether a task is soft- or hard-failed. Note that this
// means that the task may NOT go to the status mentioned in the `update`
// parameter, but go to `soft-failed` instead.
func (f *Flamenco) onTaskFailed(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
update api.TaskUpdate,
) error {
// Sanity check.
if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed {
panic("onTaskFailed should only be called with a task update that indicates task failure")
}
// Any error updating the status is more important than an error updating the
// activity.
if dbErrStatus != nil {
return dbErrStatus
// Bookkeeping of failure.
numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker)
if err != nil {
return fmt.Errorf("adding worker to failure list of task: %w", err)
}
return dbErrActivity
// f.maybeBlocklistWorker(ctx, w, dbTask)
threshold := f.config.Get().TaskFailAfterSoftFailCount
logger = logger.With().
Int("failedByWorkerCount", numFailed).
Int("threshold", threshold).
Logger()
var (
newStatus api.TaskStatus
localLog, taskLog string
)
pluralizer := pluralize.NewClient()
if numFailed >= threshold {
newStatus = api.TaskStatusFailed
localLog = "too many workers failed this task, hard-failing it"
taskLog = fmt.Sprintf(
"Task failed by %s, Manager will mark it as hard failure",
pluralizer.Pluralize("worker", numFailed, true),
)
} else {
newStatus = api.TaskStatusSoftFailed
localLog = "worker failed this task, soft-failing to give another worker a try"
failsToThreshold := threshold - numFailed
taskLog = fmt.Sprintf(
"Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.",
pluralizer.Pluralize("worker", numFailed, true),
failsToThreshold,
pluralizer.Pluralize("failure", failsToThreshold, false),
)
}
if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
logger.Error().Err(err).Msg("error writing failure notice to task log")
}
logger.Info().Str("newTaskStatus", string(newStatus)).Msg(localLog)
return f.stateMachine.TaskStatusChange(ctx, task, newStatus)
}
func (f *Flamenco) workerPingedTask(

View File

@ -11,6 +11,7 @@ import (
"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
@ -328,7 +329,7 @@ func TestTaskUpdate(t *testing.T) {
taskUpdate := api.TaskUpdateJSONRequestBody{
Activity: ptr("testing"),
Log: ptr("line1\nline2\n"),
TaskStatus: ptr(api.TaskStatusFailed),
TaskStatus: ptr(api.TaskStatusCompleted),
}
// Construct the task that's supposed to be updated.
@ -348,7 +349,7 @@ func TestTaskUpdate(t *testing.T) {
// Expect the task status change to be handed to the state machine.
var statusChangedtask persistence.Task
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed).
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusCompleted).
DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error {
statusChangedtask = *task
return nil
@ -388,6 +389,79 @@ func TestTaskUpdate(t *testing.T) {
assert.Equal(t, "testing", actUpdatedTask.Activity)
}
// TestTaskUpdateFailed checks the Manager's handling of a Worker reporting a
// task as 'failed': below the soft-fail threshold the task goes to
// 'soft-failed', and at/over the threshold it is hard-failed.
func TestTaskUpdateFailed(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	mf := newMockedFlamenco(mockCtrl)
	worker := testWorker()

	// Construct the JSON request object. Both sub-tests send the same update.
	taskUpdate := api.TaskUpdateJSONRequestBody{
		TaskStatus: ptr(api.TaskStatusFailed),
	}

	// Construct the task that's supposed to be updated.
	taskID := "181eab68-1123-4790-93b1-94309a899411"
	jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5"
	mockJob := persistence.Job{UUID: jobID}
	mockTask := persistence.Task{
		UUID:     taskID,
		Worker:   &worker,
		WorkerID: &worker.ID,
		Job:      &mockJob,
		Activity: "pre-update activity",
	}

	// Threshold of 3 distinct failing workers before the task hard-fails.
	conf := config.Conf{
		Base: config.Base{
			TaskFailAfterSoftFailCount: 3,
		},
	}
	mf.config.EXPECT().Get().Return(&conf).AnyTimes()

	// Expect the task to be fetched for each sub-test:
	mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil).Times(2)

	// Expect a 'touch' of the task for each sub-test:
	mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask).Times(2)
	mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(2)

	// Sub-test 1: first failure, below the threshold.
	{
		// Expect the Worker to be added to the list of workers.
		// This returns 1, which is less than the failure threshold -> soft failure expected.
		mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)

		// Expect soft failure.
		mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed)
		mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
			"Task failed by 1 worker, Manager will mark it as soft failure. 2 more failures will cause hard failure.")

		// Do the call.
		echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
		requestWorkerStore(echoCtx, &worker)
		err := mf.flamenco.TaskUpdate(echoCtx, taskID)
		assert.NoError(t, err)
		assertResponseEmpty(t, echoCtx)
	}

	// Sub-test 2: failure count reaches the threshold -> hard failure.
	{
		// Test with more (mocked) failures in the past, pushing the task over the threshold.
		mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).
			Return(conf.TaskFailAfterSoftFailCount, nil)
		mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed)
		mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
			"Task failed by 3 workers, Manager will mark it as hard failure")

		// Do the call.
		echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
		requestWorkerStore(echoCtx, &worker)
		err := mf.flamenco.TaskUpdate(echoCtx, taskID)
		assert.NoError(t, err)
		assertResponseEmpty(t, echoCtx)
	}
}
func TestMayWorkerRun(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

View File

@ -96,7 +96,7 @@ type Base struct {
// When this many workers have tried the task and failed, it will be hard-failed
// (even when there are workers left that could technically retry the task).
// TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"`
TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"`
// TestTasks TestTasks `yaml:"test_tasks"`

View File

@ -40,7 +40,7 @@ var defaultConfig = Conf{
// TaskCleanupMaxAge: 14 * 24 * time.Hour,
// BlacklistThreshold: 3,
// TaskFailAfterSoftFailCount: 3,
TaskFailAfterSoftFailCount: 3,
// WorkerCleanupStatus: []string{string(api.WorkerStatusOffline)},