Implement task status changes from web interface

This also reworks some of the logic due to the recently-removed
`cancel-requested` task status.
This commit is contained in:
Sybren A. Stüvel 2022-05-05 16:44:09 +02:00
parent 23680c27bf
commit ba34652cd1
8 changed files with 109 additions and 34 deletions

View File

@ -133,6 +133,51 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error {
return e.NoContent(http.StatusNoContent)
}
func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error {
logger := requestLogger(e)
ctx := e.Request().Context()
logger = logger.With().Str("task", taskID).Logger()
var statusChange api.SetTaskStatusJSONRequestBody
if err := e.Bind(&statusChange); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
dbTask, err := f.persist.FetchTask(ctx, taskID)
if err != nil {
if errors.Is(err, persistence.ErrTaskNotFound) {
return sendAPIError(e, http.StatusNotFound, "no such task")
}
logger.Error().Err(err).Msg("error fetching task")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
}
logger = logger.With().
Str("currentstatus", string(dbTask.Status)).
Str("requestedStatus", string(statusChange.Status)).
Str("reason", statusChange.Reason).
Logger()
logger.Info().Msg("task status change requested")
// Store the reason for the status change in the task's Activity.
dbTask.Activity = statusChange.Reason
err = f.persist.SaveTaskActivity(ctx, dbTask)
if err != nil {
logger.Error().Err(err).Msg("error saving reason of task status change to its activity field")
return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing task status")
}
// Perform the actual status change.
err = f.stateMachine.TaskStatusChange(ctx, dbTask, statusChange.Status)
if err != nil {
logger.Error().Err(err).Msg("error changing task status")
return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing task status")
}
return e.NoContent(http.StatusNoContent)
}
func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
logger := requestLogger(e)
worker := requestWorkerOrPanic(e)

View File

@ -184,6 +184,7 @@ func (f *Flamenco) workerRequeueActiveTasks(ctx context.Context, logger zerolog.
Msg("error queueing task on worker sign-off")
lastErr = err
}
// TODO: write to task activity that it got requeued because of worker sign-off.
}
return lastErr

View File

@ -257,7 +257,11 @@ func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.
return numTasksInStatus > 0, nil
}
func (db *DB) CountTasksOfJobInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (numInStatus, numTotal int, err error) {
func (db *DB) CountTasksOfJobInStatus(
ctx context.Context,
job *Job,
taskStatuses ...api.TaskStatus,
) (numInStatus, numTotal int, err error) {
type Result struct {
Status api.TaskStatus
NumTasks int
@ -272,11 +276,18 @@ func (db *DB) CountTasksOfJobInStatus(ctx context.Context, job *Job, taskStatus
Scan(&results)
if tx.Error != nil {
return 0, 0, jobError(tx.Error, "count tasks of job %s in status %q", job.UUID, taskStatus)
return 0, 0, jobError(tx.Error, "count tasks of job %s in status %q", job.UUID, taskStatuses)
}
// Create lookup table for which statuses to count.
countStatus := map[api.TaskStatus]bool{}
for _, status := range taskStatuses {
countStatus[status] = true
}
// Count the number of tasks per status.
for _, result := range results {
if result.Status == taskStatus {
if countStatus[result.Status] {
numInStatus += result.NumTasks
}
numTotal += result.NumTasks

View File

@ -172,8 +172,8 @@ func TestFetchTasksOfWorkerInStatus(t *testing.T) {
assert.Equal(t, task.ID, tasks[0].ID)
assert.Equal(t, task.UUID, tasks[0].UUID)
assert.NotEqual(t, api.TaskStatusCancelRequested, task.Status)
tasks, err = db.FetchTasksOfWorkerInStatus(ctx, w, api.TaskStatusCancelRequested)
assert.NotEqual(t, api.TaskStatusCanceled, task.Status)
tasks, err = db.FetchTasksOfWorkerInStatus(ctx, w, api.TaskStatusCanceled)
assert.NoError(t, err)
assert.Empty(t, tasks, "worker should have no task in status %q", w)
}

View File

@ -32,7 +32,7 @@ type PersistenceService interface {
SaveJobStatus(ctx context.Context, j *persistence.Job) error
JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error)
CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (numInStatus, numTotal int, err error)
CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) (numInStatus, numTotal int, err error)
FetchTasksOfJob(ctx context.Context, job *persistence.Job) ([]*persistence.Task, error)
FetchTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) ([]*persistence.Task, error)
@ -134,10 +134,6 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
// Re-queueing a task on a completed job should re-queue the job too.
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued, "task was queued")
case api.TaskStatusCancelRequested:
// Requesting cancellation of a single task has no influence on the job itself.
return nil
case api.TaskStatusPaused:
// Pausing a task has no impact on the job.
return nil
@ -188,21 +184,18 @@ func (sm *StateMachine) jobStatusIfAThenB(
// onTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
func (sm *StateMachine) onTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
// Only trigger cancellation/failure of the job if that was actually requested.
// A user can also cancel a single task from the web UI or API, in which
// case the job should just keep running.
if job.Status != api.JobStatusCancelRequested {
return nil
}
// This could be the last 'cancel-requested' task to go to 'canceled'.
hasCancelReq, err := sm.persist.JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested)
// If no more tasks can run, cancel the job.
numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed)
if err != nil {
return err
}
if !hasCancelReq {
logger.Info().Msg("last task of job went from cancel-requested to canceled")
return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "tasks were canceled")
if numRunnable == 0 {
// NOTE: this does NOT cancel any non-runnable (paused/failed) tasks. If that's desired, just cancel the job as a whole.
logger.Info().Msg("canceled task was last runnable task of job, canceling job")
return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job")
}
return nil
}
@ -396,7 +389,6 @@ func (sm *StateMachine) requeueTasks(
default:
// Re-queue only the non-completed tasks.
tasks, err = sm.persist.FetchTasksOfJobInStatus(ctx, job,
api.TaskStatusCancelRequested,
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,

View File

@ -177,26 +177,53 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusCancelRequested)
task2 := taskOfSameJob(task, api.TaskStatusCancelRequested)
task := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusActive)
task2 := taskOfSameJob(task, api.TaskStatusQueued)
job := task.Job
// T1: cancel-requested > cancelled --> J: cancel-requested > cancel-requested
// T1: active > cancelled --> J: cancel-requested > cancel-requested
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusCanceled)
mocks.expectBroadcastTaskChange(task, api.TaskStatusCancelRequested, api.TaskStatusCanceled)
mocks.persist.EXPECT().JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested).Return(true, nil)
mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(1, 2, nil)
assert.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCanceled))
// T2: cancel-requested > cancelled --> J: cancel-requested > canceled
// T2: queued > cancelled --> J: cancel-requested > canceled
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCanceled)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusCancelRequested, api.TaskStatusCanceled)
mocks.persist.EXPECT().JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested).Return(false, nil)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusQueued, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(0, 2, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
mocks.expectBroadcastJobChange(task.Job, api.JobStatusCancelRequested, api.JobStatusCanceled)
assert.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusCanceled))
}
func TestTaskStatusChangeCancelSingleTaskWithOtherFailed(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusFailed)
taskOfSameJob(task2, api.TaskStatusPaused)
job := task1.Job
// T1: active > cancelled --> J: cancel-requested > canceled because T2 already failed and cannot run anyway.
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusCanceled)
mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
mocks.expectBroadcastJobChange(task1.Job, api.JobStatusCancelRequested, api.JobStatusCanceled)
// The paused task just stays paused, so don't expectBroadcastTaskChange(task3).
assert.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusCanceled))
}
func TestTaskStatusChangeUnknownStatus(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
@ -224,7 +251,6 @@ func TestJobRequeueWithSomeCompletedTasks(t *testing.T) {
// Expect queueing of the job to trigger queueing of all its not-yet-completed tasks.
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil)
mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, job,
api.TaskStatusCancelRequested,
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,

View File

@ -20,7 +20,7 @@ export default {
methods: {
onButtonCancel() {
return this._handleTaskActionPromise(
this.tasks.cancelTasks(), "marked for cancellation");
this.tasks.cancelTasks(), "cancelled");
},
onButtonRequeue() {
return this._handleTaskActionPromise(

View File

@ -64,8 +64,8 @@ export const useTasks = defineStore('tasks', {
* TODO: actually have these work on all selected tasks. For simplicity, the
* code now assumes that only the active task needs to be operated on.
*/
cancelTasks() { return this._setTaskStatus("cancel-requested"); },
requeueTasks() { return this._setTaskStatus("requeued"); },
cancelTasks() { return this._setTaskStatus("canceled"); },
requeueTasks() { return this._setTaskStatus("queued"); },
// Internal methods.