diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index e5cbbd2b..0b466b3c 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -3,7 +3,6 @@ package api_impl // SPDX-License-Identifier: GPL-3.0-or-later import ( - "context" "errors" "fmt" "net/http" @@ -12,9 +11,7 @@ import ( "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/pkg/api" - "github.com/google/uuid" "github.com/labstack/echo/v4" - "github.com/rs/zerolog" ) func (f *Flamenco) GetJobTypes(e echo.Context) error { @@ -178,99 +175,6 @@ func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error { return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { - logger := requestLogger(e) - worker := requestWorkerOrPanic(e) - - if _, err := uuid.Parse(taskID); err != nil { - logger.Debug().Msg("invalid task ID received") - return sendAPIError(e, http.StatusBadRequest, "task ID not valid") - } - logger = logger.With().Str("taskID", taskID).Logger() - - // Fetch the task, to see if this worker is even allowed to send us updates. - ctx := e.Request().Context() - dbTask, err := f.persist.FetchTask(ctx, taskID) - if err != nil { - logger.Warn().Err(err).Msg("cannot fetch task") - if errors.Is(err, persistence.ErrTaskNotFound) { - return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID) - } - return sendAPIError(e, http.StatusInternalServerError, "error fetching task") - } - if dbTask == nil { - panic("task could not be fetched, but database gave no error either") - } - - // Decode the request body. - var taskUpdate api.TaskUpdateJSONRequestBody - if err := e.Bind(&taskUpdate); err != nil { - logger.Warn().Err(err).Msg("bad request received") - return sendAPIError(e, http.StatusBadRequest, "invalid format") - } - if dbTask.WorkerID == nil { - logger.Warn(). - Msg("worker trying to update task that's not assigned to any worker") - return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) - } - if *dbTask.WorkerID != worker.ID { - logger.Warn().Msg("worker trying to update task that's assigned to another worker") - return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) - } - - // TODO: check whether this task may undergo the requested status change. - - if err := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate); err != nil { - return sendAPIError(e, http.StatusInternalServerError, "unable to handle status update: %v", err) - } - - return e.NoContent(http.StatusNoContent) -} - -func (f *Flamenco) doTaskUpdate( - ctx context.Context, - logger zerolog.Logger, - w *persistence.Worker, - dbTask *persistence.Task, - update api.TaskUpdateJSONRequestBody, -) error { - if dbTask.Job == nil { - logger.Panic().Msg("dbTask.Job is nil, unable to continue") - } - - var dbErr error - - if update.TaskStatus != nil { - oldTaskStatus := dbTask.Status - err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus) - if err != nil { - logger.Error().Err(err). - Str("newTaskStatus", string(*update.TaskStatus)). - Str("oldTaskStatus", string(oldTaskStatus)). - Msg("error changing task status") - dbErr = fmt.Errorf("changing status of task %s to %q: %w", - dbTask.UUID, *update.TaskStatus, err) - } - } - - if update.Activity != nil { - dbTask.Activity = *update.Activity - dbErr = f.persist.SaveTaskActivity(ctx, dbTask) - } - - if update.Log != nil { - // Errors writing the log to file should be logged in our own logging - // system, but shouldn't abort the render. As such, `err` is not returned to - // the caller. - err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log) - if err != nil { - logger.Error().Err(err).Msg("error writing task log") - } - } - - return dbErr -} - func jobDBtoAPI(dbJob *persistence.Job) api.Job { apiJob := api.Job{ SubmittedJob: api.SubmittedJob{ diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 78ce8de1..4d5e622d 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -3,7 +3,6 @@ package api_impl // SPDX-License-Identifier: GPL-3.0-or-later import ( - "context" "errors" "net/http" "testing" @@ -80,67 +79,6 @@ func TestSubmitJob(t *testing.T) { } -func TestTaskUpdate(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - mf := newMockedFlamenco(mockCtrl) - worker := testWorker() - - // Construct the JSON request object. - taskUpdate := api.TaskUpdateJSONRequestBody{ - Activity: ptr("testing"), - Log: ptr("line1\nline2\n"), - TaskStatus: ptr(api.TaskStatusFailed), - } - - // Construct the task that's supposed to be updated. - taskID := "181eab68-1123-4790-93b1-94309a899411" - jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5" - mockJob := persistence.Job{UUID: jobID} - mockTask := persistence.Task{ - UUID: taskID, - Worker: &worker, - WorkerID: &worker.ID, - Job: &mockJob, - Activity: "pre-update activity", - } - - // Expect the task to be fetched. - mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil) - - // Expect the task status change to be handed to the state machine. - var statusChangedtask persistence.Task - mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed). - DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error { - statusChangedtask = *task - return nil - }) - - // Expect the activity to be updated. - var actUpdatedTask persistence.Task - mf.persistence.EXPECT().SaveTaskActivity(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, task *persistence.Task) error { - actUpdatedTask = *task - return nil - }) - - // Expect the log to be written. - mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n") - - // Do the call. - echoCtx := mf.prepareMockedJSONRequest(taskUpdate) - requestWorkerStore(echoCtx, &worker) - err := mf.flamenco.TaskUpdate(echoCtx, taskID) - - // Check the saved task. - assert.NoError(t, err) - assert.Equal(t, mockTask.UUID, statusChangedtask.UUID) - assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID) - assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status. - assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately. -} - func TestGetJobTypeHappy(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 340390d8..f5623db6 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -313,6 +313,99 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { return e.JSON(http.StatusOK, customisedTask) } +func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { + logger := requestLogger(e) + worker := requestWorkerOrPanic(e) + + if _, err := uuid.Parse(taskID); err != nil { + logger.Debug().Msg("invalid task ID received") + return sendAPIError(e, http.StatusBadRequest, "task ID not valid") + } + logger = logger.With().Str("taskID", taskID).Logger() + + // Fetch the task, to see if this worker is even allowed to send us updates. + ctx := e.Request().Context() + dbTask, err := f.persist.FetchTask(ctx, taskID) + if err != nil { + logger.Warn().Err(err).Msg("cannot fetch task") + if errors.Is(err, persistence.ErrTaskNotFound) { + return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID) + } + return sendAPIError(e, http.StatusInternalServerError, "error fetching task") + } + if dbTask == nil { + panic("task could not be fetched, but database gave no error either") + } + + // Decode the request body. + var taskUpdate api.TaskUpdateJSONRequestBody + if err := e.Bind(&taskUpdate); err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + if dbTask.WorkerID == nil { + logger.Warn(). + Msg("worker trying to update task that's not assigned to any worker") + return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) + } + if *dbTask.WorkerID != worker.ID { + logger.Warn().Msg("worker trying to update task that's assigned to another worker") + return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) + } + + // TODO: check whether this task may undergo the requested status change. + + if err := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate); err != nil { + return sendAPIError(e, http.StatusInternalServerError, "unable to handle status update: %v", err) + } + + return e.NoContent(http.StatusNoContent) +} + +func (f *Flamenco) doTaskUpdate( + ctx context.Context, + logger zerolog.Logger, + w *persistence.Worker, + dbTask *persistence.Task, + update api.TaskUpdateJSONRequestBody, +) error { + if dbTask.Job == nil { + logger.Panic().Msg("dbTask.Job is nil, unable to continue") + } + + var dbErr error + + if update.TaskStatus != nil { + oldTaskStatus := dbTask.Status + err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus) + if err != nil { + logger.Error().Err(err). + Str("newTaskStatus", string(*update.TaskStatus)). + Str("oldTaskStatus", string(oldTaskStatus)). + Msg("error changing task status") + dbErr = fmt.Errorf("changing status of task %s to %q: %w", + dbTask.UUID, *update.TaskStatus, err) + } + } + + if update.Activity != nil { + dbTask.Activity = *update.Activity + dbErr = f.persist.SaveTaskActivity(ctx, dbTask) + } + + if update.Log != nil { + // Errors writing the log to file should be logged in our own logging + // system, but shouldn't abort the render. As such, `err` is not returned to + // the caller. + err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log) + if err != nil { + logger.Error().Err(err).Msg("error writing task log") + } + } + + return dbErr +} + func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { logger := requestLogger(e) worker := requestWorkerOrPanic(e) diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 7d2edba0..234fd28a 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -141,6 +141,67 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { assert.Equal(t, http.StatusNoContent, resp.StatusCode) } +func TestTaskUpdate(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + // Construct the JSON request object. + taskUpdate := api.TaskUpdateJSONRequestBody{ + Activity: ptr("testing"), + Log: ptr("line1\nline2\n"), + TaskStatus: ptr(api.TaskStatusFailed), + } + + // Construct the task that's supposed to be updated. + taskID := "181eab68-1123-4790-93b1-94309a899411" + jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5" + mockJob := persistence.Job{UUID: jobID} + mockTask := persistence.Task{ + UUID: taskID, + Worker: &worker, + WorkerID: &worker.ID, + Job: &mockJob, + Activity: "pre-update activity", + } + + // Expect the task to be fetched. + mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil) + + // Expect the task status change to be handed to the state machine. + var statusChangedtask persistence.Task + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed). + DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error { + statusChangedtask = *task + return nil + }) + + // Expect the activity to be updated. + var actUpdatedTask persistence.Task + mf.persistence.EXPECT().SaveTaskActivity(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, task *persistence.Task) error { + actUpdatedTask = *task + return nil + }) + + // Expect the log to be written. + mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n") + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(taskUpdate) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.TaskUpdate(echoCtx, taskID) + + // Check the saved task. + assert.NoError(t, err) + assert.Equal(t, mockTask.UUID, statusChangedtask.UUID) + assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID) + assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status. + assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately. +} + func TestMayWorkerRun(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish()