package task_state_machine
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"fmt"
"sync"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/pkg/api"
)
// taskFailJobPercentage is the percentage of a job's tasks that need to fail to
// trigger failure of the entire job.
const taskFailJobPercentage = 10 // Integer from 0 to 100.
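// As a worked example of this threshold (an illustrative calculation, not new
// behaviour): with 20 tasks in a job, the second failure already triggers job
// failure, because updateJobOnTaskStatusFailed computes the percentage as an
// integer:
//
//	failedPercentage := int(float64(2) / float64(20) * 100) // = 10
//	// 10 >= taskFailJobPercentage, so the entire job fails.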
// StateMachine handles task and job status changes.
type StateMachine struct {
persist PersistenceService
broadcaster ChangeBroadcaster
logStorage LogStorage
// mutex protects all public functions, so that only one function can run at a time.
// This is to avoid race conditions on task/job status updates.
mutex *sync.Mutex
}
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine {
return &StateMachine{
persist: persist,
broadcaster: broadcaster,
logStorage: logStorage,
mutex: new(sync.Mutex),
}
}
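// Construction sketch (illustrative): `persist`, `broadcaster`, and
// `logStorage` stand for whatever implementations the Manager wires up at
// startup; only the call shape of NewStateMachine comes from this file.
//
//	sm := task_state_machine.NewStateMachine(persist, broadcaster, logStorage)
//	// All public methods lock sm.mutex, so a single StateMachine can be
//	// shared by the API handlers and background services.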
// TaskStatusChange updates the task's status to the new one.
// `task` is expected to still have its original status, and to have its `JobID` set.
// This is the external API endpoint, which ensures the state machine is locked.
func (sm *StateMachine) TaskStatusChange(
ctx context.Context,
task *persistence.Task,
newTaskStatus api.TaskStatus,
) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
return sm.taskStatusChange(ctx, task, newTaskStatus)
}
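// Usage sketch: marking a task as completed from the caller's side. The
// FetchTask lookup is an assumption about the persistence layer, shown only to
// illustrate that the task must come from the database with its JobID set;
// error handling is trimmed.
//
//	task, err := persist.FetchTask(ctx, taskUUID)
//	if err != nil {
//		return err
//	}
//	// The state machine ripples this change into the job status.
//	return sm.TaskStatusChange(ctx, task, api.TaskStatusCompleted)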
// taskStatusChange updates the task's status to the new one.
// `task` is expected to still have its original status, and to have its `JobID` set.
// This is the internal version of TaskStatusChange(), which assumes the state
// machine is already locked.
func (sm *StateMachine) taskStatusChange(
ctx context.Context,
task *persistence.Task,
newTaskStatus api.TaskStatus,
) error {
if task.JobID == 0 {
log.Panic().Str("task", task.UUID).Msg("task without job ID, cannot handle this")
return nil // Will not run because of the panic.
}
job, err := sm.persist.FetchJobByID(ctx, task.JobID)
if err != nil {
return fmt.Errorf("cannot fetch the job of task %s: %w", task.UUID, err)
}
oldTaskStatus := task.Status
if err := sm.taskStatusChangeOnly(ctx, task, job, newTaskStatus); err != nil {
return err
}
if err := sm.updateJobAfterTaskStatusChange(ctx, task, job, oldTaskStatus); err != nil {
return fmt.Errorf("updating job after task status change: %w", err)
}
return nil
}
// taskStatusChangeOnly updates the task's status to the new one, but does not "ripple" the change to the job.
// `task` is expected to still have its original status; the job it belongs to is passed explicitly.
func (sm *StateMachine) taskStatusChangeOnly(
ctx context.Context,
task *persistence.Task,
job *persistence.Job,
newTaskStatus api.TaskStatus,
) error {
oldTaskStatus := task.Status
task.Status = newTaskStatus
logger := log.With().
Str("task", task.UUID).
Str("job", job.UUID).
Str("taskStatusOld", string(oldTaskStatus)).
Str("taskStatusNew", string(newTaskStatus)).
Logger()
logger.Debug().Msg("task state changed")
if err := sm.persist.SaveTaskStatus(ctx, task); err != nil {
return fmt.Errorf("saving task to database: %w", err)
}
if oldTaskStatus != newTaskStatus {
// logStorage already logs any error, and an error here shouldn't block the
// rest of the function.
_ = sm.logStorage.WriteTimestamped(logger, job.UUID, task.UUID,
fmt.Sprintf("task changed status %s -> %s", oldTaskStatus, newTaskStatus))
}
// Broadcast this change to the SocketIO clients.
taskUpdate := eventbus.NewTaskUpdate(*task, job.UUID)
taskUpdate.PreviousStatus = &oldTaskStatus
sm.broadcaster.BroadcastTaskUpdate(taskUpdate)
return nil
}
// updateJobAfterTaskStatusChange updates the job status based on the status of
// this task and other tasks in the job.
func (sm *StateMachine) updateJobAfterTaskStatusChange(
ctx context.Context,
task *persistence.Task,
job *persistence.Job,
oldTaskStatus api.TaskStatus,
) error {
if job == nil {
log.Panic().Str("task", task.UUID).Msg("task without job, cannot handle this")
return nil // Will not run because of the panic.
}
logger := log.With().
Str("job", job.UUID).
Str("task", task.UUID).
Str("taskStatusOld", string(oldTaskStatus)).
Str("taskStatusNew", string(task.Status)).
Logger()
// Every 'case' in this switch MUST return. Just for sanity's sake.
switch task.Status {
case api.TaskStatusQueued:
// Re-queueing a task on a completed job should re-queue the job too.
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued")
case api.TaskStatusPaused:
return sm.updateJobOnTaskStatusPaused(ctx, job)
case api.TaskStatusCanceled:
return sm.updateJobOnTaskStatusCanceled(ctx, logger, job)
case api.TaskStatusFailed:
return sm.updateJobOnTaskStatusFailed(ctx, logger, job)
case api.TaskStatusActive, api.TaskStatusSoftFailed:
switch job.Status {
case api.JobStatusActive, api.JobStatusCancelRequested:
// Do nothing, job is already in the desired status.
return nil
default:
logger.Info().Msg("job became active because one of its task changed status")
reason := fmt.Sprintf("task became %s", task.Status)
return sm.jobStatusChange(ctx, job, api.JobStatusActive, reason)
}
case api.TaskStatusCompleted:
return sm.updateJobOnTaskStatusCompleted(ctx, logger, job)
default:
logger.Warn().Msg("task obtained status that Flamenco did not expect")
return nil
}
}
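// Summarising the switch above, the main task→job ripple is (the
// pause-requested special case is handled separately in each branch):
//
//	task queued             → job requeueing  (only when the job was completed)
//	task paused             → job paused      (only when pausing is complete)
//	task canceled           → job canceled    (when no runnable tasks remain)
//	task failed             → job failed      (when the failure threshold is hit)
//	task active/soft-failed → job active      (unless already active or cancel-requested)
//	task completed          → job completed   (when all tasks are completed)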
// If the job has status 'ifStatus', move it to status 'thenStatus'.
func (sm *StateMachine) jobStatusIfAThenB(
ctx context.Context,
logger zerolog.Logger,
job *persistence.Job,
ifStatus, thenStatus api.JobStatus,
reason string,
) error {
if job.Status != ifStatus {
return nil
}
logger.Info().
Str("jobStatusOld", string(ifStatus)).
Str("jobStatusNew", string(thenStatus)).
Msg("Job will change status because one of its task changed status")
return sm.jobStatusChange(ctx, job, thenStatus, reason)
}
// isJobPausingComplete returns true when the job status is pause-requested and there are no more active tasks.
func (sm *StateMachine) isJobPausingComplete(ctx context.Context, job *persistence.Job) (bool, error) {
if job.Status != api.JobStatusPauseRequested {
return false, nil
}
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil {
return false, err
}
return numActive == 0, nil
}
// updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
// If no more tasks can run, cancel the job.
numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed)
if err != nil {
return err
}
if numRunnable == 0 {
// NOTE: this does NOT cancel any non-runnable (paused/failed) tasks. If that's desired, just cancel the job as a whole.
logger.Info().Msg("canceled task was last runnable task of job, canceling job")
return sm.jobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job")
}
// Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return err
}
if toBePaused {
return sm.jobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task cancellation")
}
return nil
}
// updateJobOnTaskStatusPaused checks if all tasks are paused, and if so, pauses the entire job.
func (sm *StateMachine) updateJobOnTaskStatusPaused(ctx context.Context, job *persistence.Job) error {
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return err
}
if !toBePaused {
return nil
}
return sm.jobStatusChange(ctx, job, api.JobStatusPaused, "last task got paused")
}
// updateJobOnTaskStatusFailed conditionally escalates the failure of a task to fail the entire job.
func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
// Count the number of failed tasks. If it is over the threshold, fail the job.
numFailed, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed)
if err != nil {
return err
}
failedPercentage := int(float64(numFailed) / float64(numTotal) * 100)
failLogger := logger.With().
Int("taskNumTotal", numTotal).
Int("taskNumFailed", numFailed).
Int("failedPercentage", failedPercentage).
Int("threshold", taskFailJobPercentage).
Logger()
if failedPercentage >= taskFailJobPercentage {
failLogger.Info().Msg("failing job because too many of its tasks failed")
return sm.jobStatusChange(ctx, job, api.JobStatusFailed, "too many tasks failed")
}
// If the job didn't fail, this task failure at least indicates that the job is active.
failLogger.Info().Msg("task failed, but not enough to fail the job")
// Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return err
}
if toBePaused {
return sm.jobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task failure")
}
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
"task failed, but not enough to fail the job")
}
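// Note that the integer conversion above truncates (a worked example, not new
// behaviour): with 11 tasks, a single failure is roughly 9.1%, which truncates
// to 9 and stays just under the 10% threshold, so the job keeps running.
//
//	int(float64(1) / float64(11) * 100) // = 9 < taskFailJobPercentage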
// updateJobOnTaskStatusCompleted conditionally escalates the completion of a task to complete the entire job.
func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
numComplete, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
if err != nil {
return err
}
if numComplete == numTotal {
logger.Info().Msg("all tasks of job are completed, job is completed")
return sm.jobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed")
}
// Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return err
}
if toBePaused {
return sm.jobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task completion")
}
logger.Info().
Int("taskNumTotal", numTotal).
Int("taskNumComplete", numComplete).
Msg("task completed; there are more tasks to do")
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, "task completed, but more tasks remain")
}
// JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.
func (sm *StateMachine) JobStatusChange(
ctx context.Context,
jobUUID string,
newJobStatus api.JobStatus,
reason string,
) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
job, err := sm.persist.FetchJob(ctx, jobUUID)
if err != nil {
return err
}
return sm.jobStatusChange(ctx, job, newJobStatus, reason)
}
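// Usage sketch: requesting cancellation of a job by UUID. The reason string is
// illustrative; jobStatusChange then cascades the request (cancel-requested →
// tasks canceled → job canceled) as implemented further down.
//
//	err := sm.JobStatusChange(ctx, jobUUID, api.JobStatusCancelRequested,
//		"user requested job cancellation")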
// jobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.
//
// This is the private implementation, which takes the job as an argument. The
// public function (above) takes a job's UUID instead, so that it's easier to call.
func (sm *StateMachine) jobStatusChange(
ctx context.Context,
job *persistence.Job,
newJobStatus api.JobStatus,
reason string,
) error {
// Job status changes can trigger task status changes, which can trigger the
// next job status change. Keep looping over these job status changes until
// there is no more change left to do.
var err error
for newJobStatus != "" && newJobStatus != job.Status {
oldJobStatus := job.Status
job.Activity = fmt.Sprintf("Changed to status %q: %s", newJobStatus, reason)
logger := log.With().
Str("job", job.UUID).
Str("jobStatusOld", string(oldJobStatus)).
Str("jobStatusNew", string(newJobStatus)).
Str("reason", reason).
Logger()
logger.Info().Msg("job status changed")
newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
if err != nil {
return err
}
}
return nil
}
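// As an example of how the loop above chains statuses, re-queueing a completed
// job settles after two iterations:
//
//	requeueing → requeueTasks() re-queues the tasks and returns "queued"
//	queued     → checkTaskCompletion() finds unfinished tasks and returns ""
//	""         → the loop ends and the job rests in status "queued"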
// jobStatusReenforce acts as if the job just transitioned to its current
// status, and performs another round of task status updates. This is normally
// not necessary, but can be used when normal job/task status updates got
// interrupted somehow.
func (sm *StateMachine) jobStatusReenforce(
ctx context.Context,
job *persistence.Job,
reason string,
) error {
// Job status changes can trigger task status changes, which can trigger the
// next job status change. Keep looping over these job status changes until
// there is no more change left to do.
var err error
newJobStatus := job.Status
for {
oldJobStatus := job.Status
job.Activity = fmt.Sprintf("Reenforcing status %q: %s", newJobStatus, reason)
logger := log.With().
Str("job", job.UUID).
Str("reason", reason).
Logger()
if newJobStatus == job.Status {
logger := logger.With().
Str("jobStatus", string(job.Status)).
Logger()
logger.Info().Msg("job status reenforced")
} else {
logger := logger.With().
Str("jobStatusOld", string(oldJobStatus)).
Str("jobStatusNew", string(newJobStatus)).
Logger()
logger.Info().Msg("job status changed")
}
newJobStatus, err = sm.jobStatusSet(ctx, job, newJobStatus, reason, logger)
if err != nil {
return err
}
if newJobStatus == "" || newJobStatus == oldJobStatus {
// Do this check at the end of the loop, and not the start, so that at
// least one iteration is run.
break
}
}
return nil
}
// jobStatusSet saves the job with the new status and handles updates to tasks
// as well. If the task status change should trigger another job status change,
// the new job status is returned.
func (sm *StateMachine) jobStatusSet(ctx context.Context,
job *persistence.Job,
newJobStatus api.JobStatus,
reason string,
logger zerolog.Logger,
) (api.JobStatus, error) {
oldJobStatus := job.Status
job.Status = newJobStatus
// Persist the new job status.
err := sm.persist.SaveJobStatus(ctx, job)
if err != nil {
return "", fmt.Errorf("saving job status change %q to %q to database: %w",
oldJobStatus, newJobStatus, err)
}
// Handle the status change.
result, err := sm.updateTasksAfterjobStatusChange(ctx, logger, job, oldJobStatus)
if err != nil {
return "", fmt.Errorf("updating job's tasks after job status change: %w", err)
}
// Broadcast this change to the SocketIO clients.
jobUpdate := eventbus.NewJobUpdate(job)
jobUpdate.PreviousStatus = &oldJobStatus
jobUpdate.RefreshTasks = result.massTaskUpdate
sm.broadcaster.BroadcastJobUpdate(jobUpdate)
return result.followingJobStatus, nil
}
// tasksUpdateResult is returned by `updateTasksAfterjobStatusChange`.
type tasksUpdateResult struct {
// followingJobStatus is set when the task updates should trigger another job status update.
followingJobStatus api.JobStatus
// massTaskUpdate is true when multiple/all tasks were updated simultaneously.
// This hasn't triggered individual task updates to SocketIO clients, and thus
// the resulting SocketIO job update should indicate all tasks must be
// reloaded.
massTaskUpdate bool
}
// updateTasksAfterjobStatusChange updates the status of its tasks based on the
// new status of this job.
//
// NOTE: this function assumes that the job already has its new status.
//
// Returns the new state the job should go into after this change, or an empty
// string if there is no subsequent change necessary.
func (sm *StateMachine) updateTasksAfterjobStatusChange(
ctx context.Context,
logger zerolog.Logger,
job *persistence.Job,
oldJobStatus api.JobStatus,
) (tasksUpdateResult, error) {
// Every case in this switch MUST return, for sanity's sake.
switch job.Status {
case api.JobStatusCompleted, api.JobStatusCanceled, api.JobStatusPaused:
// Nothing to do; this will happen as a response to all tasks receiving this status.
return tasksUpdateResult{}, nil
case api.JobStatusActive:
// Nothing to do; this happens when a task gets started, which has nothing to
// do with other tasks in the job.
return tasksUpdateResult{}, nil
case api.JobStatusCancelRequested, api.JobStatusFailed:
jobStatus, err := sm.cancelTasks(ctx, logger, job)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
case api.JobStatusPauseRequested:
jobStatus, err := sm.pauseTasks(ctx, logger, job)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
case api.JobStatusRequeueing:
jobStatus, err := sm.requeueTasks(ctx, logger, job, oldJobStatus)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
case api.JobStatusQueued:
jobStatus, err := sm.checkTaskCompletion(ctx, logger, job)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
default:
logger.Warn().Msg("unknown job status change, ignoring")
return tasksUpdateResult{}, nil
}
}
// Directly cancel any task that might run in the future.
//
// Returns the next job status, if a status change is required.
func (sm *StateMachine) cancelTasks(
ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
logger.Info().Msg("cancelling tasks of job")
// Any task that is running or might run in the future should get cancelled.
taskStatusesToCancel := []api.TaskStatus{
api.TaskStatusActive,
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
}
err := sm.persist.UpdateJobsTaskStatusesConditional(
ctx, job, taskStatusesToCancel, api.TaskStatusCanceled,
fmt.Sprintf("Manager cancelled this task because the job got status %q.", job.Status),
)
if err != nil {
return "", fmt.Errorf("cancelling tasks of job %s: %w", job.UUID, err)
}
// If cancellation was requested, it has now happened, so the job can transition.
if job.Status == api.JobStatusCancelRequested {
logger.Info().Msg("all tasks of job cancelled, job can go to 'cancelled' status")
return api.JobStatusCanceled, nil
}
// This could mean cancellation was triggered by failure of the job, in which
// case the job is already in the correct status.
return "", nil
}
func (sm *StateMachine) pauseTasks(
ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
logger.Info().Msg("pausing tasks of job")
// Any task that might run in the future should get paused.
// Active tasks should remain active until finished.
taskStatusesToPause := []api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
}
err := sm.persist.UpdateJobsTaskStatusesConditional(
ctx, job, taskStatusesToPause, api.TaskStatusPaused,
fmt.Sprintf("Manager paused this task because the job got status %q.", job.Status),
)
if err != nil {
return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err)
}
// If pausing was requested, it has now happened, so the job can transition.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return "", err
}
if toBePaused {
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
return api.JobStatusPaused, nil
}
return api.JobStatusPauseRequested, nil
}
// requeueTasks re-queues all tasks of the job.
//
// This function assumes that the current job status is "requeueing".
//
// Returns the new job status, if this status transition should be followed by
// another one.
func (sm *StateMachine) requeueTasks(
ctx context.Context, logger zerolog.Logger, job *persistence.Job, oldJobStatus api.JobStatus,
) (api.JobStatus, error) {
if job.Status != api.JobStatusRequeueing {
logger.Warn().Msg("unexpected job status in StateMachine::requeueTasks()")
}
var err error
switch oldJobStatus {
case api.JobStatusUnderConstruction:
// Nothing to do, the job compiler has just finished its work; the tasks have
// already been set to 'queued' status.
logger.Debug().Msg("ignoring job status change")
return "", nil
case api.JobStatusCompleted:
// Re-queue all tasks.
err = sm.persist.UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
default:
statusesToUpdate := []api.TaskStatus{
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
}
// Re-queue only the non-completed tasks.
err = sm.persist.UpdateJobsTaskStatusesConditional(ctx, job,
statusesToUpdate, api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
}
if err != nil {
return "", fmt.Errorf("queueing tasks of job %s: %w", job.UUID, err)
}
// TODO: also reset the 'failed by workers' blocklist.
// The appropriate tasks have been requeued, so now the job can go from "requeueing" to "queued".
return api.JobStatusQueued, nil
}
// checkTaskCompletion returns "completed" as next job status when all tasks of
// the job are completed.
//
// Returns the new job status, if this status transition should be followed by
// another one.
func (sm *StateMachine) checkTaskCompletion(
ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
numCompleted, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
if err != nil {
return "", fmt.Errorf("checking task completion of job %s: %w", job.UUID, err)
}
if numCompleted < numTotal {
logger.Debug().
Int("numTasksCompleted", numCompleted).
Int("numTasksTotal", numTotal).
Msg("not all tasks of job are completed")
return "", nil
}
logger.Info().Msg("job has all tasks completed, transition job to 'completed'")
return api.JobStatusCompleted, nil
}
// CheckStuck finds jobs that are 'stuck' in their current status. This is meant
// to run at startup of Flamenco Manager, and checks to see if there are any
// jobs in a status that a human will not be able to fix otherwise.
func (sm *StateMachine) CheckStuck(ctx context.Context) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
stuckJobs, err := sm.persist.FetchJobsInStatus(ctx,
api.JobStatusCancelRequested,
api.JobStatusRequeueing,
api.JobStatusPauseRequested,
)
if err != nil {
log.Error().Err(err).Msg("unable to fetch stuck jobs")
return
}
for _, job := range stuckJobs {
err := sm.jobStatusReenforce(ctx, job, "checking stuck jobs")
if err != nil {
log.Error().Str("job", job.UUID).Err(err).Msg("error getting job un-stuck")
}
}
}
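// Startup sketch (illustrative wiring, not taken from this file): the Manager
// is expected to construct the state machine and then call CheckStuck once, so
// that jobs left in cancel-requested, requeueing, or pause-requested status by
// a previous run are re-enforced.
//
//	sm := task_state_machine.NewStateMachine(persist, broadcaster, logStorage)
//	sm.CheckStuck(ctx)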