diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index edf816d0..778c147a 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -65,7 +65,7 @@ type TaskStateMachine interface { TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. - JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error + JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, reason string) error } // TaskStateMachine should be a subset of task_state_machine.StateMachine. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index e0c49fd7..4a2930d4 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -440,17 +440,17 @@ func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder { } // JobStatusChange mocks base method. -func (m *MockTaskStateMachine) JobStatusChange(arg0 context.Context, arg1 *persistence.Job, arg2 api.JobStatus) error { +func (m *MockTaskStateMachine) JobStatusChange(arg0 context.Context, arg1 *persistence.Job, arg2 api.JobStatus, arg3 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "JobStatusChange", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "JobStatusChange", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // JobStatusChange indicates an expected call of JobStatusChange. -func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2, arg3) } // TaskStatusChange mocks base method. diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index ac6fcc5b..bba341b3 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -22,6 +22,7 @@ type Job struct { JobType string `gorm:"type:varchar(32);default:''"` Priority int `gorm:"type:smallint;default:0"` Status api.JobStatus `gorm:"type:varchar(32);default:''"` + Activity string `gorm:"type:varchar(255);default:''"` Settings StringInterfaceMap `gorm:"type:jsonb"` Metadata StringStringMap `gorm:"type:jsonb"` @@ -178,10 +179,11 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { return &dbJob, nil } +// SaveJobStatus saves the job's Status and Activity fields. func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { tx := db.gormDB.WithContext(ctx). Model(j). - Updates(Job{Status: j.Status}) + Updates(Job{Status: j.Status, Activity: j.Activity}) if tx.Error != nil { return jobError(tx.Error, "saving job status") } diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 771fa362..3e7e2163 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -113,7 +113,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( switch task.Status { case api.TaskStatusQueued: // Re-queueing a task on a completed job should re-queue the job too. - return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued) + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued, "task was queued") case api.TaskStatusCancelRequested: // Requesting cancellation of a single task has no influence on the job itself. @@ -136,7 +136,8 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( return nil default: logger.Info().Msg("job became active because one of its task changed status") - return sm.JobStatusChange(ctx, job, api.JobStatusActive) + reason := fmt.Sprintf("task became %s", task.Status) + return sm.JobStatusChange(ctx, job, api.JobStatusActive, reason) } case api.TaskStatusCompleted: @@ -154,6 +155,7 @@ func (sm *StateMachine) jobStatusIfAThenB( logger zerolog.Logger, job *persistence.Job, ifStatus, thenStatus api.JobStatus, + reason string, ) error { if job.Status != ifStatus { return nil @@ -162,7 +164,7 @@ func (sm *StateMachine) jobStatusIfAThenB( Str("jobStatusOld", string(ifStatus)). Str("jobStatusNew", string(thenStatus)). Msg("Job will change status because one of its task changed status") - return sm.JobStatusChange(ctx, job, thenStatus) + return sm.JobStatusChange(ctx, job, thenStatus, reason) } // onTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job. @@ -180,7 +182,7 @@ func (sm *StateMachine) onTaskStatusCanceled(ctx context.Context, logger zerolog } if !hasCancelReq { logger.Info().Msg("last task of job went from cancel-requested to canceled") - return sm.JobStatusChange(ctx, job, api.JobStatusCanceled) + return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "tasks were canceled") } return nil } @@ -202,11 +204,12 @@ func (sm *StateMachine) onTaskStatusFailed(ctx context.Context, logger zerolog.L if failedPercentage >= taskFailJobPercentage { failLogger.Info().Msg("failing job because too many of its tasks failed") - return sm.JobStatusChange(ctx, job, api.JobStatusFailed) + return sm.JobStatusChange(ctx, job, api.JobStatusFailed, "too many tasks failed") } // If the job didn't fail, this failure indicates that at least the job is active. failLogger.Info().Msg("task failed, but not enough to fail the job") - return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive) + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, + "task failed, but not enough to fail the job") } // onTaskStatusCompleted conditionally escalates the completion of a task to complete the entire job. @@ -217,17 +220,22 @@ func (sm *StateMachine) onTaskStatusCompleted(ctx context.Context, logger zerolo } if numComplete == numTotal { logger.Info().Msg("all tasks of job are completed, job is completed") - return sm.JobStatusChange(ctx, job, api.JobStatusCompleted) + return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed") } logger.Info(). Int("taskNumTotal", numTotal). Int("taskNumComplete", numComplete). Msg("task completed; there are more tasks to do") - return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive) + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, "no more tasks to do") } // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. -func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error { +func (sm *StateMachine) JobStatusChange( + ctx context.Context, + job *persistence.Job, + newJobStatus api.JobStatus, + reason string, +) error { // Job status changes can trigger task status changes, which can trigger the // next job status change. Keep looping over these job status changes until // there is no more change left to do. @@ -235,11 +243,13 @@ func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Jo for newJobStatus != "" && newJobStatus != job.Status { oldJobStatus := job.Status job.Status = newJobStatus + job.Activity = fmt.Sprintf("Changed to status %q: %s", newJobStatus, reason) logger := log.With(). Str("job", job.UUID). Str("jobStatusOld", string(oldJobStatus)). Str("jobStatusNew", string(newJobStatus)). + Str("reason", reason). Logger() logger.Info().Msg("job status changed") diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index 7052dc59..27ea6880 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -213,7 +213,7 @@ func TestJobRequeueWithSomeCompletedTasks(t *testing.T) { mocks.expectBroadcastJobChange(job, api.JobStatusActive, api.JobStatusRequeued) mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued) - assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued)) + assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unittest")) } func TestJobRequeueWithAllCompletedTasks(t *testing.T) { @@ -244,7 +244,7 @@ func TestJobRequeueWithAllCompletedTasks(t *testing.T) { mocks.expectBroadcastJobChange(job, api.JobStatusCompleted, api.JobStatusRequeued) mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued) - assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued)) + assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unit test")) } func mockedTaskStateMachine(mockCtrl *gomock.Controller) (*StateMachine, *StateMachineMocks) {