Manager: Support pausing jobs

A job first goes to `pause-requested` status, during which any `active` task
gets a chance to be completed. Once there are no more active tasks, the job
goes to `paused` state (or `failed`, if that is applicable).

Pull request: https://projects.blender.org/studio/flamenco/pulls/104313
This commit is contained in:
David Zhang 2024-07-01 10:50:54 -04:00
parent 1330487078
commit aac55e7e3c
5 changed files with 197 additions and 1 deletions

View File

@ -166,6 +166,18 @@ func (sm *StateMachine) jobStatusIfAThenB(
return sm.JobStatusChange(ctx, job, thenStatus, reason) return sm.JobStatusChange(ctx, job, thenStatus, reason)
} }
// isJobPausingComplete returns true when the job status is pause-requested and there are no more active tasks.
func (sm *StateMachine) isJobPausingComplete(ctx context.Context, job *persistence.Job) (bool, error) {
if job.Status != api.JobStatusPauseRequested {
return false, nil
}
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil {
return false, err
}
return numActive == 0, nil
}
// updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job. // updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
// If no more tasks can run, cancel the job. // If no more tasks can run, cancel the job.
@ -180,6 +192,15 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job") return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job")
} }
// Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return err
}
if toBePaused {
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task cancellation")
}
return nil return nil
} }
@ -204,6 +225,16 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger
} }
// If the job didn't fail, this failure indicates that at least the job is active. // If the job didn't fail, this failure indicates that at least the job is active.
failLogger.Info().Msg("task failed, but not enough to fail the job") failLogger.Info().Msg("task failed, but not enough to fail the job")
// Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return err
}
if toBePaused {
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task failure")
}
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
"task failed, but not enough to fail the job") "task failed, but not enough to fail the job")
} }
@ -218,6 +249,16 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg
logger.Info().Msg("all tasks of job are completed, job is completed") logger.Info().Msg("all tasks of job are completed, job is completed")
return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed") return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed")
} }
// Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return err
}
if toBePaused {
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task completion")
}
logger.Info(). logger.Info().
Int("taskNumTotal", numTotal). Int("taskNumTotal", numTotal).
Int("taskNumComplete", numComplete). Int("taskNumComplete", numComplete).
@ -369,7 +410,7 @@ func (sm *StateMachine) updateTasksAfterJobStatusChange(
// Every case in this switch MUST return, for sanity sake. // Every case in this switch MUST return, for sanity sake.
switch job.Status { switch job.Status {
case api.JobStatusCompleted, api.JobStatusCanceled: case api.JobStatusCompleted, api.JobStatusCanceled, api.JobStatusPaused:
// Nothing to do; this will happen as a response to all tasks receiving this status. // Nothing to do; this will happen as a response to all tasks receiving this status.
return tasksUpdateResult{}, nil return tasksUpdateResult{}, nil
@ -385,6 +426,13 @@ func (sm *StateMachine) updateTasksAfterJobStatusChange(
massTaskUpdate: true, massTaskUpdate: true,
}, err }, err
case api.JobStatusPauseRequested:
jobStatus, err := sm.pauseTasks(ctx, logger, job)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
case api.JobStatusRequeueing: case api.JobStatusRequeueing:
jobStatus, err := sm.requeueTasks(ctx, logger, job, oldJobStatus) jobStatus, err := sm.requeueTasks(ctx, logger, job, oldJobStatus)
return tasksUpdateResult{ return tasksUpdateResult{
@ -438,6 +486,38 @@ func (sm *StateMachine) cancelTasks(
return "", nil return "", nil
} }
func (sm *StateMachine) pauseTasks(
ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
logger.Info().Msg("pausing tasks of job")
// Any task that might run in the future should get paused.
// Active tasks should remain active until finished.
taskStatusesToPause := []api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
}
err := sm.persist.UpdateJobsTaskStatusesConditional(
ctx, job, taskStatusesToPause, api.TaskStatusPaused,
fmt.Sprintf("Manager paused this task because the job got status %q.", job.Status),
)
if err != nil {
return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err)
}
// If pausing was requested, it has now happened, so the job can transition.
toBePaused, err := sm.isJobPausingComplete(ctx, job)
if err != nil {
return "", err
}
if toBePaused {
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
return api.JobStatusPaused, nil
}
return api.JobStatusPauseRequested, nil
}
// requeueTasks re-queues all tasks of the job. // requeueTasks re-queues all tasks of the job.
// //
// This function assumes that the current job status is "requeueing". // This function assumes that the current job status is "requeueing".

View File

@ -336,6 +336,94 @@ func TestJobCancelWithSomeCompletedTasks(t *testing.T) {
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest")) require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest"))
} }
func TestJobPauseWithAllQueuedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestJobPauseWithSomeCompletedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestJobPauseWithSomeActiveTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusCompleted)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(1, 3, nil)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusPauseRequested)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestCheckStuck(t *testing.T) { func TestCheckStuck(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()

View File

@ -8,6 +8,9 @@
<button class="btn delete dangerous" v-on:click="onButtonDeleteConfirmed">Delete</button> <button class="btn delete dangerous" v-on:click="onButtonDeleteConfirmed">Delete</button>
</div> </div>
</div> </div>
<button class="btn pause" :disabled="!jobs.canPause" v-on:click="onButtonPause">
Pause Job
</button>
<button class="btn cancel" :disabled="!jobs.canCancel" v-on:click="onButtonCancel"> <button class="btn cancel" :disabled="!jobs.canCancel" v-on:click="onButtonCancel">
Cancel Job Cancel Job
</button> </button>
@ -69,6 +72,9 @@ export default {
onButtonRequeue() { onButtonRequeue() {
return this._handleJobActionPromise(this.jobs.requeueJobs(), 'requeueing'); return this._handleJobActionPromise(this.jobs.requeueJobs(), 'requeueing');
}, },
onButtonPause() {
return this._handleJobActionPromise(this.jobs.pauseJobs(), 'marked for pausing');
},
_handleJobActionPromise(promise, description) { _handleJobActionPromise(promise, description) {
return promise.then(() => { return promise.then(() => {

View File

@ -33,6 +33,9 @@ export const useJobs = defineStore('jobs', {
canRequeue() { canRequeue() {
return this._anyJobWithStatus(['canceled', 'completed', 'failed', 'paused']); return this._anyJobWithStatus(['canceled', 'completed', 'failed', 'paused']);
}, },
canPause() {
return this._anyJobWithStatus(['active', 'queued', 'canceled']);
},
}, },
actions: { actions: {
setIsJobless(isJobless) { setIsJobless(isJobless) {
@ -74,6 +77,9 @@ export const useJobs = defineStore('jobs', {
cancelJobs() { cancelJobs() {
return this._setJobStatus('cancel-requested'); return this._setJobStatus('cancel-requested');
}, },
pauseJobs() {
return this._setJobStatus('pause-requested');
},
requeueJobs() { requeueJobs() {
return this._setJobStatus('requeueing'); return this._setJobStatus('requeueing');
}, },

View File

@ -2,6 +2,7 @@ import { defineStore } from 'pinia';
import * as API from '@/manager-api'; import * as API from '@/manager-api';
import { getAPIClient } from '@/api-client'; import { getAPIClient } from '@/api-client';
import { useJobs } from '@/stores/jobs';
const jobsAPI = new API.JobsApi(getAPIClient()); const jobsAPI = new API.JobsApi(getAPIClient());
@ -19,6 +20,21 @@ export const useTasks = defineStore('tasks', {
}), }),
getters: { getters: {
canCancel() { canCancel() {
const jobs = useJobs();
const activeJob = jobs.activeJob;
if (!activeJob) {
console.warn('no active job, unable to determine whether the active task is cancellable');
return false;
}
if (activeJob.status == 'pause-requested') {
// Cancelling a task should not be possible while the job is being paused.
// In the future this might be supported, see issue #104315.
return false;
}
// Allow cancellation for specified task statuses.
return this._anyTaskWithStatus(['queued', 'active', 'soft-failed']); return this._anyTaskWithStatus(['queued', 'active', 'soft-failed']);
}, },
canRequeue() { canRequeue() {