diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go new file mode 100644 index 00000000..12f1be58 --- /dev/null +++ b/internal/manager/api_impl/worker_task_updates.go @@ -0,0 +1,203 @@ +package api_impl + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/gertd/go-pluralize" + "github.com/labstack/echo/v4" + "github.com/rs/zerolog" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/uuid" + "git.blender.org/flamenco/pkg/api" +) + +func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { + logger := requestLogger(e) + worker := requestWorkerOrPanic(e) + + if !uuid.IsValid(taskID) { + logger.Debug().Msg("invalid task ID received") + return sendAPIError(e, http.StatusBadRequest, "task ID not valid") + } + logger = logger.With().Str("taskID", taskID).Logger() + + // Fetch the task, to see if this worker is even allowed to send us updates. + ctx := e.Request().Context() + dbTask, err := f.persist.FetchTask(ctx, taskID) + if err != nil { + logger.Warn().Err(err).Msg("cannot fetch task") + if errors.Is(err, persistence.ErrTaskNotFound) { + return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID) + } + return sendAPIError(e, http.StatusInternalServerError, "error fetching task") + } + if dbTask == nil { + panic("task could not be fetched, but database gave no error either") + } + + // Decode the request body. + var taskUpdate api.TaskUpdate + if err := e.Bind(&taskUpdate); err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + if dbTask.WorkerID == nil { + logger.Warn(). + Msg("worker trying to update task that's not assigned to any worker") + return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) + } + if *dbTask.WorkerID != worker.ID { + logger.Warn().Msg("worker trying to update task that's assigned to another worker") + return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) + } + + // Status 'soft-failed' should never be sent by a Worker. Distinguishing + // between soft and hard failures is up to the Manager. + // Workers should always just send 'failed' instead. + if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed { + logger.Warn().Str("status", string(*taskUpdate.TaskStatus)). + Msg("worker trying to update task to not-allowed status") + return sendAPIError(e, http.StatusBadRequest, + "task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus) + } + + taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate) + workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask) + workerSeenErr := f.workerSeen(ctx, logger, worker) + + if taskUpdateErr != nil { + return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr) + } + if workerUpdateErr != nil { + return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr) + } + if workerSeenErr != nil { + return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr) + } + + return e.NoContent(http.StatusNoContent) +} + +// doTaskUpdate actually updates the task and its log file. +func (f *Flamenco) doTaskUpdate( + ctx context.Context, + logger zerolog.Logger, + w *persistence.Worker, + dbTask *persistence.Task, + update api.TaskUpdate, +) error { + if dbTask.Job == nil { + logger.Panic().Msg("dbTask.Job is nil, unable to continue") + } + + var dbErrActivity error + + if update.Activity != nil { + dbTask.Activity = *update.Activity + // The state machine will also save the task, including its activity, but + // relying on that here would create strong cohesion. + dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask) + } + + // Write the log first, because that's likely to contain the cause of the task + // state change. Any subsequent task logs, for example generated by the + // Manager in response to a status change, should be logged after that. + if update.Log != nil { + // Errors writing the log to disk are already logged by logStorage, and can be safely ignored here. + _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log) + } + + if update.TaskStatus == nil { + return dbErrActivity + } + + oldTaskStatus := dbTask.Status + var err error + if *update.TaskStatus == api.TaskStatusFailed { + // Failure is more complex than just going to the failed state. + err = f.onTaskFailed(ctx, logger, w, dbTask, update) + } else { + // Just go to the given state. + err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus) + } + + if err != nil { + logger.Error().Err(err). + Str("newTaskStatus", string(*update.TaskStatus)). + Str("oldTaskStatus", string(oldTaskStatus)). + Msg("error changing task status") + return fmt.Errorf("changing status of task %s to %q: %w", + dbTask.UUID, *update.TaskStatus, err) + } + + return nil +} + +// onTaskFailed decides whether a task is soft- or hard-failed. Note that this +// means that the task may NOT go to the status mentioned in the `update` +// parameter, but go to `soft-failed` instead. +func (f *Flamenco) onTaskFailed( + ctx context.Context, + logger zerolog.Logger, + worker *persistence.Worker, + task *persistence.Task, + update api.TaskUpdate, +) error { + // Sanity check. + if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed { + panic("onTaskFailed should only be called with a task update that indicates task failure") + } + + // Bookkeeping of failure. + numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker) + if err != nil { + return fmt.Errorf("adding worker to failure list of task: %w", err) + } + // f.maybeBlocklistWorker(ctx, logger, worker, task) + + // Determine whether this is soft or hard failure. + threshold := f.config.Get().TaskFailAfterSoftFailCount + logger = logger.With(). + Int("failedByWorkerCount", numFailed). + Int("threshold", threshold). + Logger() + + var ( + newStatus api.TaskStatus + localLog, taskLog string + ) + pluralizer := pluralize.NewClient() + if numFailed >= threshold { + newStatus = api.TaskStatusFailed + + localLog = "too many workers failed this task, hard-failing it" + taskLog = fmt.Sprintf( + "Task failed by %s, Manager will mark it as hard failure", + pluralizer.Pluralize("worker", numFailed, true), + ) + } else { + newStatus = api.TaskStatusSoftFailed + + localLog = "worker failed this task, soft-failing to give another worker a try" + failsToThreshold := threshold - numFailed + taskLog = fmt.Sprintf( + "Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.", + pluralizer.Pluralize("worker", numFailed, true), + failsToThreshold, + pluralizer.Pluralize("failure", failsToThreshold, false), + ) + } + + if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil { + logger.Error().Err(err).Msg("error writing failure notice to task log") + } + + logger.Info().Str("newTaskStatus", string(newStatus)).Msg(localLog) + return f.stateMachine.TaskStatusChange(ctx, task, newStatus) +} diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 23fefe75..3b8cb20c 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -10,7 +10,6 @@ import ( "strings" "time" - "github.com/gertd/go-pluralize" "github.com/labstack/echo/v4" "github.com/rs/zerolog" "golang.org/x/crypto/bcrypt" @@ -342,190 +341,6 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { return e.JSON(http.StatusOK, customisedTask) } -func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { - logger := requestLogger(e) - worker := requestWorkerOrPanic(e) - - if !uuid.IsValid(taskID) { - logger.Debug().Msg("invalid task ID received") - return sendAPIError(e, http.StatusBadRequest, "task ID not valid") - } - logger = logger.With().Str("taskID", taskID).Logger() - - // Fetch the task, to see if this worker is even allowed to send us updates. - ctx := e.Request().Context() - dbTask, err := f.persist.FetchTask(ctx, taskID) - if err != nil { - logger.Warn().Err(err).Msg("cannot fetch task") - if errors.Is(err, persistence.ErrTaskNotFound) { - return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID) - } - return sendAPIError(e, http.StatusInternalServerError, "error fetching task") - } - if dbTask == nil { - panic("task could not be fetched, but database gave no error either") - } - - // Decode the request body. - var taskUpdate api.TaskUpdate - if err := e.Bind(&taskUpdate); err != nil { - logger.Warn().Err(err).Msg("bad request received") - return sendAPIError(e, http.StatusBadRequest, "invalid format") - } - if dbTask.WorkerID == nil { - logger.Warn(). - Msg("worker trying to update task that's not assigned to any worker") - return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) - } - if *dbTask.WorkerID != worker.ID { - logger.Warn().Msg("worker trying to update task that's assigned to another worker") - return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) - } - - // Status 'soft-failed' should never be sent by a Worker. Distinguishing - // between soft and hard failures is up to the Manager. - // Workers should always just send 'failed' instead. - if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed { - logger.Warn().Str("status", string(*taskUpdate.TaskStatus)). - Msg("worker trying to update task to not-allowed status") - return sendAPIError(e, http.StatusBadRequest, - "task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus) - } - - taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate) - workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask) - workerSeenErr := f.workerSeen(ctx, logger, worker) - - if taskUpdateErr != nil { - return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr) - } - if workerUpdateErr != nil { - return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr) - } - if workerSeenErr != nil { - return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr) - } - - return e.NoContent(http.StatusNoContent) -} - -// doTaskUpdate actually updates the task and its log file. -func (f *Flamenco) doTaskUpdate( - ctx context.Context, - logger zerolog.Logger, - w *persistence.Worker, - dbTask *persistence.Task, - update api.TaskUpdate, -) error { - if dbTask.Job == nil { - logger.Panic().Msg("dbTask.Job is nil, unable to continue") - } - - var dbErrActivity error - - if update.Activity != nil { - dbTask.Activity = *update.Activity - // The state machine will also save the task, including its activity, but - // relying on that here would create strong cohesion. - dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask) - } - - // Write the log first, because that's likely to contain the cause of the task - // state change. Any subsequent task logs, for example generated by the - // Manager in response to a status change, should be logged after that. - if update.Log != nil { - // Errors writing the log to disk are already logged by logStorage, and can be safely ignored here. - _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log) - } - - if update.TaskStatus == nil { - return dbErrActivity - } - - oldTaskStatus := dbTask.Status - var err error - if *update.TaskStatus == api.TaskStatusFailed { - // Failure is more complex than just going to the failed state. - err = f.onTaskFailed(ctx, logger, w, dbTask, update) - } else { - // Just go to the given state. - err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus) - } - - if err != nil { - logger.Error().Err(err). - Str("newTaskStatus", string(*update.TaskStatus)). - Str("oldTaskStatus", string(oldTaskStatus)). - Msg("error changing task status") - return fmt.Errorf("changing status of task %s to %q: %w", - dbTask.UUID, *update.TaskStatus, err) - } - - return nil -} - -// onTaskFailed decides whether a task is soft- or hard-failed. Note that this -// means that the task may NOT go to the status mentioned in the `update` -// parameter, but go to `soft-failed` instead. -func (f *Flamenco) onTaskFailed( - ctx context.Context, - logger zerolog.Logger, - worker *persistence.Worker, - task *persistence.Task, - update api.TaskUpdate, -) error { - // Sanity check. - if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed { - panic("onTaskFailed should only be called with a task update that indicates task failure") - } - - // Bookkeeping of failure. - numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker) - if err != nil { - return fmt.Errorf("adding worker to failure list of task: %w", err) - } - // f.maybeBlocklistWorker(ctx, w, dbTask) - - threshold := f.config.Get().TaskFailAfterSoftFailCount - logger = logger.With(). - Int("failedByWorkerCount", numFailed). - Int("threshold", threshold). - Logger() - - var ( - newStatus api.TaskStatus - localLog, taskLog string - ) - pluralizer := pluralize.NewClient() - if numFailed >= threshold { - newStatus = api.TaskStatusFailed - - localLog = "too many workers failed this task, hard-failing it" - taskLog = fmt.Sprintf( - "Task failed by %s, Manager will mark it as hard failure", - pluralizer.Pluralize("worker", numFailed, true), - ) - } else { - newStatus = api.TaskStatusSoftFailed - - localLog = "worker failed this task, soft-failing to give another worker a try" - failsToThreshold := threshold - numFailed - taskLog = fmt.Sprintf( - "Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.", - pluralizer.Pluralize("worker", numFailed, true), - failsToThreshold, - pluralizer.Pluralize("failure", failsToThreshold, false), - ) - } - - if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil { - logger.Error().Err(err).Msg("error writing failure notice to task log") - } - - logger.Info().Str("newTaskStatus", string(newStatus)).Msg(localLog) - return f.stateMachine.TaskStatusChange(ctx, task, newStatus) -} - func (f *Flamenco) workerPingedTask( ctx context.Context, logger zerolog.Logger,