package task_state_machine

// SPDX-License-Identifier: GPL-3.0-or-later

import (
	"context"
	"fmt"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"

	"projects.blender.org/studio/flamenco/internal/manager/eventbus"
	"projects.blender.org/studio/flamenco/internal/manager/persistence"
	"projects.blender.org/studio/flamenco/pkg/api"
)

// taskFailJobPercentage is the percentage of a job's tasks that need to fail to
// trigger failure of the entire job.
const taskFailJobPercentage = 10 // Integer from 0 to 100.

// StateMachine handles task and job status changes.
type StateMachine struct {
	persist     PersistenceService
	broadcaster ChangeBroadcaster
	logStorage  LogStorage
}

func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine {
	return &StateMachine{
		persist:     persist,
		broadcaster: broadcaster,
		logStorage:  logStorage,
	}
}

// TaskStatusChange updates the task's status to the new one.
// `task` is expected to still have its original status, and have a filled `Job` pointer.
func (sm *StateMachine) TaskStatusChange(
	ctx context.Context,
	task *persistence.Task,
	newTaskStatus api.TaskStatus,
) error {
	oldTaskStatus := task.Status

	if err := sm.taskStatusChangeOnly(ctx, task, newTaskStatus); err != nil {
		return err
	}

	if err := sm.updateJobAfterTaskStatusChange(ctx, task, oldTaskStatus); err != nil {
		return fmt.Errorf("updating job after task status change: %w", err)
	}
	return nil
}

// taskStatusChangeOnly updates the task's status to the new one, but does not
// "ripple" the change to the job.
// `task` is expected to still have its original status, and have a filled `Job` pointer.
func (sm *StateMachine) taskStatusChangeOnly(
	ctx context.Context,
	task *persistence.Task,
	newTaskStatus api.TaskStatus,
) error {
	if task.JobUUID == "" {
		log.Panic().Str("task", task.UUID).Msg("task without job UUID, cannot handle this")
		return nil // Will not run because of the panic.
	}

	oldTaskStatus := task.Status
	task.Status = newTaskStatus

	logger := log.With().
		Str("task", task.UUID).
		Str("job", task.JobUUID).
		Str("taskStatusOld", string(oldTaskStatus)).
		Str("taskStatusNew", string(newTaskStatus)).
		Logger()
	logger.Debug().Msg("task state changed")

	if err := sm.persist.SaveTaskStatus(ctx, task); err != nil {
		return fmt.Errorf("saving task to database: %w", err)
	}

	if oldTaskStatus != newTaskStatus {
		// logStorage already logs any error, and an error here shouldn't block the
		// rest of the function.
		_ = sm.logStorage.WriteTimestamped(logger, task.JobUUID, task.UUID,
			fmt.Sprintf("task changed status %s -> %s", oldTaskStatus, newTaskStatus))
	}

	// Broadcast this change to the SocketIO clients.
	taskUpdate := eventbus.NewTaskUpdate(task)
	taskUpdate.PreviousStatus = &oldTaskStatus
	sm.broadcaster.BroadcastTaskUpdate(taskUpdate)

	return nil
}

// updateJobAfterTaskStatusChange updates the job status based on the status of
// this task and other tasks in the job.
func (sm *StateMachine) updateJobAfterTaskStatusChange(
	ctx context.Context,
	task *persistence.Task,
	oldTaskStatus api.TaskStatus,
) error {
	job := task.Job
	if job == nil {
		log.Panic().Str("task", task.UUID).Msg("task without job, cannot handle this")
		return nil // Will not run because of the panic.
	}

	logger := log.With().
		Str("job", task.JobUUID).
		Str("task", task.UUID).
		Str("taskStatusOld", string(oldTaskStatus)).
		Str("taskStatusNew", string(task.Status)).
		Logger()

	// Every 'case' in this switch MUST return. Just for sanity's sake.
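	// In brief, how a task status change ripples into the job, as implemented
	// by the cases below:
	//   queued             -> a completed job is requeued
	//   paused             -> no effect on the job
	//   canceled           -> the job is canceled once no runnable tasks remain
	//   failed             -> the job fails once taskFailJobPercentage of its tasks failed
	//   active/soft-failed -> the job becomes active
	//   completed          -> the job completes once all of its tasks completed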
	switch task.Status {
	case api.TaskStatusQueued:
		// Re-queueing a task on a completed job should re-queue the job too.
		return sm.jobStatusIfAThenB(ctx, logger, job,
			api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued")

	case api.TaskStatusPaused:
		// Pausing a task has no impact on the job.
		return nil

	case api.TaskStatusCanceled:
		return sm.updateJobOnTaskStatusCanceled(ctx, logger, job)

	case api.TaskStatusFailed:
		return sm.updateJobOnTaskStatusFailed(ctx, logger, job)

	case api.TaskStatusActive, api.TaskStatusSoftFailed:
		switch job.Status {
		case api.JobStatusActive, api.JobStatusCancelRequested:
			// Do nothing, job is already in the desired status.
			return nil
		default:
			logger.Info().Msg("job became active because one of its tasks changed status")
			reason := fmt.Sprintf("task became %s", task.Status)
			return sm.JobStatusChange(ctx, job, api.JobStatusActive, reason)
		}

	case api.TaskStatusCompleted:
		return sm.updateJobOnTaskStatusCompleted(ctx, logger, job)

	default:
		logger.Warn().Msg("task obtained status that Flamenco did not expect")
		return nil
	}
}

// If the job has status 'ifStatus', move it to status 'thenStatus'.
func (sm *StateMachine) jobStatusIfAThenB(
	ctx context.Context,
	logger zerolog.Logger,
	job *persistence.Job,
	ifStatus, thenStatus api.JobStatus,
	reason string,
) error {
	if job.Status != ifStatus {
		return nil
	}
	logger.Info().
		Str("jobStatusOld", string(ifStatus)).
		Str("jobStatusNew", string(thenStatus)).
		Msg("job will change status because one of its tasks changed status")
	return sm.JobStatusChange(ctx, job, thenStatus, reason)
}

// isJobPausingComplete returns true when the job status is pause-requested and
// there are no more active tasks.
func (sm *StateMachine) isJobPausingComplete(ctx context.Context, job *persistence.Job) (bool, error) {
	if job.Status != api.JobStatusPauseRequested {
		return false, nil
	}
	numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
	if err != nil {
		return false, err
	}
	return numActive == 0, nil
}

// updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a
// task to cancel the job.
func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
	// If no more tasks can run, cancel the job.
	numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job,
		api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed)
	if err != nil {
		return err
	}
	if numRunnable == 0 {
		// NOTE: this does NOT cancel any non-runnable (paused/failed) tasks. If
		// that's desired, just cancel the job as a whole.
		logger.Info().Msg("canceled task was last runnable task of job, canceling job")
		return sm.JobStatusChange(ctx, job, api.JobStatusCanceled,
			"canceled task was last runnable task of job, canceling job")
	}

	// Deal with the special case when the job is in pause-requested status.
	toBePaused, err := sm.isJobPausingComplete(ctx, job)
	if err != nil {
		return err
	}
	if toBePaused {
		return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task cancellation")
	}

	return nil
}

// updateJobOnTaskStatusFailed conditionally escalates the failure of a task to
// fail the entire job.
func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
	// Count the number of failed tasks. If it is over the threshold, fail the job.
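	// A worked example with the threshold of 10% defined above: a job with 100
	// tasks fails on its 10th task failure (10/100 = 10%), while a job with
	// only 4 tasks already fails on its first (1/4 = 25%). Note the integer
	// truncation below: 9 failures out of 100 is 9% and does not fail the job.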
	numFailed, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed)
	if err != nil {
		return err
	}
	failedPercentage := int(float64(numFailed) / float64(numTotal) * 100)
	failLogger := logger.With().
		Int("taskNumTotal", numTotal).
		Int("taskNumFailed", numFailed).
		Int("failedPercentage", failedPercentage).
		Int("threshold", taskFailJobPercentage).
		Logger()
	if failedPercentage >= taskFailJobPercentage {
		failLogger.Info().Msg("failing job because too many of its tasks failed")
		return sm.JobStatusChange(ctx, job, api.JobStatusFailed, "too many tasks failed")
	}

	// If the job didn't fail, this task failure at least indicates that the job
	// is actively being worked on.
	failLogger.Info().Msg("task failed, but not enough to fail the job")

	// Deal with the special case when the job is in pause-requested status.
	toBePaused, err := sm.isJobPausingComplete(ctx, job)
	if err != nil {
		return err
	}
	if toBePaused {
		return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task failure")
	}

	return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
		"task failed, but not enough to fail the job")
}

// updateJobOnTaskStatusCompleted conditionally escalates the completion of a
// task to complete the entire job.
func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
	numComplete, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
	if err != nil {
		return err
	}
	if numComplete == numTotal {
		logger.Info().Msg("all tasks of job are completed, job is completed")
		return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed")
	}

	// Deal with the special case when the job is in pause-requested status.
	toBePaused, err := sm.isJobPausingComplete(ctx, job)
	if err != nil {
		return err
	}
	if toBePaused {
		return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task completion")
	}

	logger.Info().
		Int("taskNumTotal", numTotal).
		Int("taskNumComplete", numComplete).
		Msg("task completed; there are more tasks to do")
	return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
		"task completed; there are more tasks to do")
}

// JobStatusChange gives a Job a new status, and handles the resulting status
// changes on its tasks.
func (sm *StateMachine) JobStatusChange(
	ctx context.Context,
	job *persistence.Job,
	newJobStatus api.JobStatus,
	reason string,
) error {
	// Job status changes can trigger task status changes, which can trigger the
	// next job status change. Keep looping over these job status changes until
	// there is no more change left to do.
	var err error

	for newJobStatus != "" && newJobStatus != job.Status {
		oldJobStatus := job.Status
		job.Activity = fmt.Sprintf("Changed to status %q: %s", newJobStatus, reason)

		logger := log.With().
			Str("job", job.UUID).
			Str("jobStatusOld", string(oldJobStatus)).
			Str("jobStatusNew", string(newJobStatus)).
			Str("reason", reason).
			Logger()
		logger.Info().Msg("job status changed")

		newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
		if err != nil {
			return err
		}
	}

	return nil
}

// jobStatusReenforce acts as if the job just transitioned to its current
// status, and performs another round of task status updates. This is normally
// not necessary, but can be used when normal job/task status updates got
// interrupted somehow.
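//
// For example, CheckStuck below uses this at Manager startup to get
// interrupted jobs going again:
//
//	err := sm.jobStatusReenforce(ctx, job, "checking stuck jobs")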
func (sm *StateMachine) jobStatusReenforce(
	ctx context.Context,
	job *persistence.Job,
	reason string,
) error {
	// Job status changes can trigger task status changes, which can trigger the
	// next job status change. Keep looping over these job status changes until
	// there is no more change left to do.
	var err error
	newJobStatus := job.Status

	for {
		oldJobStatus := job.Status
		job.Activity = fmt.Sprintf("Reenforcing status %q: %s", newJobStatus, reason)

		logger := log.With().
			Str("job", job.UUID).
			Str("reason", reason).
			Logger()
		if newJobStatus == job.Status {
			logger := logger.With().
				Str("jobStatus", string(job.Status)).
				Logger()
			logger.Info().Msg("job status reenforced")
		} else {
			logger := logger.With().
				Str("jobStatusOld", string(oldJobStatus)).
				Str("jobStatusNew", string(newJobStatus)).
				Logger()
			logger.Info().Msg("job status changed")
		}

		newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
		if err != nil {
			return err
		}

		if newJobStatus == "" || newJobStatus == oldJobStatus {
			// Do this check at the end of the loop, and not the start, so that at
			// least one iteration is run.
			break
		}
	}

	return nil
}

// jobStatusSet saves the job with the new status and handles updates to tasks
// as well. If the task status change should trigger another job status change,
// the new job status is returned.
func (sm *StateMachine) jobStatusSet(
	ctx context.Context,
	job *persistence.Job,
	newJobStatus api.JobStatus,
	reason string,
	logger zerolog.Logger,
) (api.JobStatus, error) {
	oldJobStatus := job.Status
	job.Status = newJobStatus

	// Persist the new job status.
	err := sm.persist.SaveJobStatus(ctx, job)
	if err != nil {
		return "", fmt.Errorf("saving job status change %q to %q to database: %w",
			oldJobStatus, newJobStatus, err)
	}

	// Handle the status change.
	result, err := sm.updateTasksAfterJobStatusChange(ctx, logger, job, oldJobStatus)
	if err != nil {
		return "", fmt.Errorf("updating job's tasks after job status change: %w", err)
	}

	// Broadcast this change to the SocketIO clients.
	jobUpdate := eventbus.NewJobUpdate(job)
	jobUpdate.PreviousStatus = &oldJobStatus
	jobUpdate.RefreshTasks = result.massTaskUpdate
	sm.broadcaster.BroadcastJobUpdate(jobUpdate)

	return result.followingJobStatus, nil
}

// tasksUpdateResult is returned by `updateTasksAfterJobStatusChange`.
type tasksUpdateResult struct {
	// followingJobStatus is set when the task updates should trigger another job status update.
	followingJobStatus api.JobStatus

	// massTaskUpdate is true when multiple/all tasks were updated simultaneously.
	// This hasn't triggered individual task updates to SocketIO clients, and thus
	// the resulting SocketIO job update should indicate all tasks must be
	// reloaded.
	massTaskUpdate bool
}

// updateTasksAfterJobStatusChange updates the status of its tasks based on the
// new status of this job.
//
// NOTE: this function assumes that the job already has its new status.
//
// Returns the new state the job should go into after this change, or an empty
// string if there is no subsequent change necessary.
func (sm *StateMachine) updateTasksAfterJobStatusChange(
	ctx context.Context,
	logger zerolog.Logger,
	job *persistence.Job,
	oldJobStatus api.JobStatus,
) (tasksUpdateResult, error) {
	// Every case in this switch MUST return, for sanity's sake.
	switch job.Status {
	case api.JobStatusCompleted, api.JobStatusCanceled, api.JobStatusPaused:
		// Nothing to do; this will happen as a response to all tasks receiving this status.
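		// For example, when the last running task completes,
		// updateJobOnTaskStatusCompleted already moves the job to 'completed'.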
		return tasksUpdateResult{}, nil

	case api.JobStatusActive:
		// Nothing to do; this happens when a task gets started, which has nothing
		// to do with other tasks in the job.
		return tasksUpdateResult{}, nil

	case api.JobStatusCancelRequested, api.JobStatusFailed:
		jobStatus, err := sm.cancelTasks(ctx, logger, job)
		return tasksUpdateResult{
			followingJobStatus: jobStatus,
			massTaskUpdate:     true,
		}, err

	case api.JobStatusPauseRequested:
		jobStatus, err := sm.pauseTasks(ctx, logger, job)
		return tasksUpdateResult{
			followingJobStatus: jobStatus,
			massTaskUpdate:     true,
		}, err

	case api.JobStatusRequeueing:
		jobStatus, err := sm.requeueTasks(ctx, logger, job, oldJobStatus)
		return tasksUpdateResult{
			followingJobStatus: jobStatus,
			massTaskUpdate:     true,
		}, err

	case api.JobStatusQueued:
		jobStatus, err := sm.checkTaskCompletion(ctx, logger, job)
		return tasksUpdateResult{
			followingJobStatus: jobStatus,
			massTaskUpdate:     true,
		}, err

	default:
		logger.Warn().Msg("unknown job status change, ignoring")
		return tasksUpdateResult{}, nil
	}
}

// Directly cancel any task that might run in the future.
//
// Returns the next job status, if a status change is required.
func (sm *StateMachine) cancelTasks(
	ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
	logger.Info().Msg("cancelling tasks of job")

	// Any task that is running or might run in the future should get cancelled.
	taskStatusesToCancel := []api.TaskStatus{
		api.TaskStatusActive,
		api.TaskStatusQueued,
		api.TaskStatusSoftFailed,
	}
	err := sm.persist.UpdateJobsTaskStatusesConditional(
		ctx, job, taskStatusesToCancel, api.TaskStatusCanceled,
		fmt.Sprintf("Manager cancelled this task because the job got status %q.", job.Status),
	)
	if err != nil {
		return "", fmt.Errorf("cancelling tasks of job %s: %w", job.UUID, err)
	}

	// If cancellation was requested, it has now happened, so the job can transition.
	if job.Status == api.JobStatusCancelRequested {
		logger.Info().Msg("all tasks of job cancelled, job can go to 'cancelled' status")
		return api.JobStatusCanceled, nil
	}

	// This could mean cancellation was triggered by failure of the job, in which
	// case the job is already in the correct status.
	return "", nil
}

func (sm *StateMachine) pauseTasks(
	ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
	logger.Info().Msg("pausing tasks of job")

	// Any task that might run in the future should get paused.
	// Active tasks should remain active until finished.
	taskStatusesToPause := []api.TaskStatus{
		api.TaskStatusQueued,
		api.TaskStatusSoftFailed,
	}
	err := sm.persist.UpdateJobsTaskStatusesConditional(
		ctx, job, taskStatusesToPause, api.TaskStatusPaused,
		fmt.Sprintf("Manager paused this task because the job got status %q.", job.Status),
	)
	if err != nil {
		return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err)
	}

	// If pausing was requested, it has now happened, so the job can transition.
	toBePaused, err := sm.isJobPausingComplete(ctx, job)
	if err != nil {
		return "", err
	}
	if toBePaused {
		logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
		return api.JobStatusPaused, nil
	}

	return api.JobStatusPauseRequested, nil
}

// requeueTasks re-queues all tasks of the job.
//
// This function assumes that the current job status is "requeueing".
//
// Returns the new job status, if this status transition should be followed by
// another one.
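//
// The usual chain is: "requeueing" -> tasks requeued here -> "queued", after
// which checkTaskCompletion determines whether the job can immediately
// continue to "completed" because all of its tasks were already done.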
func (sm *StateMachine) requeueTasks(
	ctx context.Context, logger zerolog.Logger, job *persistence.Job, oldJobStatus api.JobStatus,
) (api.JobStatus, error) {
	if job.Status != api.JobStatusRequeueing {
		logger.Warn().Msg("unexpected job status in StateMachine::requeueTasks()")
	}

	var err error

	switch oldJobStatus {
	case api.JobStatusUnderConstruction:
		// Nothing to do, the job compiler has just finished its work; the tasks
		// have already been set to 'queued' status.
		logger.Debug().Msg("ignoring job status change")
		return "", nil
	case api.JobStatusCompleted:
		// Re-queue all tasks.
		err = sm.persist.UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
			fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
	default:
		statusesToUpdate := []api.TaskStatus{
			api.TaskStatusCanceled,
			api.TaskStatusFailed,
			api.TaskStatusPaused,
			api.TaskStatusSoftFailed,
		}
		// Re-queue only the non-completed tasks.
		err = sm.persist.UpdateJobsTaskStatusesConditional(ctx, job, statusesToUpdate, api.TaskStatusQueued,
			fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
	}

	if err != nil {
		return "", fmt.Errorf("queueing tasks of job %s: %w", job.UUID, err)
	}

	// TODO: also reset the 'failed by workers' blocklist.

	// The appropriate tasks have been requeued, so now the job can go from
	// "requeueing" to "queued".
	return api.JobStatusQueued, nil
}

// checkTaskCompletion returns "completed" as next job status when all tasks of
// the job are completed.
//
// Returns the new job status, if this status transition should be followed by
// another one.
func (sm *StateMachine) checkTaskCompletion(
	ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
	numCompleted, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
	if err != nil {
		return "", fmt.Errorf("checking task completion of job %s: %w", job.UUID, err)
	}

	if numCompleted < numTotal {
		logger.Debug().
			Int("numTasksCompleted", numCompleted).
			Int("numTasksTotal", numTotal).
			Msg("not all tasks of job are completed")
		return "", nil
	}

	logger.Info().Msg("job has all tasks completed, transition job to 'completed'")
	return api.JobStatusCompleted, nil
}

// CheckStuck finds jobs that are 'stuck' in their current status. This is meant
// to run at startup of Flamenco Manager, and checks to see if there are any
// jobs in a status that a human will not be able to fix otherwise.
func (sm *StateMachine) CheckStuck(ctx context.Context) {
	stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeueing)
	if err != nil {
		log.Error().Err(err).Msg("unable to fetch stuck jobs")
		return
	}

	for _, job := range stuckJobs {
		err := sm.jobStatusReenforce(ctx, job, "checking stuck jobs")
		if err != nil {
			log.Error().Str("job", job.UUID).Err(err).Msg("error getting job un-stuck")
		}
	}
}
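// The sketch below is illustrative only, and not part of Flamenco itself. It
// shows how a caller might drive the state machine when a worker reports a
// task result; the surrounding fetch and error handling are assumed:
//
//	// task must have its Job pointer filled in.
//	if err := sm.TaskStatusChange(ctx, task, api.TaskStatusCompleted); err != nil {
//		log.Error().Err(err).Msg("task status change failed")
//	}
//
// TaskStatusChange persists the new task status, broadcasts it to SocketIO
// clients, and then ripples the change into the job status (in this example,
// possibly completing the job when this was its last unfinished task).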