From d79fde17f351ff42f1b48b2361ec47e2ab6251ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 21 Apr 2022 12:32:07 +0200 Subject: [PATCH] Manager: keep track of the reason of job status changes To prepare for job status changes being requestable from the API, store the reason for any status change on the job itself. Not yet part of the API, just on the persistence layer. --- internal/manager/api_impl/api_impl.go | 2 +- .../api_impl/mocks/api_impl_mock.gen.go | 8 +++--- internal/manager/persistence/jobs.go | 4 ++- .../task_state_machine/task_state_machine.go | 28 +++++++++++++------ .../task_state_machine_test.go | 4 +-- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index edf816d0..778c147a 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -65,7 +65,7 @@ type TaskStateMachine interface { TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. - JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error + JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, reason string) error } // TaskStateMachine should be a subset of task_state_machine.StateMachine. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index e0c49fd7..4a2930d4 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -440,17 +440,17 @@ func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder { } // JobStatusChange mocks base method. -func (m *MockTaskStateMachine) JobStatusChange(arg0 context.Context, arg1 *persistence.Job, arg2 api.JobStatus) error { +func (m *MockTaskStateMachine) JobStatusChange(arg0 context.Context, arg1 *persistence.Job, arg2 api.JobStatus, arg3 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "JobStatusChange", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "JobStatusChange", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // JobStatusChange indicates an expected call of JobStatusChange. -func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2, arg3) } // TaskStatusChange mocks base method. diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index ac6fcc5b..bba341b3 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -22,6 +22,7 @@ type Job struct { JobType string `gorm:"type:varchar(32);default:''"` Priority int `gorm:"type:smallint;default:0"` Status api.JobStatus `gorm:"type:varchar(32);default:''"` + Activity string `gorm:"type:varchar(255);default:''"` Settings StringInterfaceMap `gorm:"type:jsonb"` Metadata StringStringMap `gorm:"type:jsonb"` @@ -178,10 +179,11 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { return &dbJob, nil } +// SaveJobStatus saves the job's Status and Activity fields. func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { tx := db.gormDB.WithContext(ctx). Model(j). - Updates(Job{Status: j.Status}) + Updates(Job{Status: j.Status, Activity: j.Activity}) if tx.Error != nil { return jobError(tx.Error, "saving job status") } diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 771fa362..3e7e2163 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -113,7 +113,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( switch task.Status { case api.TaskStatusQueued: // Re-queueing a task on a completed job should re-queue the job too. - return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued) + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued, "task was queued") case api.TaskStatusCancelRequested: // Requesting cancellation of a single task has no influence on the job itself. @@ -136,7 +136,8 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( return nil default: logger.Info().Msg("job became active because one of its task changed status") - return sm.JobStatusChange(ctx, job, api.JobStatusActive) + reason := fmt.Sprintf("task became %s", task.Status) + return sm.JobStatusChange(ctx, job, api.JobStatusActive, reason) } case api.TaskStatusCompleted: @@ -154,6 +155,7 @@ func (sm *StateMachine) jobStatusIfAThenB( logger zerolog.Logger, job *persistence.Job, ifStatus, thenStatus api.JobStatus, + reason string, ) error { if job.Status != ifStatus { return nil @@ -162,7 +164,7 @@ func (sm *StateMachine) jobStatusIfAThenB( Str("jobStatusOld", string(ifStatus)). Str("jobStatusNew", string(thenStatus)). Msg("Job will change status because one of its task changed status") - return sm.JobStatusChange(ctx, job, thenStatus) + return sm.JobStatusChange(ctx, job, thenStatus, reason) } // onTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job. @@ -180,7 +182,7 @@ func (sm *StateMachine) onTaskStatusCanceled(ctx context.Context, logger zerolog } if !hasCancelReq { logger.Info().Msg("last task of job went from cancel-requested to canceled") - return sm.JobStatusChange(ctx, job, api.JobStatusCanceled) + return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "tasks were canceled") } return nil } @@ -202,11 +204,12 @@ func (sm *StateMachine) onTaskStatusFailed(ctx context.Context, logger zerolog.L if failedPercentage >= taskFailJobPercentage { failLogger.Info().Msg("failing job because too many of its tasks failed") - return sm.JobStatusChange(ctx, job, api.JobStatusFailed) + return sm.JobStatusChange(ctx, job, api.JobStatusFailed, "too many tasks failed") } // If the job didn't fail, this failure indicates that at least the job is active. failLogger.Info().Msg("task failed, but not enough to fail the job") - return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive) + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, + "task failed, but not enough to fail the job") } // onTaskStatusCompleted conditionally escalates the completion of a task to complete the entire job. @@ -217,17 +220,22 @@ func (sm *StateMachine) onTaskStatusCompleted(ctx context.Context, logger zerolo } if numComplete == numTotal { logger.Info().Msg("all tasks of job are completed, job is completed") - return sm.JobStatusChange(ctx, job, api.JobStatusCompleted) + return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed") } logger.Info(). Int("taskNumTotal", numTotal). Int("taskNumComplete", numComplete). Msg("task completed; there are more tasks to do") - return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive) + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, "no more tasks to do") } // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. -func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error { +func (sm *StateMachine) JobStatusChange( + ctx context.Context, + job *persistence.Job, + newJobStatus api.JobStatus, + reason string, +) error { // Job status changes can trigger task status changes, which can trigger the // next job status change. Keep looping over these job status changes until // there is no more change left to do. @@ -235,11 +243,13 @@ func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Jo for newJobStatus != "" && newJobStatus != job.Status { oldJobStatus := job.Status job.Status = newJobStatus + job.Activity = fmt.Sprintf("Changed to status %q: %s", newJobStatus, reason) logger := log.With(). Str("job", job.UUID). Str("jobStatusOld", string(oldJobStatus)). Str("jobStatusNew", string(newJobStatus)). + Str("reason", reason). Logger() logger.Info().Msg("job status changed") diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index 7052dc59..27ea6880 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -213,7 +213,7 @@ func TestJobRequeueWithSomeCompletedTasks(t *testing.T) { mocks.expectBroadcastJobChange(job, api.JobStatusActive, api.JobStatusRequeued) mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued) - assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued)) + assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unittest")) } func TestJobRequeueWithAllCompletedTasks(t *testing.T) { @@ -244,7 +244,7 @@ func TestJobRequeueWithAllCompletedTasks(t *testing.T) { mocks.expectBroadcastJobChange(job, api.JobStatusCompleted, api.JobStatusRequeued) mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued) - assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued)) + assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unit test")) } func mockedTaskStateMachine(mockCtrl *gomock.Controller) (*StateMachine, *StateMachineMocks) {