Manager: check for stuck jobs at startup

Check for jobs in 'cancel-requested' or 'requeued' statuses, and ensure
they transition to the right status. This happens at startup, before
even starting the web interface, so that a consistent state is presented.
This commit is contained in:
Sybren A. Stüvel 2022-05-06 16:07:27 +02:00
parent cbf07f1142
commit d673da7a0c
6 changed files with 238 additions and 27 deletions

View File

@ -105,11 +105,15 @@ func main() {
// go persist.PeriodicMaintenanceLoop(mainCtx)
webUpdater := webupdates.New()
flamenco := buildFlamencoAPI(configService, persist, webUpdater)
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater)
flamenco := buildFlamencoAPI(configService, persist, taskStateMachine, webUpdater)
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls)
installSignalHandler(mainCtxCancel)
// Before doing anything new, clean up in case we made a mess in an earlier run.
taskStateMachine.CheckStuck(mainCtx)
// All main goroutines should sync with this waitgroup. Once the waitgroup is
// done, the main() function will return and the process will stop.
wg := new(sync.WaitGroup)
@ -142,14 +146,18 @@ func main() {
log.Info().Msg("shutdown complete")
}
func buildFlamencoAPI(configService *config.Service, persist *persistence.DB, webUpdater *webupdates.BiDirComms) api.ServerInterface {
func buildFlamencoAPI(
configService *config.Service,
persist *persistence.DB,
taskStateMachine *task_state_machine.StateMachine,
webUpdater *webupdates.BiDirComms,
) api.ServerInterface {
timeService := clock.New()
compiler, err := job_compilers.Load(timeService)
if err != nil {
log.Fatal().Err(err).Msg("error loading job compilers")
}
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath)
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater)
shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
flamenco := api_impl.NewFlamenco(compiler, persist, webUpdater, logStorage, configService, taskStateMachine, shamanServer)
return flamenco

View File

@ -180,6 +180,20 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
return &dbJob, nil
}
func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) {
var jobs []*Job
tx := db.gormDB.WithContext(ctx).
Model(&Job{}).
Where("status in ?", jobStatuses).
Scan(&jobs)
if tx.Error != nil {
return nil, jobError(tx.Error, "fetching jobs in status %q", jobStatuses)
}
return jobs, nil
}
// SaveJobStatus saves the job's Status and Activity fields.
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
tx := db.gormDB.WithContext(ctx).

View File

@ -97,6 +97,41 @@ func TestCountTasksOfJobInStatus(t *testing.T) {
assert.Equal(t, 3, numTotal)
}
func TestFetchJobsInStatus(t *testing.T) {
ctx, close, db, job1, _ := jobTasksTestFixtures(t)
defer close()
ajob2 := createTestAuthoredJob("1f08e20b-ce24-41c2-b237-36120bd69fc6")
ajob3 := createTestAuthoredJob("3ac2dbb4-0c34-410e-ad3b-652e6d7e65a5")
job2 := persistAuthoredJob(t, ctx, db, ajob2)
job3 := persistAuthoredJob(t, ctx, db, ajob3)
// Sanity check
if !assert.Equal(t, api.JobStatusUnderConstruction, job1.Status) {
return
}
// Query single status
jobs, err := db.FetchJobsInStatus(ctx, api.JobStatusUnderConstruction)
assert.NoError(t, err)
assert.Equal(t, []*Job{job1, job2, job3}, jobs)
// Query two statuses, where only one matches all jobs.
jobs, err = db.FetchJobsInStatus(ctx, api.JobStatusCanceled, api.JobStatusUnderConstruction)
assert.NoError(t, err)
assert.Equal(t, []*Job{job1, job2, job3}, jobs)
// Update a job status, query for two of the three used statuses.
job1.Status = api.JobStatusQueued
assert.NoError(t, db.SaveJobStatus(ctx, job1))
job2.Status = api.JobStatusRequeued
assert.NoError(t, db.SaveJobStatus(ctx, job2))
jobs, err = db.FetchJobsInStatus(ctx, api.JobStatusQueued, api.JobStatusUnderConstruction)
assert.NoError(t, err)
assert.Equal(t, []*Job{job1, job3}, jobs)
}
func TestFetchTasksOfJobInStatus(t *testing.T) {
ctx, close, db, job, authoredJob := jobTasksTestFixtures(t)
defer close()
@ -219,8 +254,12 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
Dependencies: []*job_compilers.AuthoredTask{&task1, &task2},
}
return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3)
}
func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
job := job_compilers.AuthoredJob{
JobID: "263fd47e-b9f8-4637-b726-fd7e47ecfdae",
JobID: jobID,
Name: "Test job",
Status: api.JobStatusUnderConstruction,
Priority: 50,
@ -232,7 +271,7 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
"author": "Sybren",
"project": "Sprite Fright",
},
Tasks: []job_compilers.AuthoredTask{task1, task2, task3},
Tasks: tasks,
}
return job

View File

@ -57,6 +57,26 @@ func (mr *MockPersistenceServiceMockRecorder) CountTasksOfJobInStatus(arg0, arg1
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountTasksOfJobInStatus", reflect.TypeOf((*MockPersistenceService)(nil).CountTasksOfJobInStatus), varargs...)
}
// FetchJobsInStatus mocks base method.
func (m *MockPersistenceService) FetchJobsInStatus(arg0 context.Context, arg1 ...api.JobStatus) ([]*persistence.Job, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0}
for _, a := range arg1 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "FetchJobsInStatus", varargs...)
ret0, _ := ret[0].([]*persistence.Job)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchJobsInStatus indicates an expected call of FetchJobsInStatus.
func (mr *MockPersistenceServiceMockRecorder) FetchJobsInStatus(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsInStatus), varargs...)
}
// FetchTasksOfJob mocks base method.
func (m *MockPersistenceService) FetchTasksOfJob(arg0 context.Context, arg1 *persistence.Job) ([]*persistence.Task, error) {
m.ctrl.T.Helper()

View File

@ -36,6 +36,8 @@ type PersistenceService interface {
FetchTasksOfJob(ctx context.Context, job *persistence.Job) ([]*persistence.Task, error)
FetchTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) ([]*persistence.Task, error)
FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error)
}
// PersistenceService should be a subset of persistence.DB
@ -52,6 +54,16 @@ type ChangeBroadcaster interface {
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
var (
// Task statuses that always get requeued when the job is requeued.
nonCompletedStatuses = []api.TaskStatus{
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
}
)
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster) *StateMachine {
return &StateMachine{
persist: persist,
@ -254,7 +266,6 @@ func (sm *StateMachine) JobStatusChange(
var err error
for newJobStatus != "" && newJobStatus != job.Status {
oldJobStatus := job.Status
job.Status = newJobStatus
job.Activity = fmt.Sprintf("Changed to status %q: %s", newJobStatus, reason)
logger := log.With().
@ -265,26 +276,97 @@ func (sm *StateMachine) JobStatusChange(
Logger()
logger.Info().Msg("job status changed")
// Persist the new job status.
err = sm.persist.SaveJobStatus(ctx, job)
newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
if err != nil {
return fmt.Errorf("saving job status change %q to %q to database: %w",
return err
}
}
return nil
}
// jobStatusReenforce acts as if the job just transitioned to its current
// status, and performs another round of task status updates. This is normally
// not necessary, but can be used when normal job/task status updates got
// interrupted somehow.
func (sm *StateMachine) jobStatusReenforce(
ctx context.Context,
job *persistence.Job,
reason string,
) error {
// Job status changes can trigger task status changes, which can trigger the
// next job status change. Keep looping over these job status changes until
// there is no more change left to do.
var err error
newJobStatus := job.Status
for {
oldJobStatus := job.Status
job.Activity = fmt.Sprintf("Reenforcing status %q: %s", newJobStatus, reason)
logger := log.With().
Str("job", job.UUID).
Str("reason", reason).
Logger()
if newJobStatus == job.Status {
logger := logger.With().
Str("jobStatus", string(job.Status)).
Logger()
logger.Info().Msg("job status reenforced")
} else {
logger := logger.With().
Str("jobStatusOld", string(oldJobStatus)).
Str("jobStatusNew", string(newJobStatus)).
Logger()
logger.Info().Msg("job status changed")
}
newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
if err != nil {
return err
}
if newJobStatus == "" || newJobStatus == oldJobStatus {
// Do this check at the end of the loop, and not the start, so that at
// least one iteration is run.
break
}
}
return nil
}
// jobStatusSet saves the job with the new status and handles updates to tasks
// as well. If the task status change should trigger another job status change,
// the new job status is returned.
func (sm *StateMachine) jobStatusSet(ctx context.Context,
job *persistence.Job,
newJobStatus api.JobStatus,
reason string,
logger zerolog.Logger,
) (api.JobStatus, error) {
oldJobStatus := job.Status
job.Status = newJobStatus
// Persist the new job status.
err := sm.persist.SaveJobStatus(ctx, job)
if err != nil {
return "", fmt.Errorf("saving job status change %q to %q to database: %w",
oldJobStatus, newJobStatus, err)
}
// Handle the status change.
newJobStatus, err = sm.updateTasksAfterJobStatusChange(ctx, logger, job, oldJobStatus)
if err != nil {
return fmt.Errorf("updating job's tasks after job status change: %w", err)
return "", fmt.Errorf("updating job's tasks after job status change: %w", err)
}
// Broadcast this change to the SocketIO clients.
jobUpdate := webupdates.NewJobUpdate(job)
jobUpdate.PreviousStatus = &oldJobStatus
sm.broadcaster.BroadcastJobUpdate(jobUpdate)
}
return nil
return newJobStatus, nil
}
// updateTasksAfterJobStatusChange updates the status of its tasks based on the
@ -388,12 +470,7 @@ func (sm *StateMachine) requeueTasks(
tasks, err = sm.persist.FetchTasksOfJob(ctx, job)
default:
// Re-queue only the non-completed tasks.
tasks, err = sm.persist.FetchTasksOfJobInStatus(ctx, job,
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
)
tasks, err = sm.persist.FetchTasksOfJobInStatus(ctx, job, nonCompletedStatuses...)
}
if err != nil {
return "", err
@ -458,3 +535,21 @@ func (sm *StateMachine) checkTaskCompletion(
logger.Info().Msg("job has all tasks completed, transition job to 'completed'")
return api.JobStatusCompleted, nil
}
// CheckStuck finds jobs that are 'stuck' in their current status. This is meant
// to run at startup of Flamenco Manager, and checks to see if there are any
// jobs in a status that a human will not be able to fix otherwise.
func (sm *StateMachine) CheckStuck(ctx context.Context) {
stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued)
if err != nil {
log.Error().Err(err).Msg("unable to fetch stuck jobs")
return
}
for _, job := range stuckJobs {
err := sm.jobStatusReenforce(ctx, job, "checking stuck jobs")
if err != nil {
log.Error().Str("job", job.UUID).Err(err).Msg("error getting job un-stuck")
}
}
}

View File

@ -339,6 +339,41 @@ func TestJobCancelWithSomeCompletedTasks(t *testing.T) {
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest"))
}
func TestCheckStuck(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusFailed)
task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
job := task1.Job
job.Status = api.JobStatusRequeued
mocks.persist.EXPECT().FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued).
Return([]*persistence.Job{job}, nil)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(2, 3, nil)
mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, job,
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
).
Return([]*persistence.Task{task2, task3}, nil)
// Expect Job -> Queued and non-completed tasks -> Queued.
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued) // should be called once for the current status
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued) // and then with the new status
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusQueued)
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusQueued)
mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusRequeued)
mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusFailed, api.TaskStatusQueued)
mocks.expectBroadcastTaskChange(task3, api.TaskStatusSoftFailed, api.TaskStatusQueued)
sm.CheckStuck(ctx)
}
func mockedTaskStateMachine(mockCtrl *gomock.Controller) (*StateMachine, *StateMachineMocks) {
mocks := StateMachineMocks{
persist: mocks.NewMockPersistenceService(mockCtrl),