diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 915a4eca..0f7335b0 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -80,6 +80,11 @@ func (sm *StateMachine) taskStatusChangeOnly( return fmt.Errorf("saving task to database: %w", err) } + // logStorage already logs any error, and an error here shouldn't block the + // rest of the function. + _ = sm.logStorage.WriteTimestamped(logger, job.UUID, task.UUID, + fmt.Sprintf("task changed status %s -> %s", oldTaskStatus, newTaskStatus)) + // Broadcast this change to the SocketIO clients. taskUpdate := webupdates.NewTaskUpdate(task) taskUpdate.PreviousStatus = &oldTaskStatus diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index 403c6e91..f41e10bc 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -32,6 +32,7 @@ func TestTaskStatusChangeQueuedToActive(t *testing.T) { // T: queued > active --> J: queued > active task := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued) mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusActive) + mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> active") mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusActive) mocks.expectBroadcastJobChange(task.Job, api.JobStatusQueued, api.JobStatusActive) mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatusActive) @@ -53,6 +54,7 @@ func TestTaskStatusChangeSaveTaskAfterJobChangeFailure(t *testing.T) { // Expect a call to save the task in the persistence layer, regardless of the above error. mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusActive) + mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> active") mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatusActive) returnedErr := sm.TaskStatusChange(ctx, task, api.TaskStatusActive) @@ -70,23 +72,27 @@ func TestTaskStatusChangeActiveToCompleted(t *testing.T) { // First task completing: T: active > completed --> J: active > active mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusCompleted) + mocks.expectWriteTaskLogTimestamped(t, task, "task changed status active -> completed") mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCompleted) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(1, 3, nil) // 1 of 3 complete. assert.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCompleted)) // Second task hickup: T: active > soft-failed --> J: active > active mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusSoftFailed) + mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status active -> soft-failed") mocks.expectBroadcastTaskChange(task2, api.TaskStatusActive, api.TaskStatusSoftFailed) assert.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusSoftFailed)) // Second task completing: T: soft-failed > completed --> J: active > active mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCompleted) + mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status soft-failed -> completed") mocks.expectBroadcastTaskChange(task2, api.TaskStatusSoftFailed, api.TaskStatusCompleted) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(2, 3, nil) // 2 of 3 complete. assert.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusCompleted)) // Third task completing: T: active > completed --> J: active > completed mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusCompleted) + mocks.expectWriteTaskLogTimestamped(t, task3, "task changed status active -> completed") mocks.expectBroadcastTaskChange(task3, api.TaskStatusActive, api.TaskStatusCompleted) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(3, 3, nil) // 3 of 3 complete. mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusCompleted) @@ -102,6 +108,7 @@ func TestTaskStatusChangeQueuedToFailed(t *testing.T) { // T: queued > failed (1% task failure) --> J: queued > active task := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued) mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusFailed) + mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> failed") mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatusFailed) mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusActive) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusFailed).Return(1, 100, nil) // 1 out of 100 failed. @@ -117,6 +124,7 @@ func TestTaskStatusChangeActiveToFailedFailJob(t *testing.T) { // T: active > failed (10% task1 failure) --> J: active > failed + cancellation of any runnable tasks. task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive) mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusFailed) + mocks.expectWriteTaskLogTimestamped(t, task1, "task changed status active -> failed") // The change to the failed task should be broadcast. mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusFailed) mocks.expectSaveJobWithStatus(t, task1.Job, api.JobStatusFailed) @@ -146,6 +154,7 @@ func TestTaskStatusChangeRequeueOnCompletedJob(t *testing.T) { // T: completed > queued --> J: completed > requeueing > queued task := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted) mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusQueued) + mocks.expectWriteTaskLogTimestamped(t, task, "task changed status completed -> queued") mocks.expectBroadcastTaskChange(task, api.TaskStatusCompleted, api.TaskStatusQueued) mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusRequeueing) mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusCompleted, api.JobStatusRequeueing) @@ -172,6 +181,7 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) { // T1: active > cancelled --> J: cancel-requested > cancel-requested mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusCanceled) + mocks.expectWriteTaskLogTimestamped(t, task, "task changed status active -> canceled") mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCanceled) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed). @@ -180,6 +190,7 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) { // T2: queued > cancelled --> J: cancel-requested > canceled mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCanceled) + mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status queued -> canceled") mocks.expectBroadcastTaskChange(task2, api.TaskStatusQueued, api.TaskStatusCanceled) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed). @@ -201,6 +212,7 @@ func TestTaskStatusChangeCancelSingleTaskWithOtherFailed(t *testing.T) { // T1: active > cancelled --> J: cancel-requested > canceled because T2 already failed and cannot run anyway. mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusCanceled) + mocks.expectWriteTaskLogTimestamped(t, task1, "task changed status active -> canceled") mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusCanceled) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed). @@ -220,6 +232,7 @@ func TestTaskStatusChangeUnknownStatus(t *testing.T) { // T: queued > borked --> saved to DB but otherwise ignored w.r.t. job status changes. task := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued) mocks.expectSaveTaskWithStatus(t, task, api.TaskStatus("borked")) + mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> borked") mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatus("borked")) assert.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatus("borked"))) @@ -379,6 +392,15 @@ func (m *StateMachineMocks) expectSaveTaskWithStatus( return nil }) } +func (m *StateMachineMocks) expectWriteTaskLogTimestamped( + t *testing.T, + task *persistence.Task, + logtext string, +) *gomock.Call { + return m.logStorage.EXPECT().WriteTimestamped( + gomock.Any(), task.Job.UUID, task.UUID, logtext, + ) +} func (m *StateMachineMocks) expectSaveJobWithStatus( t *testing.T, diff --git a/internal/manager/task_state_machine/worker_requeue_test.go b/internal/manager/task_state_machine/worker_requeue_test.go index 0a1b4544..13432b46 100644 --- a/internal/manager/task_state_machine/worker_requeue_test.go +++ b/internal/manager/task_state_machine/worker_requeue_test.go @@ -35,12 +35,16 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) { mocks.persist.EXPECT().SaveTaskStatus(ctx, task1) // TODO: test saved task status mocks.persist.EXPECT().SaveTaskStatus(ctx, task2) // TODO: test saved task status - logMsg := "Task was requeued by Manager because worker had to test" - mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg) - mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg) + logMsg1 := "task changed status active -> queued" + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg1) + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg1) + + logMsg2 := "Task was requeued by Manager because worker had to test" + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg2) + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg2) mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{ - Activity: logMsg, + Activity: logMsg2, Id: task1.UUID, JobId: task1.Job.UUID, Name: task1.Name, @@ -50,7 +54,7 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) { }) mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{ - Activity: logMsg, + Activity: logMsg2, Id: task2.UUID, JobId: task2.Job.UUID, Name: task2.Name,