Manager: add entry to task log whenever task changes status
Add a line to the task log whenever task changes status. This only applies to directly-changed tasks, and not to mass-updates (like all tasks going from 'completed' to 'queued' on a job requeue).
This commit is contained in:
parent
043ca032cb
commit
ac3236786b
@ -80,6 +80,11 @@ func (sm *StateMachine) taskStatusChangeOnly(
|
||||
return fmt.Errorf("saving task to database: %w", err)
|
||||
}
|
||||
|
||||
// logStorage already logs any error, and an error here shouldn't block the
|
||||
// rest of the function.
|
||||
_ = sm.logStorage.WriteTimestamped(logger, job.UUID, task.UUID,
|
||||
fmt.Sprintf("task changed status %s -> %s", oldTaskStatus, newTaskStatus))
|
||||
|
||||
// Broadcast this change to the SocketIO clients.
|
||||
taskUpdate := webupdates.NewTaskUpdate(task)
|
||||
taskUpdate.PreviousStatus = &oldTaskStatus
|
||||
|
@ -32,6 +32,7 @@ func TestTaskStatusChangeQueuedToActive(t *testing.T) {
|
||||
// T: queued > active --> J: queued > active
|
||||
task := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusActive)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> active")
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusActive)
|
||||
mocks.expectBroadcastJobChange(task.Job, api.JobStatusQueued, api.JobStatusActive)
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatusActive)
|
||||
@ -53,6 +54,7 @@ func TestTaskStatusChangeSaveTaskAfterJobChangeFailure(t *testing.T) {
|
||||
|
||||
// Expect a call to save the task in the persistence layer, regardless of the above error.
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusActive)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> active")
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatusActive)
|
||||
|
||||
returnedErr := sm.TaskStatusChange(ctx, task, api.TaskStatusActive)
|
||||
@ -70,23 +72,27 @@ func TestTaskStatusChangeActiveToCompleted(t *testing.T) {
|
||||
|
||||
// First task completing: T: active > completed --> J: active > active
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusCompleted)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status active -> completed")
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCompleted)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(1, 3, nil) // 1 of 3 complete.
|
||||
assert.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCompleted))
|
||||
|
||||
// Second task hickup: T: active > soft-failed --> J: active > active
|
||||
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusSoftFailed)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status active -> soft-failed")
|
||||
mocks.expectBroadcastTaskChange(task2, api.TaskStatusActive, api.TaskStatusSoftFailed)
|
||||
assert.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusSoftFailed))
|
||||
|
||||
// Second task completing: T: soft-failed > completed --> J: active > active
|
||||
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCompleted)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status soft-failed -> completed")
|
||||
mocks.expectBroadcastTaskChange(task2, api.TaskStatusSoftFailed, api.TaskStatusCompleted)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(2, 3, nil) // 2 of 3 complete.
|
||||
assert.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusCompleted))
|
||||
|
||||
// Third task completing: T: active > completed --> J: active > completed
|
||||
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusCompleted)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task3, "task changed status active -> completed")
|
||||
mocks.expectBroadcastTaskChange(task3, api.TaskStatusActive, api.TaskStatusCompleted)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(3, 3, nil) // 3 of 3 complete.
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusCompleted)
|
||||
@ -102,6 +108,7 @@ func TestTaskStatusChangeQueuedToFailed(t *testing.T) {
|
||||
// T: queued > failed (1% task failure) --> J: queued > active
|
||||
task := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusFailed)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> failed")
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatusFailed)
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusActive)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusFailed).Return(1, 100, nil) // 1 out of 100 failed.
|
||||
@ -117,6 +124,7 @@ func TestTaskStatusChangeActiveToFailedFailJob(t *testing.T) {
|
||||
// T: active > failed (10% task1 failure) --> J: active > failed + cancellation of any runnable tasks.
|
||||
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
|
||||
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusFailed)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task1, "task changed status active -> failed")
|
||||
// The change to the failed task should be broadcast.
|
||||
mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusFailed)
|
||||
mocks.expectSaveJobWithStatus(t, task1.Job, api.JobStatusFailed)
|
||||
@ -146,6 +154,7 @@ func TestTaskStatusChangeRequeueOnCompletedJob(t *testing.T) {
|
||||
// T: completed > queued --> J: completed > requeueing > queued
|
||||
task := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted)
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusQueued)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status completed -> queued")
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusCompleted, api.TaskStatusQueued)
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusRequeueing)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusCompleted, api.JobStatusRequeueing)
|
||||
@ -172,6 +181,7 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
|
||||
|
||||
// T1: active > cancelled --> J: cancel-requested > cancel-requested
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusCanceled)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status active -> canceled")
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCanceled)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
|
||||
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
|
||||
@ -180,6 +190,7 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
|
||||
|
||||
// T2: queued > cancelled --> J: cancel-requested > canceled
|
||||
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCanceled)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status queued -> canceled")
|
||||
mocks.expectBroadcastTaskChange(task2, api.TaskStatusQueued, api.TaskStatusCanceled)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
|
||||
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
|
||||
@ -201,6 +212,7 @@ func TestTaskStatusChangeCancelSingleTaskWithOtherFailed(t *testing.T) {
|
||||
|
||||
// T1: active > cancelled --> J: cancel-requested > canceled because T2 already failed and cannot run anyway.
|
||||
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusCanceled)
|
||||
mocks.expectWriteTaskLogTimestamped(t, task1, "task changed status active -> canceled")
|
||||
mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusCanceled)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
|
||||
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
|
||||
@ -220,6 +232,7 @@ func TestTaskStatusChangeUnknownStatus(t *testing.T) {
|
||||
// T: queued > borked --> saved to DB but otherwise ignored w.r.t. job status changes.
|
||||
task := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatus("borked"))
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status queued -> borked")
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatus("borked"))
|
||||
|
||||
assert.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatus("borked")))
|
||||
@ -379,6 +392,15 @@ func (m *StateMachineMocks) expectSaveTaskWithStatus(
|
||||
return nil
|
||||
})
|
||||
}
|
||||
func (m *StateMachineMocks) expectWriteTaskLogTimestamped(
|
||||
t *testing.T,
|
||||
task *persistence.Task,
|
||||
logtext string,
|
||||
) *gomock.Call {
|
||||
return m.logStorage.EXPECT().WriteTimestamped(
|
||||
gomock.Any(), task.Job.UUID, task.UUID, logtext,
|
||||
)
|
||||
}
|
||||
|
||||
func (m *StateMachineMocks) expectSaveJobWithStatus(
|
||||
t *testing.T,
|
||||
|
@ -35,12 +35,16 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) {
|
||||
mocks.persist.EXPECT().SaveTaskStatus(ctx, task1) // TODO: test saved task status
|
||||
mocks.persist.EXPECT().SaveTaskStatus(ctx, task2) // TODO: test saved task status
|
||||
|
||||
logMsg := "Task was requeued by Manager because worker had to test"
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg)
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg)
|
||||
logMsg1 := "task changed status active -> queued"
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg1)
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg1)
|
||||
|
||||
logMsg2 := "Task was requeued by Manager because worker had to test"
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg2)
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg2)
|
||||
|
||||
mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{
|
||||
Activity: logMsg,
|
||||
Activity: logMsg2,
|
||||
Id: task1.UUID,
|
||||
JobId: task1.Job.UUID,
|
||||
Name: task1.Name,
|
||||
@ -50,7 +54,7 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) {
|
||||
})
|
||||
|
||||
mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{
|
||||
Activity: logMsg,
|
||||
Activity: logMsg2,
|
||||
Id: task2.UUID,
|
||||
JobId: task2.Job.UUID,
|
||||
Name: task2.Name,
|
||||
|
Loading…
x
Reference in New Issue
Block a user