From c3525c3b1a2f7ba83ce49eee9bd41be3744d76ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 13 Jun 2022 12:29:44 +0200 Subject: [PATCH] Manager: move task requeueing to `TaskStateMachine` Requeueing the tasks of a specific worker is now done in the `TaskStateMachine`, such that it can be called from other services as well in future commits. This also makes the `LogStorage` service a dependency of the `TaskStateMachine`, as it needs to write "this task was requeued" kind of messages to the task logs. --- cmd/flamenco-manager/main.go | 2 +- internal/manager/api_impl/interfaces.go | 3 +- .../api_impl/mocks/api_impl_mock.gen.go | 29 ++++---- internal/manager/api_impl/workers.go | 40 +---------- internal/manager/api_impl/workers_test.go | 35 +--------- .../manager/task_state_machine/interfaces.go | 13 +++- .../mocks/interfaces_mock.gen.go | 69 ++++++++++++++++++- .../task_state_machine/task_state_machine.go | 5 +- .../task_state_machine_test.go | 4 +- .../task_state_machine/worker_requeue.go | 62 +++++++++++++++++ .../task_state_machine/worker_requeue_test.go | 64 +++++++++++++++++ 11 files changed, 232 insertions(+), 94 deletions(-) create mode 100644 internal/manager/task_state_machine/worker_requeue.go create mode 100644 internal/manager/task_state_machine/worker_requeue_test.go diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index bd868279..b35483b2 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -109,8 +109,8 @@ func main() { timeService := clock.New() webUpdater := webupdates.New() - taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater) logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater) + taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater) e := 
buildWebService(flamenco, persist, ssdp, webUpdater, urls) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 827ecb97..ddb86a25 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -32,7 +32,6 @@ type PersistenceService interface { FetchTask(ctx context.Context, taskID string) (*persistence.Task, error) SaveTask(ctx context.Context, task *persistence.Task) error SaveTaskActivity(ctx context.Context, t *persistence.Task) error - FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error) // TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection. TaskTouchedByWorker(context.Context, *persistence.Task) error @@ -59,6 +58,8 @@ type TaskStateMachine interface { // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, reason string) error + + RequeueTasksOfWorker(ctx context.Context, worker *persistence.Worker, reason string) error } // TaskStateMachine should be a subset of task_state_machine.StateMachine. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index faad736d..5bf95a31 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -84,21 +84,6 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTask(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTask", reflect.TypeOf((*MockPersistenceService)(nil).FetchTask), arg0, arg1) } -// FetchTasksOfWorkerInStatus mocks base method. 
-func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatus", arg0, arg1, arg2) - ret0, _ := ret[0].([]*persistence.Task) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// FetchTasksOfWorkerInStatus indicates an expected call of FetchTasksOfWorkerInStatus. -func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatus), arg0, arg1, arg2) -} - // FetchWorker mocks base method. func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*persistence.Worker, error) { m.ctrl.T.Helper() @@ -550,6 +535,20 @@ func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2, ar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2, arg3) } +// RequeueTasksOfWorker mocks base method. +func (m *MockTaskStateMachine) RequeueTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequeueTasksOfWorker", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// RequeueTasksOfWorker indicates an expected call of RequeueTasksOfWorker. +func (mr *MockTaskStateMachineMockRecorder) RequeueTasksOfWorker(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueTasksOfWorker", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueTasksOfWorker), arg0, arg1, arg2) +} + // TaskStatusChange mocks base method. 
func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 10f57337..ed536f16 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -168,7 +168,7 @@ func (f *Flamenco) SignOff(e echo.Context) error { } // Re-queue all tasks (should be only one) this worker is now working on. - err = f.workerRequeueActiveTasks(ctx, logger, w) + err = f.stateMachine.RequeueTasksOfWorker(ctx, w, "worker signed off") if err != nil { return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks") } @@ -180,44 +180,6 @@ func (f *Flamenco) SignOff(e echo.Context) error { return e.NoContent(http.StatusNoContent) } -// workerRequeueActiveTasks re-queues all active tasks (should be max one) of this worker. -func (f *Flamenco) workerRequeueActiveTasks(ctx context.Context, logger zerolog.Logger, worker *persistence.Worker) error { - // Fetch the tasks to update. - tasks, err := f.persist.FetchTasksOfWorkerInStatus(ctx, worker, api.TaskStatusActive) - if err != nil { - return fmt.Errorf("fetching tasks of worker %s in status %q: %w", worker.UUID, api.TaskStatusActive, err) - } - - // Run each task change through the task state machine. - var lastErr error - for _, task := range tasks { - logger.Info(). - Str("task", task.UUID). - Msg("re-queueing task") - - // Write to task activity that it got requeued because of worker sign-off. - task.Activity = "Task requeued because worked signed off" - if err := f.persist.SaveTaskActivity(ctx, task); err != nil { - logger.Warn().Err(err). - Str("task", task.UUID). - Msg("error queueing task on worker sign-off") - lastErr = err - } - - if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusQueued); err != nil { - logger.Warn().Err(err). - Str("task", task.UUID). 
- Msg("error queueing task on worker sign-off") - lastErr = err - } - - _ = f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, - "Task was requeued by Manager because the worker assigned to it signed off.") - } - - return lastErr -} - // (GET /api/worker/state) func (f *Flamenco) WorkerState(e echo.Context) error { worker := requestWorkerOrPanic(e) diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 4ec10d5f..dcdcc9f1 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -134,23 +134,6 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { mf := newMockedFlamenco(mockCtrl) worker := testWorker() - job := persistence.Job{ - UUID: "583a7d59-887a-4c6c-b3e4-a753018f71b0", - } - // Mock that the worker has two active tasks. It shouldn't happen, but even - // when it does, both should be requeued when the worker signs off. - task1 := persistence.Task{ - UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503", - Job: &job, - Status: api.TaskStatusActive, - } - task2 := persistence.Task{ - UUID: "beb3f39b-57a5-44bf-a0ad-533e3513a0b6", - Job: &job, - Status: api.TaskStatusActive, - } - workerTasks := []*persistence.Task{&task1, &task2} - // Signing off should be handled completely, even when the HTTP connection // breaks. This means using a different context than the one passed by Echo. echo := mf.prepareMockedRequest(nil) @@ -158,18 +141,7 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { expectCtx := gomock.Not(gomock.Eq(echo.Request().Context())) // Expect worker's tasks to be re-queued. - mf.persistence.EXPECT(). - FetchTasksOfWorkerInStatus(expectCtx, &worker, api.TaskStatusActive). - Return(workerTasks, nil) - mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task1, api.TaskStatusQueued) - mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task2, api.TaskStatusQueued) - - // Expect this re-queueing to end up in the task's log and activity. 
- mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task1) // TODO: test saved activity value - mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task2) // TODO: test saved activity value - logMsg := "Task was requeued by Manager because the worker assigned to it signed off." - mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg) - mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task2.UUID, logMsg) + mf.stateMachine.EXPECT().RequeueTasksOfWorker(expectCtx, &worker, "worker signed off").Return(nil) // Expect worker to be saved as 'offline'. mf.persistence.EXPECT(). @@ -220,10 +192,7 @@ func TestWorkerSignoffStatusChangeRequest(t *testing.T) { savedWorker.StatusChangeClear() mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil) - // Mimick that no tasks are currently being worked on. - mf.persistence.EXPECT(). - FetchTasksOfWorkerInStatus(gomock.Any(), &worker, api.TaskStatusActive). - Return(nil, nil) + mf.stateMachine.EXPECT().RequeueTasksOfWorker(gomock.Any(), &worker, "worker signed off").Return(nil) // Perform the request echo := mf.prepareMockedRequest(nil) diff --git a/internal/manager/task_state_machine/interfaces.go b/internal/manager/task_state_machine/interfaces.go index 5a496f60..6a6f25f6 100644 --- a/internal/manager/task_state_machine/interfaces.go +++ b/internal/manager/task_state_machine/interfaces.go @@ -6,15 +6,18 @@ import ( "context" "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/task_logs" "git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/pkg/api" + "github.com/rs/zerolog" ) // Generate mock implementations of these interfaces. 
-//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_state_machine PersistenceService,ChangeBroadcaster +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_state_machine PersistenceService,ChangeBroadcaster,LogStorage type PersistenceService interface { SaveTask(ctx context.Context, task *persistence.Task) error + SaveTaskActivity(ctx context.Context, t *persistence.Task) error SaveJobStatus(ctx context.Context, j *persistence.Job) error JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error) @@ -30,6 +33,7 @@ type PersistenceService interface { statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error) + FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error) } // PersistenceService should be a subset of persistence.DB @@ -45,3 +49,10 @@ type ChangeBroadcaster interface { // ChangeBroadcaster should be a subset of webupdates.BiDirComms var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) + +// LogStorage writes to task logs. +type LogStorage interface { + WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error +} + +var _ LogStorage = (*task_logs.Storage)(nil) diff --git a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go index 2a78499d..90eeddfd 100644 --- a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go +++ b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. 
-// Source: git.blender.org/flamenco/internal/manager/task_state_machine (interfaces: PersistenceService,ChangeBroadcaster) +// Source: git.blender.org/flamenco/internal/manager/task_state_machine (interfaces: PersistenceService,ChangeBroadcaster,LogStorage) // Package mocks is a generated GoMock package. package mocks @@ -11,6 +11,7 @@ import ( persistence "git.blender.org/flamenco/internal/manager/persistence" api "git.blender.org/flamenco/pkg/api" gomock "github.com/golang/mock/gomock" + zerolog "github.com/rs/zerolog" ) // MockPersistenceService is a mock of PersistenceService interface. @@ -77,6 +78,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobsInStatus(arg0 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsInStatus), varargs...) } +// FetchTasksOfWorkerInStatus mocks base method. +func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatus", arg0, arg1, arg2) + ret0, _ := ret[0].([]*persistence.Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTasksOfWorkerInStatus indicates an expected call of FetchTasksOfWorkerInStatus. +func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatus), arg0, arg1, arg2) +} + // JobHasTasksInStatus mocks base method. 
func (m *MockPersistenceService) JobHasTasksInStatus(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus) (bool, error) { m.ctrl.T.Helper() @@ -120,6 +136,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTask(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTask", reflect.TypeOf((*MockPersistenceService)(nil).SaveTask), arg0, arg1) } +// SaveTaskActivity mocks base method. +func (m *MockPersistenceService) SaveTaskActivity(arg0 context.Context, arg1 *persistence.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveTaskActivity", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveTaskActivity indicates an expected call of SaveTaskActivity. +func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskActivity", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskActivity), arg0, arg1) +} + // UpdateJobsTaskStatuses mocks base method. func (m *MockPersistenceService) UpdateJobsTaskStatuses(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus, arg3 string) error { m.ctrl.T.Helper() @@ -194,3 +224,40 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskUpdate(arg0 interface{ mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskUpdate), arg0) } + +// MockLogStorage is a mock of LogStorage interface. +type MockLogStorage struct { + ctrl *gomock.Controller + recorder *MockLogStorageMockRecorder +} + +// MockLogStorageMockRecorder is the mock recorder for MockLogStorage. +type MockLogStorageMockRecorder struct { + mock *MockLogStorage +} + +// NewMockLogStorage creates a new mock instance. 
+func NewMockLogStorage(ctrl *gomock.Controller) *MockLogStorage { + mock := &MockLogStorage{ctrl: ctrl} + mock.recorder = &MockLogStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLogStorage) EXPECT() *MockLogStorageMockRecorder { + return m.recorder +} + +// WriteTimestamped mocks base method. +func (m *MockLogStorage) WriteTimestamped(arg0 zerolog.Logger, arg1, arg2, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteTimestamped", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteTimestamped indicates an expected call of WriteTimestamped. +func (mr *MockLogStorageMockRecorder) WriteTimestamped(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3) +} diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index fd250437..dd1f755f 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -22,13 +22,14 @@ const taskFailJobPercentage = 10 // Integer from 0 to 100. 
type StateMachine struct { persist PersistenceService broadcaster ChangeBroadcaster + logStorage LogStorage } - -func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster) *StateMachine { +func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine { return &StateMachine{ persist: persist, broadcaster: broadcaster, + logStorage: logStorage, } } diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index af160bf0..515f7f0c 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -20,6 +20,7 @@ import ( type StateMachineMocks struct { persist *mocks.MockPersistenceService broadcaster *mocks.MockChangeBroadcaster + logStorage *mocks.MockLogStorage } // In the comments below, "T" indicates the performed task status change, and @@ -361,8 +362,9 @@ func mockedTaskStateMachine(mockCtrl *gomock.Controller) (*StateMachine, *StateM mocks := StateMachineMocks{ persist: mocks.NewMockPersistenceService(mockCtrl), broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), + logStorage: mocks.NewMockLogStorage(mockCtrl), } - sm := NewStateMachine(mocks.persist, mocks.broadcaster) + sm := NewStateMachine(mocks.persist, mocks.broadcaster, mocks.logStorage) return sm, &mocks } diff --git a/internal/manager/task_state_machine/worker_requeue.go b/internal/manager/task_state_machine/worker_requeue.go new file mode 100644 index 00000000..d0aff5b8 --- /dev/null +++ b/internal/manager/task_state_machine/worker_requeue.go @@ -0,0 +1,62 @@ +package task_state_machine + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" + "github.com/rs/zerolog/log" +) + +// RequeueTasksOfWorker re-queues all active tasks (should be max 
one) of this worker. +// +// `reason`: a string that can be appended to text like "Task was requeued by Manager because " +func (sm *StateMachine) RequeueTasksOfWorker( + ctx context.Context, + worker *persistence.Worker, + reason string, +) error { + logger := log.With(). + Str("worker", worker.UUID). + Logger() + + // Fetch the tasks to update. + tasks, err := sm.persist.FetchTasksOfWorkerInStatus(ctx, worker, api.TaskStatusActive) + if err != nil { + return fmt.Errorf("fetching tasks of worker %s in status %q: %w", worker.UUID, api.TaskStatusActive, err) + } + + // Run each task change through the task state machine. + var lastErr error + for _, task := range tasks { + logger.Info(). + Str("task", task.UUID). + Msg("re-queueing task") + + // Write to task activity that it got requeued, including the given reason. + task.Activity = "Task was requeued by Manager because " + reason + if err := sm.persist.SaveTaskActivity(ctx, task); err != nil { + logger.Warn().Err(err). + Str("task", task.UUID). + Str("reason", reason). + Str("activity", task.Activity). + Msg("error saving task activity to database") + lastErr = err + } + + if err := sm.TaskStatusChange(ctx, task, api.TaskStatusQueued); err != nil { + logger.Warn().Err(err). + Str("task", task.UUID). + Str("reason", reason). 
+ Msg("error queueing task") + lastErr = err + } + + _ = sm.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, task.Activity) + } + + return lastErr +} diff --git a/internal/manager/task_state_machine/worker_requeue_test.go b/internal/manager/task_state_machine/worker_requeue_test.go new file mode 100644 index 00000000..abb19d5a --- /dev/null +++ b/internal/manager/task_state_machine/worker_requeue_test.go @@ -0,0 +1,64 @@ +package task_state_machine + +import ( + "testing" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestRequeueTasksOfWorker(t *testing.T) { + mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) + defer mockCtrl.Finish() + + worker := persistence.Worker{ + UUID: "3ed470c8-d41e-4668-92d0-d799997433a4", + Name: "testert", + } + + // Mock that the worker has two active tasks. It shouldn't happen, but even + // when it does, both should be requeued when the worker signs off. + task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive) + task2 := taskOfSameJob(task1, api.TaskStatusActive) + workerTasks := []*persistence.Task{task1, task2} + + task1PrevStatus := task1.Status + task2PrevStatus := task2.Status + + mocks.persist.EXPECT().FetchTasksOfWorkerInStatus(ctx, &worker, api.TaskStatusActive).Return(workerTasks, nil) + + // Expect this re-queueing to end up in the task's log and activity. 
+ mocks.persist.EXPECT().SaveTaskActivity(ctx, task1) // TODO: test saved activity value + mocks.persist.EXPECT().SaveTaskActivity(ctx, task2) // TODO: test saved activity value + mocks.persist.EXPECT().SaveTask(ctx, task1) // TODO: test saved task status + mocks.persist.EXPECT().SaveTask(ctx, task2) // TODO: test saved task status + + logMsg := "Task was requeued by Manager because worker had to test" + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg) + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg) + + mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{ + Activity: logMsg, + Id: task1.UUID, + JobId: task1.Job.UUID, + Name: task1.Name, + PreviousStatus: &task1PrevStatus, + Status: api.TaskStatusQueued, + Updated: task1.UpdatedAt, + }) + + mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{ + Activity: logMsg, + Id: task2.UUID, + JobId: task2.Job.UUID, + Name: task2.Name, + PreviousStatus: &task2PrevStatus, + Status: api.TaskStatusQueued, + Updated: task2.UpdatedAt, + }) + + err := sm.RequeueTasksOfWorker(ctx, &worker, "worker had to test") + assert.NoError(t, err) +}