diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 2950d5b6..e5cbbd2b 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -133,6 +133,51 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error { return e.NoContent(http.StatusNoContent) } +func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error { + logger := requestLogger(e) + ctx := e.Request().Context() + + logger = logger.With().Str("task", taskID).Logger() + + var statusChange api.SetTaskStatusJSONRequestBody + if err := e.Bind(&statusChange); err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + + dbTask, err := f.persist.FetchTask(ctx, taskID) + if err != nil { + if errors.Is(err, persistence.ErrTaskNotFound) { + return sendAPIError(e, http.StatusNotFound, "no such task") + } + logger.Error().Err(err).Msg("error fetching task") + return sendAPIError(e, http.StatusInternalServerError, "error fetching task") + } + + logger = logger.With(). + Str("currentstatus", string(dbTask.Status)). + Str("requestedStatus", string(statusChange.Status)). + Str("reason", statusChange.Reason). + Logger() + logger.Info().Msg("task status change requested") + + // Store the reason for the status change in the task's Activity. + dbTask.Activity = statusChange.Reason + err = f.persist.SaveTaskActivity(ctx, dbTask) + if err != nil { + logger.Error().Err(err).Msg("error saving reason of task status change to its activity field") + return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing task status") + } + + // Perform the actual status change. 
+ err = f.stateMachine.TaskStatusChange(ctx, dbTask, statusChange.Status) + if err != nil { + logger.Error().Err(err).Msg("error changing task status") + return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing task status") + } + return e.NoContent(http.StatusNoContent) +} + func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { logger := requestLogger(e) worker := requestWorkerOrPanic(e) diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 2e8217fb..6ef3925c 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -184,6 +184,7 @@ func (f *Flamenco) workerRequeueActiveTasks(ctx context.Context, logger zerolog. Msg("error queueing task on worker sign-off") lastErr = err } + // TODO: write to task activity that it got requeued because of worker sign-off. } return lastErr diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 7b551ad6..8d801bd3 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -257,7 +257,11 @@ func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api. return numTasksInStatus > 0, nil } -func (db *DB) CountTasksOfJobInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (numInStatus, numTotal int, err error) { +func (db *DB) CountTasksOfJobInStatus( + ctx context.Context, + job *Job, + taskStatuses ...api.TaskStatus, +) (numInStatus, numTotal int, err error) { type Result struct { Status api.TaskStatus NumTasks int @@ -272,11 +276,18 @@ func (db *DB) CountTasksOfJobInStatus(ctx context.Context, job *Job, taskStatus Scan(&results) if tx.Error != nil { - return 0, 0, jobError(tx.Error, "count tasks of job %s in status %q", job.UUID, taskStatus) + return 0, 0, jobError(tx.Error, "count tasks of job %s in status %q", job.UUID, taskStatuses) } + // Create lookup table for which statuses to count. 
+ countStatus := map[api.TaskStatus]bool{} + for _, status := range taskStatuses { + countStatus[status] = true + } + + // Count the number of tasks per status. for _, result := range results { - if result.Status == taskStatus { + if countStatus[result.Status] { numInStatus += result.NumTasks } numTotal += result.NumTasks diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index b3f634c7..8024f817 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -172,8 +172,8 @@ func TestFetchTasksOfWorkerInStatus(t *testing.T) { assert.Equal(t, task.ID, tasks[0].ID) assert.Equal(t, task.UUID, tasks[0].UUID) - assert.NotEqual(t, api.TaskStatusCancelRequested, task.Status) - tasks, err = db.FetchTasksOfWorkerInStatus(ctx, w, api.TaskStatusCancelRequested) + assert.NotEqual(t, api.TaskStatusCanceled, task.Status) + tasks, err = db.FetchTasksOfWorkerInStatus(ctx, w, api.TaskStatusCanceled) assert.NoError(t, err) assert.Empty(t, tasks, "worker should have no task in status %q", w) } diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index e53d5bef..f596be81 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -32,7 +32,7 @@ type PersistenceService interface { SaveJobStatus(ctx context.Context, j *persistence.Job) error JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error) - CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (numInStatus, numTotal int, err error) + CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) (numInStatus, numTotal int, err error) FetchTasksOfJob(ctx context.Context, job *persistence.Job) ([]*persistence.Task, error) FetchTasksOfJobInStatus(ctx context.Context, 
job *persistence.Job, taskStatuses ...api.TaskStatus) ([]*persistence.Task, error) @@ -134,10 +134,6 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( // Re-queueing a task on a completed job should re-queue the job too. return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued, "task was queued") - case api.TaskStatusCancelRequested: - // Requesting cancellation of a single task has no influence on the job itself. - return nil - case api.TaskStatusPaused: // Pausing a task has no impact on the job. return nil @@ -188,21 +184,18 @@ func (sm *StateMachine) jobStatusIfAThenB( // onTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job. func (sm *StateMachine) onTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { - // Only trigger cancellation/failure of the job if that was actually requested. - // A user can also cancel a single task from the web UI or API, in which - // case the job should just keep running. - if job.Status != api.JobStatusCancelRequested { - return nil - } - // This could be the last 'cancel-requested' task to go to 'canceled'. - hasCancelReq, err := sm.persist.JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested) + // If no more tasks can run, cancel the job. + numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, + api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed) if err != nil { return err } - if !hasCancelReq { - logger.Info().Msg("last task of job went from cancel-requested to canceled") - return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "tasks were canceled") + if numRunnable == 0 { + // NOTE: this does NOT cancel any non-runnable (paused/failed) tasks. If that's desired, just cancel the job as a whole. 
+ logger.Info().Msg("canceled task was last runnable task of job, canceling job") + return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job") } + return nil } @@ -396,7 +389,6 @@ func (sm *StateMachine) requeueTasks( default: // Re-queue only the non-completed tasks. tasks, err = sm.persist.FetchTasksOfJobInStatus(ctx, job, - api.TaskStatusCancelRequested, api.TaskStatusCanceled, api.TaskStatusFailed, api.TaskStatusPaused, diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index c27e149f..e3669f34 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -177,26 +177,53 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) { mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) defer mockCtrl.Finish() - task := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusCancelRequested) - task2 := taskOfSameJob(task, api.TaskStatusCancelRequested) + task := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusActive) + task2 := taskOfSameJob(task, api.TaskStatusQueued) job := task.Job - // T1: cancel-requested > cancelled --> J: cancel-requested > cancel-requested + // T1: active > canceled --> J: cancel-requested > cancel-requested mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusCanceled) - mocks.expectBroadcastTaskChange(task, api.TaskStatusCancelRequested, api.TaskStatusCanceled) - mocks.persist.EXPECT().JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested).Return(true, nil) + mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCanceled) + mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, + api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed). 
+ Return(1, 2, nil) assert.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCanceled)) - // T2: cancel-requested > cancelled --> J: cancel-requested > canceled + // T2: queued > canceled --> J: cancel-requested > canceled mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCanceled) - mocks.expectBroadcastTaskChange(task2, api.TaskStatusCancelRequested, api.TaskStatusCanceled) - mocks.persist.EXPECT().JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested).Return(false, nil) + mocks.expectBroadcastTaskChange(task2, api.TaskStatusQueued, api.TaskStatusCanceled) + mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, + api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed). + Return(0, 2, nil) mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled) mocks.expectBroadcastJobChange(task.Job, api.JobStatusCancelRequested, api.JobStatusCanceled) assert.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusCanceled)) } +func TestTaskStatusChangeCancelSingleTaskWithOtherFailed(t *testing.T) { + mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) + defer mockCtrl.Finish() + + task1 := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusActive) + task2 := taskOfSameJob(task1, api.TaskStatusFailed) + taskOfSameJob(task2, api.TaskStatusPaused) + job := task1.Job + + // T1: active > canceled --> J: cancel-requested > canceled because T2 already failed and cannot run anyway. + mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusCanceled) + mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusCanceled) + mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, + api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed). + Return(0, 3, nil) + mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled) + mocks.expectBroadcastJobChange(task1.Job, api.JobStatusCancelRequested, api.JobStatusCanceled) + + // The paused task just stays paused, so don't expectBroadcastTaskChange(task3). 
+ + assert.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusCanceled)) +} + func TestTaskStatusChangeUnknownStatus(t *testing.T) { mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) defer mockCtrl.Finish() @@ -224,7 +251,6 @@ func TestJobRequeueWithSomeCompletedTasks(t *testing.T) { // Expect queueing of the job to trigger queueing of all its not-yet-completed tasks. mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil) mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, job, - api.TaskStatusCancelRequested, api.TaskStatusCanceled, api.TaskStatusFailed, api.TaskStatusPaused, diff --git a/web/app/src/components/TaskActionsBar.vue b/web/app/src/components/TaskActionsBar.vue index 27a4cb4b..203c75cf 100644 --- a/web/app/src/components/TaskActionsBar.vue +++ b/web/app/src/components/TaskActionsBar.vue @@ -20,7 +20,7 @@ export default { methods: { onButtonCancel() { return this._handleTaskActionPromise( - this.tasks.cancelTasks(), "marked for cancellation"); + this.tasks.cancelTasks(), "cancelled"); }, onButtonRequeue() { return this._handleTaskActionPromise( diff --git a/web/app/src/stores/tasks.js b/web/app/src/stores/tasks.js index 09b4d727..a432b2fd 100644 --- a/web/app/src/stores/tasks.js +++ b/web/app/src/stores/tasks.js @@ -64,8 +64,8 @@ export const useTasks = defineStore('tasks', { * TODO: actually have these work on all selected tasks. For simplicity, the * code now assumes that only the active task needs to be operated on. */ - cancelTasks() { return this._setTaskStatus("cancel-requested"); }, - requeueTasks() { return this._setTaskStatus("requeued"); }, + cancelTasks() { return this._setTaskStatus("canceled"); }, + requeueTasks() { return this._setTaskStatus("queued"); }, // Internal methods.