Cleanup: Manager, move worker task update handling code into its own file

Move the code that handles task updates from workers into
`worker_task_updates.go`. This code will become more complex once worker
blocklisting is added; moving it into its own file prepares for that.

No functional changes.
This commit is contained in:
Sybren A. Stüvel 2022-06-17 11:46:07 +02:00
parent 50e795c595
commit 6feee74c54
2 changed files with 203 additions and 185 deletions


@@ -0,0 +1,203 @@
package api_impl

// SPDX-License-Identifier: GPL-3.0-or-later

import (
    "context"
    "errors"
    "fmt"
    "net/http"

    "github.com/gertd/go-pluralize"
    "github.com/labstack/echo/v4"
    "github.com/rs/zerolog"

    "git.blender.org/flamenco/internal/manager/persistence"
    "git.blender.org/flamenco/internal/uuid"
    "git.blender.org/flamenco/pkg/api"
)

func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
    logger := requestLogger(e)
    worker := requestWorkerOrPanic(e)

    if !uuid.IsValid(taskID) {
        logger.Debug().Msg("invalid task ID received")
        return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
    }
    logger = logger.With().Str("taskID", taskID).Logger()

    // Fetch the task, to see if this worker is even allowed to send us updates.
    ctx := e.Request().Context()
    dbTask, err := f.persist.FetchTask(ctx, taskID)
    if err != nil {
        logger.Warn().Err(err).Msg("cannot fetch task")
        if errors.Is(err, persistence.ErrTaskNotFound) {
            return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
        }
        return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
    }
    if dbTask == nil {
        panic("task could not be fetched, but database gave no error either")
    }

    // Decode the request body.
    var taskUpdate api.TaskUpdate
    if err := e.Bind(&taskUpdate); err != nil {
        logger.Warn().Err(err).Msg("bad request received")
        return sendAPIError(e, http.StatusBadRequest, "invalid format")
    }

    if dbTask.WorkerID == nil {
        logger.Warn().
            Msg("worker trying to update task that's not assigned to any worker")
        return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
    }
    if *dbTask.WorkerID != worker.ID {
        logger.Warn().Msg("worker trying to update task that's assigned to another worker")
        return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
    }

    // Status 'soft-failed' should never be sent by a Worker. Distinguishing
    // between soft and hard failures is up to the Manager.
    // Workers should always just send 'failed' instead.
    if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed {
        logger.Warn().Str("status", string(*taskUpdate.TaskStatus)).
            Msg("worker trying to update task to not-allowed status")
        return sendAPIError(e, http.StatusBadRequest,
            "task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
    }

    taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate)
    workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask)
    workerSeenErr := f.workerSeen(ctx, logger, worker)

    if taskUpdateErr != nil {
        return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
    }
    if workerUpdateErr != nil {
        return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
    }
    if workerSeenErr != nil {
        return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr)
    }

    return e.NoContent(http.StatusNoContent)
}

// doTaskUpdate actually updates the task and its log file.
func (f *Flamenco) doTaskUpdate(
    ctx context.Context,
    logger zerolog.Logger,
    w *persistence.Worker,
    dbTask *persistence.Task,
    update api.TaskUpdate,
) error {
    if dbTask.Job == nil {
        logger.Panic().Msg("dbTask.Job is nil, unable to continue")
    }

    var dbErrActivity error
    if update.Activity != nil {
        dbTask.Activity = *update.Activity
        // The state machine will also save the task, including its activity, but
        // relying on that here would create strong coupling.
        dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask)
    }

    // Write the log first, because that's likely to contain the cause of the task
    // state change. Any subsequent task logs, for example generated by the
    // Manager in response to a status change, should be logged after that.
    if update.Log != nil {
        // Errors writing the log to disk are already logged by logStorage, and can be safely ignored here.
        _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
    }

    if update.TaskStatus == nil {
        return dbErrActivity
    }

    oldTaskStatus := dbTask.Status
    var err error
    if *update.TaskStatus == api.TaskStatusFailed {
        // Failure is more complex than just going to the failed state.
        err = f.onTaskFailed(ctx, logger, w, dbTask, update)
    } else {
        // Just go to the given state.
        err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
    }
    if err != nil {
        logger.Error().Err(err).
            Str("newTaskStatus", string(*update.TaskStatus)).
            Str("oldTaskStatus", string(oldTaskStatus)).
            Msg("error changing task status")
        return fmt.Errorf("changing status of task %s to %q: %w",
            dbTask.UUID, *update.TaskStatus, err)
    }
    return nil
}

// onTaskFailed decides whether a task is soft- or hard-failed. Note that this
// means that the task may NOT go to the status mentioned in the `update`
// parameter, but go to `soft-failed` instead.
func (f *Flamenco) onTaskFailed(
    ctx context.Context,
    logger zerolog.Logger,
    worker *persistence.Worker,
    task *persistence.Task,
    update api.TaskUpdate,
) error {
    // Sanity check.
    if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed {
        panic("onTaskFailed should only be called with a task update that indicates task failure")
    }

    // Bookkeeping of failure.
    numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker)
    if err != nil {
        return fmt.Errorf("adding worker to failure list of task: %w", err)
    }

    // f.maybeBlocklistWorker(ctx, logger, worker, task)

    // Determine whether this is soft or hard failure.
    threshold := f.config.Get().TaskFailAfterSoftFailCount
    logger = logger.With().
        Int("failedByWorkerCount", numFailed).
        Int("threshold", threshold).
        Logger()

    var (
        newStatus         api.TaskStatus
        localLog, taskLog string
    )
    pluralizer := pluralize.NewClient()
    if numFailed >= threshold {
        newStatus = api.TaskStatusFailed
        localLog = "too many workers failed this task, hard-failing it"
        taskLog = fmt.Sprintf(
            "Task failed by %s, Manager will mark it as hard failure",
            pluralizer.Pluralize("worker", numFailed, true),
        )
    } else {
        newStatus = api.TaskStatusSoftFailed
        localLog = "worker failed this task, soft-failing to give another worker a try"
        failsToThreshold := threshold - numFailed
        taskLog = fmt.Sprintf(
            "Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.",
            pluralizer.Pluralize("worker", numFailed, true),
            failsToThreshold,
            pluralizer.Pluralize("failure", failsToThreshold, false),
        )
    }

    if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
        logger.Error().Err(err).Msg("error writing failure notice to task log")
    }
    logger.Info().Str("newTaskStatus", string(newStatus)).Msg(localLog)
    return f.stateMachine.TaskStatusChange(ctx, task, newStatus)
}
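
The worker blocklisting mentioned in the commit message only appears here as the
commented-out f.maybeBlocklistWorker call in onTaskFailed above. Purely as an
illustration of the kind of hook this refactor makes room for, here is a minimal
hypothetical sketch; the helper itself, the persistence methods
CountWorkerFailuresForJobType and AddWorkerToJobBlocklist, and the
BlocklistThreshold setting are all assumptions and not part of this commit.

// Hypothetical sketch only, not part of this commit. Every persistence method
// and configuration field used below is assumed rather than an existing API.
func (f *Flamenco) maybeBlocklistWorker(
    ctx context.Context,
    logger zerolog.Logger,
    worker *persistence.Worker,
    task *persistence.Task,
) {
    // Assumed call: count how often this worker failed tasks of this type on this job.
    numFailures, err := f.persist.CountWorkerFailuresForJobType(ctx, task.Job, worker, task.Type)
    if err != nil {
        logger.Error().Err(err).Msg("unable to count this worker's failures")
        return
    }

    // Assumed configuration field; the real setting may be named differently.
    threshold := f.config.Get().BlocklistThreshold
    if numFailures < threshold {
        return
    }

    logger.Warn().
        Int("numFailures", numFailures).
        Int("threshold", threshold).
        Msg("worker failed too many tasks of this type on this job, blocklisting it")

    // Assumed call: block this worker from this task type on this job only.
    if err := f.persist.AddWorkerToJobBlocklist(ctx, task.Job, worker, task.Type); err != nil {
        logger.Error().Err(err).Msg("unable to add worker to blocklist")
    }
}

Keeping the failure bookkeeping and a hook like this in the same file is
exactly what this move prepares for.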


@@ -10,7 +10,6 @@ import (
"strings"
"time"
"github.com/gertd/go-pluralize"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"golang.org/x/crypto/bcrypt"
@@ -342,190 +341,6 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
    return e.JSON(http.StatusOK, customisedTask)
}

func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
    logger := requestLogger(e)
    worker := requestWorkerOrPanic(e)

    if !uuid.IsValid(taskID) {
        logger.Debug().Msg("invalid task ID received")
        return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
    }
    logger = logger.With().Str("taskID", taskID).Logger()

    // Fetch the task, to see if this worker is even allowed to send us updates.
    ctx := e.Request().Context()
    dbTask, err := f.persist.FetchTask(ctx, taskID)
    if err != nil {
        logger.Warn().Err(err).Msg("cannot fetch task")
        if errors.Is(err, persistence.ErrTaskNotFound) {
            return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
        }
        return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
    }
    if dbTask == nil {
        panic("task could not be fetched, but database gave no error either")
    }

    // Decode the request body.
    var taskUpdate api.TaskUpdate
    if err := e.Bind(&taskUpdate); err != nil {
        logger.Warn().Err(err).Msg("bad request received")
        return sendAPIError(e, http.StatusBadRequest, "invalid format")
    }

    if dbTask.WorkerID == nil {
        logger.Warn().
            Msg("worker trying to update task that's not assigned to any worker")
        return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
    }
    if *dbTask.WorkerID != worker.ID {
        logger.Warn().Msg("worker trying to update task that's assigned to another worker")
        return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
    }

    // Status 'soft-failed' should never be sent by a Worker. Distinguishing
    // between soft and hard failures is up to the Manager.
    // Workers should always just send 'failed' instead.
    if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed {
        logger.Warn().Str("status", string(*taskUpdate.TaskStatus)).
            Msg("worker trying to update task to not-allowed status")
        return sendAPIError(e, http.StatusBadRequest,
            "task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
    }

    taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate)
    workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask)
    workerSeenErr := f.workerSeen(ctx, logger, worker)

    if taskUpdateErr != nil {
        return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
    }
    if workerUpdateErr != nil {
        return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
    }
    if workerSeenErr != nil {
        return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr)
    }

    return e.NoContent(http.StatusNoContent)
}

// doTaskUpdate actually updates the task and its log file.
func (f *Flamenco) doTaskUpdate(
    ctx context.Context,
    logger zerolog.Logger,
    w *persistence.Worker,
    dbTask *persistence.Task,
    update api.TaskUpdate,
) error {
    if dbTask.Job == nil {
        logger.Panic().Msg("dbTask.Job is nil, unable to continue")
    }

    var dbErrActivity error
    if update.Activity != nil {
        dbTask.Activity = *update.Activity
        // The state machine will also save the task, including its activity, but
        // relying on that here would create strong coupling.
        dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask)
    }

    // Write the log first, because that's likely to contain the cause of the task
    // state change. Any subsequent task logs, for example generated by the
    // Manager in response to a status change, should be logged after that.
    if update.Log != nil {
        // Errors writing the log to disk are already logged by logStorage, and can be safely ignored here.
        _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
    }

    if update.TaskStatus == nil {
        return dbErrActivity
    }

    oldTaskStatus := dbTask.Status
    var err error
    if *update.TaskStatus == api.TaskStatusFailed {
        // Failure is more complex than just going to the failed state.
        err = f.onTaskFailed(ctx, logger, w, dbTask, update)
    } else {
        // Just go to the given state.
        err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
    }
    if err != nil {
        logger.Error().Err(err).
            Str("newTaskStatus", string(*update.TaskStatus)).
            Str("oldTaskStatus", string(oldTaskStatus)).
            Msg("error changing task status")
        return fmt.Errorf("changing status of task %s to %q: %w",
            dbTask.UUID, *update.TaskStatus, err)
    }
    return nil
}

// onTaskFailed decides whether a task is soft- or hard-failed. Note that this
// means that the task may NOT go to the status mentioned in the `update`
// parameter, but go to `soft-failed` instead.
func (f *Flamenco) onTaskFailed(
    ctx context.Context,
    logger zerolog.Logger,
    worker *persistence.Worker,
    task *persistence.Task,
    update api.TaskUpdate,
) error {
    // Sanity check.
    if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed {
        panic("onTaskFailed should only be called with a task update that indicates task failure")
    }

    // Bookkeeping of failure.
    numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker)
    if err != nil {
        return fmt.Errorf("adding worker to failure list of task: %w", err)
    }

    // f.maybeBlocklistWorker(ctx, w, dbTask)

    threshold := f.config.Get().TaskFailAfterSoftFailCount
    logger = logger.With().
        Int("failedByWorkerCount", numFailed).
        Int("threshold", threshold).
        Logger()

    var (
        newStatus         api.TaskStatus
        localLog, taskLog string
    )
    pluralizer := pluralize.NewClient()
    if numFailed >= threshold {
        newStatus = api.TaskStatusFailed
        localLog = "too many workers failed this task, hard-failing it"
        taskLog = fmt.Sprintf(
            "Task failed by %s, Manager will mark it as hard failure",
            pluralizer.Pluralize("worker", numFailed, true),
        )
    } else {
        newStatus = api.TaskStatusSoftFailed
        localLog = "worker failed this task, soft-failing to give another worker a try"
        failsToThreshold := threshold - numFailed
        taskLog = fmt.Sprintf(
            "Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.",
            pluralizer.Pluralize("worker", numFailed, true),
            failsToThreshold,
            pluralizer.Pluralize("failure", failsToThreshold, false),
        )
    }

    if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
        logger.Error().Err(err).Msg("error writing failure notice to task log")
    }
    logger.Info().Str("newTaskStatus", string(newStatus)).Msg(localLog)
    return f.stateMachine.TaskStatusChange(ctx, task, newStatus)
}

func (f *Flamenco) workerPingedTask(
    ctx context.Context,
    logger zerolog.Logger,