From d673da7a0c2f7312bfa89f0532bb94906db6181c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?=
Date: Fri, 6 May 2022 16:07:27 +0200
Subject: [PATCH] Manager: check for stuck jobs at startup

Check for jobs in 'cancel-requested' or 'requeued' statuses, and ensure
they transition to the right status. This happens at startup, before even
starting the web interface, so that a consistent state is presented.
---
 cmd/flamenco-manager/main.go                  |  14 +-
 internal/manager/persistence/jobs.go          |  14 ++
 internal/manager/persistence/jobs_test.go     |  43 +++++-
 .../mocks/interfaces_mock.gen.go              |  20 +++
 .../task_state_machine/task_state_machine.go  | 139 +++++++++++++++---
 .../task_state_machine_test.go                |  35 +++++
 6 files changed, 238 insertions(+), 27 deletions(-)

diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go
index 1862724d..15db8f78 100644
--- a/cmd/flamenco-manager/main.go
+++ b/cmd/flamenco-manager/main.go
@@ -105,11 +105,15 @@ func main() {
 	// go persist.PeriodicMaintenanceLoop(mainCtx)
 
 	webUpdater := webupdates.New()
-	flamenco := buildFlamencoAPI(configService, persist, webUpdater)
+	taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater)
+	flamenco := buildFlamencoAPI(configService, persist, taskStateMachine, webUpdater)
 	e := buildWebService(flamenco, persist, ssdp, webUpdater, urls)
 
 	installSignalHandler(mainCtxCancel)
 
+	// Before doing anything new, clean up in case we made a mess in an earlier run.
+	taskStateMachine.CheckStuck(mainCtx)
+
 	// All main goroutines should sync with this waitgroup. Once the waitgroup is
 	// done, the main() function will return and the process will stop.
 	wg := new(sync.WaitGroup)
@@ -142,14 +146,18 @@ func main() {
 	log.Info().Msg("shutdown complete")
 }
 
-func buildFlamencoAPI(configService *config.Service, persist *persistence.DB, webUpdater *webupdates.BiDirComms) api.ServerInterface {
+func buildFlamencoAPI(
+	configService *config.Service,
+	persist *persistence.DB,
+	taskStateMachine *task_state_machine.StateMachine,
+	webUpdater *webupdates.BiDirComms,
+) api.ServerInterface {
 	timeService := clock.New()
 	compiler, err := job_compilers.Load(timeService)
 	if err != nil {
 		log.Fatal().Err(err).Msg("error loading job compilers")
 	}
 	logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath)
-	taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater)
 	shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
 	flamenco := api_impl.NewFlamenco(compiler, persist, webUpdater, logStorage, configService, taskStateMachine, shamanServer)
 	return flamenco
diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go
index 8d801bd3..655156e5 100644
--- a/internal/manager/persistence/jobs.go
+++ b/internal/manager/persistence/jobs.go
@@ -180,6 +180,20 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
 	return &dbJob, nil
 }
 
+func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) {
+	var jobs []*Job
+
+	tx := db.gormDB.WithContext(ctx).
+		Model(&Job{}).
+		Where("status in ?", jobStatuses).
+		Scan(&jobs)
+
+	if tx.Error != nil {
+		return nil, jobError(tx.Error, "fetching jobs in status %q", jobStatuses)
+	}
+	return jobs, nil
+}
+
 // SaveJobStatus saves the job's Status and Activity fields.
 func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
 	tx := db.gormDB.WithContext(ctx).
diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go
index 8024f817..eea75d33 100644
--- a/internal/manager/persistence/jobs_test.go
+++ b/internal/manager/persistence/jobs_test.go
@@ -97,6 +97,41 @@ func TestCountTasksOfJobInStatus(t *testing.T) {
 	assert.Equal(t, 3, numTotal)
 }
 
+func TestFetchJobsInStatus(t *testing.T) {
+	ctx, close, db, job1, _ := jobTasksTestFixtures(t)
+	defer close()
+
+	ajob2 := createTestAuthoredJob("1f08e20b-ce24-41c2-b237-36120bd69fc6")
+	ajob3 := createTestAuthoredJob("3ac2dbb4-0c34-410e-ad3b-652e6d7e65a5")
+	job2 := persistAuthoredJob(t, ctx, db, ajob2)
+	job3 := persistAuthoredJob(t, ctx, db, ajob3)
+
+	// Sanity check
+	if !assert.Equal(t, api.JobStatusUnderConstruction, job1.Status) {
+		return
+	}
+
+	// Query single status
+	jobs, err := db.FetchJobsInStatus(ctx, api.JobStatusUnderConstruction)
+	assert.NoError(t, err)
+	assert.Equal(t, []*Job{job1, job2, job3}, jobs)
+
+	// Query two statuses, where only one matches all jobs.
+	jobs, err = db.FetchJobsInStatus(ctx, api.JobStatusCanceled, api.JobStatusUnderConstruction)
+	assert.NoError(t, err)
+	assert.Equal(t, []*Job{job1, job2, job3}, jobs)
+
+	// Update a job status, query for two of the three used statuses.
+	job1.Status = api.JobStatusQueued
+	assert.NoError(t, db.SaveJobStatus(ctx, job1))
+	job2.Status = api.JobStatusRequeued
+	assert.NoError(t, db.SaveJobStatus(ctx, job2))
+
+	jobs, err = db.FetchJobsInStatus(ctx, api.JobStatusQueued, api.JobStatusUnderConstruction)
+	assert.NoError(t, err)
+	assert.Equal(t, []*Job{job1, job3}, jobs)
+}
+
 func TestFetchTasksOfJobInStatus(t *testing.T) {
 	ctx, close, db, job, authoredJob := jobTasksTestFixtures(t)
 	defer close()
@@ -219,8 +254,12 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
 		Dependencies: []*job_compilers.AuthoredTask{&task1, &task2},
 	}
 
+	return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3)
+}
+
+func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
 	job := job_compilers.AuthoredJob{
-		JobID:    "263fd47e-b9f8-4637-b726-fd7e47ecfdae",
+		JobID:    jobID,
 		Name:     "Test job",
 		Status:   api.JobStatusUnderConstruction,
 		Priority: 50,
@@ -232,7 +271,7 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
 			"author":  "Sybren",
 			"project": "Sprite Fright",
 		},
-		Tasks: []job_compilers.AuthoredTask{task1, task2, task3},
+		Tasks: tasks,
 	}
 
 	return job
diff --git a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go
index eff48c35..135989ec 100644
--- a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go
+++ b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go
@@ -57,6 +57,26 @@ func (mr *MockPersistenceServiceMockRecorder) CountTasksOfJobInStatus(arg0, arg1
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountTasksOfJobInStatus", reflect.TypeOf((*MockPersistenceService)(nil).CountTasksOfJobInStatus), varargs...)
 }
 
+// FetchJobsInStatus mocks base method.
+func (m *MockPersistenceService) FetchJobsInStatus(arg0 context.Context, arg1 ...api.JobStatus) ([]*persistence.Job, error) {
+	m.ctrl.T.Helper()
+	varargs := []interface{}{arg0}
+	for _, a := range arg1 {
+		varargs = append(varargs, a)
+	}
+	ret := m.ctrl.Call(m, "FetchJobsInStatus", varargs...)
+	ret0, _ := ret[0].([]*persistence.Job)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// FetchJobsInStatus indicates an expected call of FetchJobsInStatus.
+func (mr *MockPersistenceServiceMockRecorder) FetchJobsInStatus(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	varargs := append([]interface{}{arg0}, arg1...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsInStatus), varargs...)
+}
+
 // FetchTasksOfJob mocks base method.
 func (m *MockPersistenceService) FetchTasksOfJob(arg0 context.Context, arg1 *persistence.Job) ([]*persistence.Task, error) {
 	m.ctrl.T.Helper()
diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go
index f596be81..d985b180 100644
--- a/internal/manager/task_state_machine/task_state_machine.go
+++ b/internal/manager/task_state_machine/task_state_machine.go
@@ -36,6 +36,8 @@ type PersistenceService interface {
 
 	FetchTasksOfJob(ctx context.Context, job *persistence.Job) ([]*persistence.Task, error)
 	FetchTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) ([]*persistence.Task, error)
+
+	FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error)
 }
 
 // PersistenceService should be a subset of persistence.DB
@@ -52,6 +54,16 @@ type ChangeBroadcaster interface {
 // ChangeBroadcaster should be a subset of webupdates.BiDirComms
 var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
 
+var (
+	// Task statuses that always get requeued when the job is requeued.
+	nonCompletedStatuses = []api.TaskStatus{
+		api.TaskStatusCanceled,
+		api.TaskStatusFailed,
+		api.TaskStatusPaused,
+		api.TaskStatusSoftFailed,
+	}
+)
+
 func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster) *StateMachine {
 	return &StateMachine{
 		persist: persist,
@@ -254,7 +266,6 @@ func (sm *StateMachine) JobStatusChange(
 	var err error
 	for newJobStatus != "" && newJobStatus != job.Status {
 		oldJobStatus := job.Status
-		job.Status = newJobStatus
 		job.Activity = fmt.Sprintf("Changed to status %q: %s", newJobStatus, reason)
 
 		logger := log.With().
@@ -265,28 +276,99 @@ func (sm *StateMachine) JobStatusChange(
 			Logger()
 		logger.Info().Msg("job status changed")
 
-		// Persist the new job status.
-		err = sm.persist.SaveJobStatus(ctx, job)
+		newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
 		if err != nil {
-			return fmt.Errorf("saving job status change %q to %q to database: %w",
-				oldJobStatus, newJobStatus, err)
+			return err
 		}
-
-		// Handle the status change.
-		newJobStatus, err = sm.updateTasksAfterJobStatusChange(ctx, logger, job, oldJobStatus)
-		if err != nil {
-			return fmt.Errorf("updating job's tasks after job status change: %w", err)
-		}
-
-		// Broadcast this change to the SocketIO clients.
-		jobUpdate := webupdates.NewJobUpdate(job)
-		jobUpdate.PreviousStatus = &oldJobStatus
-		sm.broadcaster.BroadcastJobUpdate(jobUpdate)
 	}
 
 	return nil
 }
 
+// jobStatusReenforce acts as if the job just transitioned to its current
+// status, and performs another round of task status updates. This is normally
+// not necessary, but can be used when normal job/task status updates got
+// interrupted somehow.
+func (sm *StateMachine) jobStatusReenforce(
+	ctx context.Context,
+	job *persistence.Job,
+	reason string,
+) error {
+	// Job status changes can trigger task status changes, which can trigger the
+	// next job status change. Keep looping over these job status changes until
+	// there is no more change left to do.
+	var err error
+	newJobStatus := job.Status
+
+	for {
+		oldJobStatus := job.Status
+		job.Activity = fmt.Sprintf("Reenforcing status %q: %s", newJobStatus, reason)
+
+		logger := log.With().
+			Str("job", job.UUID).
+			Str("reason", reason).
+			Logger()
+		if newJobStatus == job.Status {
+			logger := logger.With().
+				Str("jobStatus", string(job.Status)).
+				Logger()
+			logger.Info().Msg("job status reenforced")
+		} else {
+			logger := logger.With().
+				Str("jobStatusOld", string(oldJobStatus)).
+				Str("jobStatusNew", string(newJobStatus)).
+				Logger()
+			logger.Info().Msg("job status changed")
+		}
+
+		newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
+		if err != nil {
+			return err
+		}
+
+		if newJobStatus == "" || newJobStatus == oldJobStatus {
+			// Do this check at the end of the loop, and not the start, so that at
+			// least one iteration is run.
+			break
+		}
+	}
+
+	return nil
+}
+
+// jobStatusSet saves the job with the new status and handles updates to tasks
+// as well. If the task status change should trigger another job status change,
+// the new job status is returned.
+func (sm *StateMachine) jobStatusSet(ctx context.Context,
+	job *persistence.Job,
+	newJobStatus api.JobStatus,
+	reason string,
+	logger zerolog.Logger,
+) (api.JobStatus, error) {
+	oldJobStatus := job.Status
+	job.Status = newJobStatus
+
+	// Persist the new job status.
+	err := sm.persist.SaveJobStatus(ctx, job)
+	if err != nil {
+		return "", fmt.Errorf("saving job status change %q to %q to database: %w",
+			oldJobStatus, newJobStatus, err)
+	}
+
+	// Handle the status change.
+	newJobStatus, err = sm.updateTasksAfterJobStatusChange(ctx, logger, job, oldJobStatus)
+	if err != nil {
+		return "", fmt.Errorf("updating job's tasks after job status change: %w", err)
+	}
+
+	// Broadcast this change to the SocketIO clients.
+	jobUpdate := webupdates.NewJobUpdate(job)
+	jobUpdate.PreviousStatus = &oldJobStatus
+	sm.broadcaster.BroadcastJobUpdate(jobUpdate)
+
+	return newJobStatus, nil
+}
+
 // updateTasksAfterJobStatusChange updates the status of its tasks based on the
 // new status of this job.
 //
@@ -388,12 +470,7 @@ func (sm *StateMachine) requeueTasks(
 		tasks, err = sm.persist.FetchTasksOfJob(ctx, job)
 	default:
 		// Re-queue only the non-completed tasks.
-		tasks, err = sm.persist.FetchTasksOfJobInStatus(ctx, job,
-			api.TaskStatusCanceled,
-			api.TaskStatusFailed,
-			api.TaskStatusPaused,
-			api.TaskStatusSoftFailed,
-		)
+		tasks, err = sm.persist.FetchTasksOfJobInStatus(ctx, job, nonCompletedStatuses...)
 	}
 	if err != nil {
 		return "", err
@@ -458,3 +535,21 @@ func (sm *StateMachine) checkTaskCompletion(
 	logger.Info().Msg("job has all tasks completed, transition job to 'completed'")
 	return api.JobStatusCompleted, nil
 }
+
+// CheckStuck finds jobs that are 'stuck' in their current status. This is meant
+// to run at startup of Flamenco Manager, and checks to see if there are any
+// jobs in a status that a human will not be able to fix otherwise.
+func (sm *StateMachine) CheckStuck(ctx context.Context) {
+	stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued)
+	if err != nil {
+		log.Error().Err(err).Msg("unable to fetch stuck jobs")
+		return
+	}
+
+	for _, job := range stuckJobs {
+		err := sm.jobStatusReenforce(ctx, job, "checking stuck jobs")
+		if err != nil {
+			log.Error().Str("job", job.UUID).Err(err).Msg("error getting job un-stuck")
+		}
+	}
+}
diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go
index e3669f34..bf4b4582 100644
--- a/internal/manager/task_state_machine/task_state_machine_test.go
+++ b/internal/manager/task_state_machine/task_state_machine_test.go
@@ -339,6 +339,41 @@ func TestJobCancelWithSomeCompletedTasks(t *testing.T) {
 	assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest"))
 }
 
+func TestCheckStuck(t *testing.T) {
+	mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
+	defer mockCtrl.Finish()
+
+	task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
+	task2 := taskOfSameJob(task1, api.TaskStatusFailed)
+	task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
+	job := task1.Job
+	job.Status = api.JobStatusRequeued
+
+	mocks.persist.EXPECT().FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued).
+		Return([]*persistence.Job{job}, nil)
+	mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(2, 3, nil)
+	mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, job,
+		api.TaskStatusCanceled,
+		api.TaskStatusFailed,
+		api.TaskStatusPaused,
+		api.TaskStatusSoftFailed,
+	).
+		Return([]*persistence.Task{task2, task3}, nil)
+
+	// Expect Job -> Queued and non-completed tasks -> Queued.
+	mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued) // should be called once for the current status
+	mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued)   // and then with the new status
+	mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusQueued)
+	mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusQueued)
+
+	mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusRequeued)
+	mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued)
+	mocks.expectBroadcastTaskChange(task2, api.TaskStatusFailed, api.TaskStatusQueued)
+	mocks.expectBroadcastTaskChange(task3, api.TaskStatusSoftFailed, api.TaskStatusQueued)
+
+	sm.CheckStuck(ctx)
+}
+
 func mockedTaskStateMachine(mockCtrl *gomock.Controller) (*StateMachine, *StateMachineMocks) {
 	mocks := StateMachineMocks{
 		persist: mocks.NewMockPersistenceService(mockCtrl),