Manager: move TaskUpdate API function from jobs.go to workers.go

The OpenAPI spec tags this operation as `workers`, so it should be in
`workers.go`.

No functional changes.
This commit is contained in:
Sybren A. Stüvel 2022-05-19 14:20:02 +02:00
parent 12d5b2a1fc
commit 43f244ecab
4 changed files with 154 additions and 158 deletions

View File

@ -3,7 +3,6 @@ package api_impl
// SPDX-License-Identifier: GPL-3.0-or-later // SPDX-License-Identifier: GPL-3.0-or-later
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -12,9 +11,7 @@ import (
"git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/pkg/api" "git.blender.org/flamenco/pkg/api"
"github.com/google/uuid"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/rs/zerolog"
) )
func (f *Flamenco) GetJobTypes(e echo.Context) error { func (f *Flamenco) GetJobTypes(e echo.Context) error {
@ -178,99 +175,6 @@ func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error {
return e.NoContent(http.StatusNoContent) return e.NoContent(http.StatusNoContent)
} }
func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
logger := requestLogger(e)
worker := requestWorkerOrPanic(e)
if _, err := uuid.Parse(taskID); err != nil {
logger.Debug().Msg("invalid task ID received")
return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
}
logger = logger.With().Str("taskID", taskID).Logger()
// Fetch the task, to see if this worker is even allowed to send us updates.
ctx := e.Request().Context()
dbTask, err := f.persist.FetchTask(ctx, taskID)
if err != nil {
logger.Warn().Err(err).Msg("cannot fetch task")
if errors.Is(err, persistence.ErrTaskNotFound) {
return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
}
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
}
if dbTask == nil {
panic("task could not be fetched, but database gave no error either")
}
// Decode the request body.
var taskUpdate api.TaskUpdateJSONRequestBody
if err := e.Bind(&taskUpdate); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
if dbTask.WorkerID == nil {
logger.Warn().
Msg("worker trying to update task that's not assigned to any worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
}
if *dbTask.WorkerID != worker.ID {
logger.Warn().Msg("worker trying to update task that's assigned to another worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
}
// TODO: check whether this task may undergo the requested status change.
if err := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate); err != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle status update: %v", err)
}
return e.NoContent(http.StatusNoContent)
}
func (f *Flamenco) doTaskUpdate(
ctx context.Context,
logger zerolog.Logger,
w *persistence.Worker,
dbTask *persistence.Task,
update api.TaskUpdateJSONRequestBody,
) error {
if dbTask.Job == nil {
logger.Panic().Msg("dbTask.Job is nil, unable to continue")
}
var dbErr error
if update.TaskStatus != nil {
oldTaskStatus := dbTask.Status
err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
if err != nil {
logger.Error().Err(err).
Str("newTaskStatus", string(*update.TaskStatus)).
Str("oldTaskStatus", string(oldTaskStatus)).
Msg("error changing task status")
dbErr = fmt.Errorf("changing status of task %s to %q: %w",
dbTask.UUID, *update.TaskStatus, err)
}
}
if update.Activity != nil {
dbTask.Activity = *update.Activity
dbErr = f.persist.SaveTaskActivity(ctx, dbTask)
}
if update.Log != nil {
// Errors writing the log to file should be logged in our own logging
// system, but shouldn't abort the render. As such, `err` is not returned to
// the caller.
err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
if err != nil {
logger.Error().Err(err).Msg("error writing task log")
}
}
return dbErr
}
func jobDBtoAPI(dbJob *persistence.Job) api.Job { func jobDBtoAPI(dbJob *persistence.Job) api.Job {
apiJob := api.Job{ apiJob := api.Job{
SubmittedJob: api.SubmittedJob{ SubmittedJob: api.SubmittedJob{

View File

@ -3,7 +3,6 @@ package api_impl
// SPDX-License-Identifier: GPL-3.0-or-later // SPDX-License-Identifier: GPL-3.0-or-later
import ( import (
"context"
"errors" "errors"
"net/http" "net/http"
"testing" "testing"
@ -80,67 +79,6 @@ func TestSubmitJob(t *testing.T) {
} }
func TestTaskUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
// Construct the JSON request object.
taskUpdate := api.TaskUpdateJSONRequestBody{
Activity: ptr("testing"),
Log: ptr("line1\nline2\n"),
TaskStatus: ptr(api.TaskStatusFailed),
}
// Construct the task that's supposed to be updated.
taskID := "181eab68-1123-4790-93b1-94309a899411"
jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5"
mockJob := persistence.Job{UUID: jobID}
mockTask := persistence.Task{
UUID: taskID,
Worker: &worker,
WorkerID: &worker.ID,
Job: &mockJob,
Activity: "pre-update activity",
}
// Expect the task to be fetched.
mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil)
// Expect the task status change to be handed to the state machine.
var statusChangedtask persistence.Task
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed).
DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error {
statusChangedtask = *task
return nil
})
// Expect the activity to be updated.
var actUpdatedTask persistence.Task
mf.persistence.EXPECT().SaveTaskActivity(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, task *persistence.Task) error {
actUpdatedTask = *task
return nil
})
// Expect the log to be written.
mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n")
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.TaskUpdate(echoCtx, taskID)
// Check the saved task.
assert.NoError(t, err)
assert.Equal(t, mockTask.UUID, statusChangedtask.UUID)
assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID)
assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status.
assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately.
}
func TestGetJobTypeHappy(t *testing.T) { func TestGetJobTypeHappy(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()

View File

@ -313,6 +313,99 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
return e.JSON(http.StatusOK, customisedTask) return e.JSON(http.StatusOK, customisedTask)
} }
func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
logger := requestLogger(e)
worker := requestWorkerOrPanic(e)
if _, err := uuid.Parse(taskID); err != nil {
logger.Debug().Msg("invalid task ID received")
return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
}
logger = logger.With().Str("taskID", taskID).Logger()
// Fetch the task, to see if this worker is even allowed to send us updates.
ctx := e.Request().Context()
dbTask, err := f.persist.FetchTask(ctx, taskID)
if err != nil {
logger.Warn().Err(err).Msg("cannot fetch task")
if errors.Is(err, persistence.ErrTaskNotFound) {
return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
}
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
}
if dbTask == nil {
panic("task could not be fetched, but database gave no error either")
}
// Decode the request body.
var taskUpdate api.TaskUpdateJSONRequestBody
if err := e.Bind(&taskUpdate); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
if dbTask.WorkerID == nil {
logger.Warn().
Msg("worker trying to update task that's not assigned to any worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
}
if *dbTask.WorkerID != worker.ID {
logger.Warn().Msg("worker trying to update task that's assigned to another worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
}
// TODO: check whether this task may undergo the requested status change.
if err := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate); err != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle status update: %v", err)
}
return e.NoContent(http.StatusNoContent)
}
func (f *Flamenco) doTaskUpdate(
ctx context.Context,
logger zerolog.Logger,
w *persistence.Worker,
dbTask *persistence.Task,
update api.TaskUpdateJSONRequestBody,
) error {
if dbTask.Job == nil {
logger.Panic().Msg("dbTask.Job is nil, unable to continue")
}
var dbErr error
if update.TaskStatus != nil {
oldTaskStatus := dbTask.Status
err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
if err != nil {
logger.Error().Err(err).
Str("newTaskStatus", string(*update.TaskStatus)).
Str("oldTaskStatus", string(oldTaskStatus)).
Msg("error changing task status")
dbErr = fmt.Errorf("changing status of task %s to %q: %w",
dbTask.UUID, *update.TaskStatus, err)
}
}
if update.Activity != nil {
dbTask.Activity = *update.Activity
dbErr = f.persist.SaveTaskActivity(ctx, dbTask)
}
if update.Log != nil {
// Errors writing the log to file should be logged in our own logging
// system, but shouldn't abort the render. As such, `err` is not returned to
// the caller.
err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
if err != nil {
logger.Error().Err(err).Msg("error writing task log")
}
}
return dbErr
}
func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
logger := requestLogger(e) logger := requestLogger(e)
worker := requestWorkerOrPanic(e) worker := requestWorkerOrPanic(e)

View File

@ -141,6 +141,67 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
assert.Equal(t, http.StatusNoContent, resp.StatusCode) assert.Equal(t, http.StatusNoContent, resp.StatusCode)
} }
func TestTaskUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
// Construct the JSON request object.
taskUpdate := api.TaskUpdateJSONRequestBody{
Activity: ptr("testing"),
Log: ptr("line1\nline2\n"),
TaskStatus: ptr(api.TaskStatusFailed),
}
// Construct the task that's supposed to be updated.
taskID := "181eab68-1123-4790-93b1-94309a899411"
jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5"
mockJob := persistence.Job{UUID: jobID}
mockTask := persistence.Task{
UUID: taskID,
Worker: &worker,
WorkerID: &worker.ID,
Job: &mockJob,
Activity: "pre-update activity",
}
// Expect the task to be fetched.
mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil)
// Expect the task status change to be handed to the state machine.
var statusChangedtask persistence.Task
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed).
DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error {
statusChangedtask = *task
return nil
})
// Expect the activity to be updated.
var actUpdatedTask persistence.Task
mf.persistence.EXPECT().SaveTaskActivity(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, task *persistence.Task) error {
actUpdatedTask = *task
return nil
})
// Expect the log to be written.
mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n")
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.TaskUpdate(echoCtx, taskID)
// Check the saved task.
assert.NoError(t, err)
assert.Equal(t, mockTask.UUID, statusChangedtask.UUID)
assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID)
assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status.
assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately.
}
func TestMayWorkerRun(t *testing.T) { func TestMayWorkerRun(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()