diff --git a/CHANGELOG.md b/CHANGELOG.md
index 29fe28de..8e00a864 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@ bugs in actually-released versions.
 - The conversion from a known path prefix to a variable is now done in a case-insensitive way. This means that if the variable `{storage}` has value `S:\Flamenco`, a path `s:\flamenco\project\file.blend` will be recognised and stored as `{storage}\project\file.blend`. This happens uniformly, regardless of the platform. So also on Linux, which has a case-sensitive filesystem, this matching is done in a case-insensitive way. It is very unlikely that a Flamenco configuration has two separate variables, for paths that only differ in their case.
+- Fix issue where jobs could get stuck in `pause-requested` state.
+
 
 ## 3.6 - released 2024-12-01
 
diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go
index d90d6e64..4eb150af 100644
--- a/internal/manager/task_state_machine/task_state_machine.go
+++ b/internal/manager/task_state_machine/task_state_machine.go
@@ -41,19 +41,33 @@ func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster,
 
 // TaskStatusChange updates the task's status to the new one.
 // `task` is expected to still have its original status, and have a filled `Job` pointer.
+// This is the external API endpoint, which ensures the state machine is locked.
 func (sm *StateMachine) TaskStatusChange(
 	ctx context.Context,
 	task *persistence.Task,
 	newTaskStatus api.TaskStatus,
 ) error {
+
+	sm.mutex.Lock()
+	defer sm.mutex.Unlock()
+
+	return sm.taskStatusChange(ctx, task, newTaskStatus)
+}
+
+// taskStatusChange updates the task's status to the new one.
+// `task` is expected to still have its original status, and have a filled `Job` pointer.
+// This is the internal version of TaskStatusChange(), which assumes the state
+// machine is already locked.
+func (sm *StateMachine) taskStatusChange(
+	ctx context.Context,
+	task *persistence.Task,
+	newTaskStatus api.TaskStatus,
+) error {
 	if task.JobID == 0 {
 		log.Panic().Str("task", task.UUID).Msg("task without job ID, cannot handle this")
 		return nil // Will not run because of the panic.
 	}
 
-	sm.mutex.Lock()
-	defer sm.mutex.Unlock()
-
 	job, err := sm.persist.FetchJobByID(ctx, task.JobID)
 	if err != nil {
 		return fmt.Errorf("cannot fetch the job of task %s: %w", task.UUID, err)
@@ -137,8 +151,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
 		return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued")
 
 	case api.TaskStatusPaused:
-		// Pausing a task has no impact on the job.
-		return nil
+		return sm.updateJobOnTaskStatusPaused(ctx, job)
 
 	case api.TaskStatusCanceled:
 		return sm.updateJobOnTaskStatusCanceled(ctx, logger, job)
@@ -222,6 +235,19 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
 	return nil
 }
 
+// updateJobOnTaskStatusPaused checks if all tasks are paused, and if so, pauses the entire job.
+func (sm *StateMachine) updateJobOnTaskStatusPaused(ctx context.Context, job *persistence.Job) error {
+	toBePaused, err := sm.isJobPausingComplete(ctx, job)
+	if err != nil {
+		return err
+	}
+	if !toBePaused {
+		return nil
+	}
+
+	return sm.jobStatusChange(ctx, job, api.JobStatusPaused, "last task got paused")
+}
+
 // updateJobOnTaskStatusFailed conditionally escalates the failure of a task to fail the entire job.
 func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
 	// Count the number of failed tasks. If it is over the threshold, fail the job.
diff --git a/internal/manager/task_state_machine/worker_requeue.go b/internal/manager/task_state_machine/worker_requeue.go
index 81392290..1a80c713 100644
--- a/internal/manager/task_state_machine/worker_requeue.go
+++ b/internal/manager/task_state_machine/worker_requeue.go
@@ -4,6 +4,7 @@ package task_state_machine
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/rs/zerolog/log"
 
 	"projects.blender.org/studio/flamenco/internal/manager/persistence"
@@ -18,6 +19,9 @@ func (sm *StateMachine) RequeueActiveTasksOfWorker(
 	worker *persistence.Worker,
 	reason string,
 ) error {
+	sm.mutex.Lock()
+	defer sm.mutex.Unlock()
+
 	// Fetch the tasks to update.
 	tasksJobs, err := sm.persist.FetchTasksOfWorkerInStatus(
 		ctx, worker, api.TaskStatusActive)
@@ -28,7 +32,7 @@ func (sm *StateMachine) RequeueActiveTasksOfWorker(
 	// Run each task change through the task state machine.
 	var lastErr error
 	for _, taskJobWorker := range tasksJobs {
-		lastErr = sm.requeueTaskOfWorker(ctx, &taskJobWorker.Task, taskJobWorker.JobUUID, worker, reason)
+		lastErr = sm.returnTaskOfWorker(ctx, &taskJobWorker.Task, taskJobWorker.JobUUID, worker, reason)
 	}
 
 	return lastErr
@@ -43,6 +47,9 @@ func (sm *StateMachine) RequeueFailedTasksOfWorkerOfJob(
 	jobUUID string,
 	reason string,
 ) error {
+	sm.mutex.Lock()
+	defer sm.mutex.Unlock()
+
 	// Fetch the tasks to update.
 	tasks, err := sm.persist.FetchTasksOfWorkerInStatusOfJob(
 		ctx, worker, api.TaskStatusFailed, jobUUID)
@@ -53,13 +60,15 @@ func (sm *StateMachine) RequeueFailedTasksOfWorkerOfJob(
 	// Run each task change through the task state machine.
 	var lastErr error
 	for _, task := range tasks {
-		lastErr = sm.requeueTaskOfWorker(ctx, task, jobUUID, worker, reason)
+		lastErr = sm.returnTaskOfWorker(ctx, task, jobUUID, worker, reason)
 	}
 
 	return lastErr
 }
 
-func (sm *StateMachine) requeueTaskOfWorker(
+// returnTaskOfWorker returns the task to the task pool.
+// This either re-queues the task for execution, or pauses it, depending on the current job status.
+func (sm *StateMachine) returnTaskOfWorker(
 	ctx context.Context,
 	task *persistence.Task,
 	jobUUID string,
@@ -71,12 +80,27 @@ func (sm *StateMachine) requeueTaskOfWorker(
 		Str("reason", reason).
 		Logger()
 
+	job, err := sm.persist.FetchJob(ctx, jobUUID)
+	if err != nil {
+		return fmt.Errorf("could not requeue task of worker %q: %w", worker.UUID, err)
+	}
+
+	// Depending on the job's status, a Worker returning its task to the pool should make it go to 'queued' or 'paused'.
+	var targetTaskStatus api.TaskStatus
+	switch job.Status {
+	case api.JobStatusPauseRequested, api.JobStatusPaused:
+		targetTaskStatus = api.TaskStatusPaused
+	default:
+		targetTaskStatus = api.TaskStatusQueued
+	}
+
 	logger.Info().
 		Str("task", task.UUID).
-		Msg("re-queueing task")
+		Str("newTaskStatus", string(targetTaskStatus)).
+		Msg("returning task to pool")
 
 	// Write to task activity that it got requeued because of worker sign-off.
-	task.Activity = "Task was requeued by Manager because " + reason
+	task.Activity = fmt.Sprintf("Task was %s by Manager because %s", targetTaskStatus, reason)
 	if err := sm.persist.SaveTaskActivity(ctx, task); err != nil {
 		logger.Warn().Err(err).
 			Str("task", task.UUID).
@@ -85,12 +109,11 @@ func (sm *StateMachine) requeueTaskOfWorker(
 			Msg("error saving task activity to database")
 	}
 
-	err := sm.TaskStatusChange(ctx, task, api.TaskStatusQueued)
-	if err != nil {
+	if err := sm.taskStatusChange(ctx, task, targetTaskStatus); err != nil {
 		logger.Warn().Err(err).
 			Str("task", task.UUID).
 			Str("reason", reason).
-			Msg("error queueing task")
+			Msg("error returning task to pool")
 	}
 
 	// The error is already logged by the log storage.
diff --git a/internal/manager/task_state_machine/worker_requeue_test.go b/internal/manager/task_state_machine/worker_requeue_test.go
index 02b16b04..5b03edd3 100644
--- a/internal/manager/task_state_machine/worker_requeue_test.go
+++ b/internal/manager/task_state_machine/worker_requeue_test.go
@@ -32,11 +32,12 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) {
 	task1PrevStatus := task1.Status
 	task2PrevStatus := task2.Status
 
+	mocks.persist.EXPECT().FetchJob(ctx, job.UUID).Return(job, nil).Times(len(workerTasks))
 	mocks.persist.EXPECT().FetchTasksOfWorkerInStatus(ctx, &worker, api.TaskStatusActive).Return(workerTasks, nil)
 
 	// Expect this re-queueing to end up in the task's log and activity.
 	logMsg1 := "task changed status active -> queued"
-	logMsg2 := "Task was requeued by Manager because worker had to test"
+	logMsg2 := "Task was queued by Manager because worker had to test"
 	task1WithActivity := *task1
 	task1WithActivity.Activity = logMsg2
 	task2WithActivity := *task2
@@ -82,3 +83,73 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) {
 	err := sm.RequeueActiveTasksOfWorker(ctx, &worker, "worker had to test")
 	require.NoError(t, err)
 }
+
+// Check that a Worker's task goes to 'paused' when the worker signs off and the
+// job is in 'pause-requested' state. This should also ensure that, if this
+// happens to be the last-active task of the job, the entire job goes to
+// `paused` state.
+func TestPauseActiveTasksOfWorker(t *testing.T) {
+	mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
+	defer mockCtrl.Finish()
+
+	worker := persistence.Worker{
+		UUID: "3ed470c8-d41e-4668-92d0-d799997433a4",
+		Name: "testert",
+	}
+
+	// Mock that the job is in pause-requested, with one paused task and one active task.
+	task1, job := taskWithStatus(api.JobStatusPauseRequested, api.TaskStatusActive)
+	taskOfSameJob(task1, api.TaskStatusPaused) // Create a 2nd, already-paused task.
+
+	jobPrevStatus := job.Status
+	task1PrevStatus := task1.Status
+
+	mocks.persist.EXPECT().FetchJob(ctx, job.UUID).Return(job, nil)
+	mocks.persist.EXPECT().FetchTasksOfWorkerInStatus(ctx, &worker, api.TaskStatusActive).
+		Return([]persistence.TaskJob{{Task: *task1, JobUUID: job.UUID}}, nil)
+
+	mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive).Return(0, 2, nil) // 0 of 2 active.
+
+	// Expect this re-queueing to end up in the task's log and activity.
+	logMsg1 := "task changed status active -> paused"
+	logMsg2 := "Task was paused by Manager because worker had to test"
+	task1WithActivity := *task1
+	task1WithActivity.Activity = logMsg2
+	task1WithActivityAndStatus := task1WithActivity
+	task1WithActivityAndStatus.Status = api.TaskStatusPaused
+	mocks.persist.EXPECT().SaveTaskActivity(ctx, &task1WithActivity)
+	mocks.persist.EXPECT().SaveTaskStatus(ctx, &task1WithActivityAndStatus)
+
+	jobWithStatus := *job
+	jobWithStatus.Status = api.JobStatusPaused
+	jobWithStatus.Activity = "Changed to status \"paused\": last task got paused"
+	mocks.persist.EXPECT().SaveJobStatus(ctx, &jobWithStatus)
+
+	mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg1)
+	mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg2)
+
+	mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.EventTaskUpdate{
+		Activity:       logMsg2,
+		Id:             task1.UUID,
+		JobId:          job.UUID,
+		Name:           task1.Name,
+		PreviousStatus: &task1PrevStatus,
+		Status:         api.TaskStatusPaused,
+		Updated:        task1.UpdatedAt.Time,
+	})
+	mocks.broadcaster.EXPECT().BroadcastJobUpdate(api.EventJobUpdate{
+		Id:             job.UUID,
+		Name:           &job.Name,
+		PreviousStatus: &jobPrevStatus,
+		Priority:       int(job.Priority),
+		RefreshTasks:   false,
+		Status:         api.JobStatusPaused,
+		Type:           job.JobType,
+		Updated:        job.UpdatedAt.Time,
+	})
+
+	mocks.expectFetchJobOfTask(task1, job)
+
+	err := sm.RequeueActiveTasksOfWorker(ctx, &worker, "worker had to test")
+	require.NoError(t, err)
+}
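
Note on the locking pattern in this change: TaskStatusChange() is split into a locking public method and an internal taskStatusChange() that assumes the lock is already held, and the worker sign-off path now picks the task's target status from the job's status. The sketch below is not Flamenco code; it is a minimal, self-contained Go illustration of that idea, with invented names (stateMachine, ReturnTaskOfWorker, jobStatus, taskStatus) standing in for the real types.

// Minimal sketch (not Flamenco code) of the lock-then-delegate pattern and the
// job-status-aware target status used when a worker returns its task to the pool.
package main

import (
	"fmt"
	"sync"
)

type jobStatus string
type taskStatus string

const (
	jobPauseRequested jobStatus  = "pause-requested"
	jobPaused         jobStatus  = "paused"
	taskQueued        taskStatus = "queued"
	taskPaused        taskStatus = "paused"
)

// stateMachine is a stand-in for the real StateMachine; only the fields needed
// for this illustration are present.
type stateMachine struct {
	mutex     sync.Mutex
	jobStatus jobStatus
}

// TaskStatusChange is the external entry point: it takes the lock and then
// delegates to the internal, already-locked variant.
func (sm *stateMachine) TaskStatusChange(newStatus taskStatus) {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.taskStatusChange(newStatus)
}

// taskStatusChange assumes the caller already holds sm.mutex, so methods that
// lock the mutex themselves (like ReturnTaskOfWorker below) can call it
// without deadlocking on the non-reentrant sync.Mutex.
func (sm *stateMachine) taskStatusChange(newStatus taskStatus) {
	fmt.Println("task ->", newStatus)
}

// ReturnTaskOfWorker mirrors the idea of returnTaskOfWorker(): the target task
// status depends on the job status, so a worker signing off while the job is
// pausing sends its task to 'paused' instead of 'queued'.
func (sm *stateMachine) ReturnTaskOfWorker() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	var target taskStatus
	switch sm.jobStatus {
	case jobPauseRequested, jobPaused:
		target = taskPaused
	default:
		target = taskQueued
	}
	sm.taskStatusChange(target)
}

func main() {
	sm := &stateMachine{jobStatus: jobPauseRequested}
	sm.ReturnTaskOfWorker() // prints: task -> paused
}

The split matters because sync.Mutex is not reentrant: a method that already holds the lock and then calls the exported, locking TaskStatusChange() would deadlock. That is why RequeueActiveTasksOfWorker and RequeueFailedTasksOfWorkerOfJob now take the mutex themselves and call the internal taskStatusChange().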