flamenco/internal/manager/api_impl/worker_task_updates.go

package api_impl

// SPDX-License-Identifier: GPL-3.0-or-later

import (
"context"
"errors"
"fmt"
"net/http"

"github.com/gertd/go-pluralize"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"

"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/uuid"
"projects.blender.org/studio/flamenco/pkg/api"
)

func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
logger := requestLogger(e)
worker := requestWorkerOrPanic(e)
if !uuid.IsValid(taskID) {
logger.Debug().Msg("invalid task ID received")
return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
}
logger = logger.With().Str("taskID", taskID).Logger()
// Fetch the task, to see if this worker is even allowed to send us updates.
ctx := e.Request().Context()
dbTask, err := f.persist.FetchTask(ctx, taskID)
if err != nil {
logger.Warn().Err(err).Msg("cannot fetch task")
if errors.Is(err, persistence.ErrTaskNotFound) {
return sendAPIError(e, http.StatusNotFound, "task %q not found", taskID)
}
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
}
if dbTask == nil {
panic("task could not be fetched, but database gave no error either")
}
// Decode the request body.
var taskUpdate api.TaskUpdate
if err := e.Bind(&taskUpdate); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
if dbTask.WorkerID == nil {
logger.Warn().
Msg("worker trying to update task that's not assigned to any worker")
return sendAPIError(e, http.StatusConflict, "task %q is not assigned to any worker, and thus not to you", taskID)
}
if *dbTask.WorkerID != worker.ID {
logger.Warn().Msg("worker trying to update task that's assigned to another worker")
return sendAPIError(e, http.StatusConflict, "task %q is not assigned to you", taskID)
}
// Status 'soft-failed' should never be sent by a Worker. Distinguishing
// between soft and hard failures is up to the Manager.
// Workers should always just send 'failed' instead.
if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed {
logger.Warn().Str("status", string(*taskUpdate.TaskStatus)).
Msg("worker trying to update task to not-allowed status")
return sendAPIError(e, http.StatusBadRequest,
"task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
}
bgCtx, bgCtxCancel := bgContext()
defer bgCtxCancel()
taskUpdateErr := f.doTaskUpdate(bgCtx, logger, worker, dbTask, taskUpdate)
workerUpdateErr := f.workerPingedTask(logger, dbTask)
workerSeenErr := f.workerSeen(logger, worker)
if taskUpdateErr != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
}
if workerUpdateErr != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
}
if workerSeenErr != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr)
}
return e.NoContent(http.StatusNoContent)
}
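
// The update above runs on a background context from bgContext(), which is
// defined elsewhere in this package, so that a Worker dropping the HTTP
// connection mid-request cannot cancel the database writes halfway through.
// A minimal sketch of such a helper, shown here as an assumption for
// illustration and not the actual definition, could look like:
//
//	func bgContext() (context.Context, context.CancelFunc) {
//		// Bound the background work's lifetime instead of tying it to
//		// the cancellable request context.
//		return context.WithTimeout(context.Background(), 10*time.Second)
//	}
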
// doTaskUpdate actually updates the task and its log file.
func (f *Flamenco) doTaskUpdate(
ctx context.Context,
logger zerolog.Logger,
w *persistence.Worker,
dbTask *persistence.Task,
update api.TaskUpdate,
) error {
if dbTask.Job == nil {
logger.Panic().Msg("dbTask.Job is nil, unable to continue")
}
var dbErrActivity error
if update.Activity != nil {
dbTask.Activity = *update.Activity
// The state machine will also save the task, including its activity, but
// relying on that here would create overly tight coupling.
dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask)
}
// Write the log first, because that's likely to contain the cause of the task
// state change. Any subsequent task logs, for example generated by the
// Manager in response to a status change, should be logged after that.
if update.Log != nil {
// Errors writing the log to disk are already logged by logStorage, and can be safely ignored here.
_ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
}
if update.TaskStatus == nil {
return dbErrActivity
}
oldTaskStatus := dbTask.Status
var err error
if *update.TaskStatus == api.TaskStatusFailed {
// Failure is more complex than just going to the failed state.
err = f.onTaskFailed(ctx, logger, w, dbTask, update)
} else {
// Just go to the given state.
err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
}
if err != nil {
logger.Error().Err(err).
Str("newTaskStatus", string(*update.TaskStatus)).
Str("oldTaskStatus", string(oldTaskStatus)).
Msg("error changing task status")
return fmt.Errorf("changing status of task %s to %q: %w",
dbTask.UUID, *update.TaskStatus, err)
}
return nil
}
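
// For reference, a typical Worker update handled above can carry an activity
// string, a log chunk, and a status change. The JSON below is an illustrative
// guess based on the api.TaskUpdate fields used in this file, not a verbatim
// sample from the OpenAPI specification:
//
//	{
//		"taskStatus": "failed",
//		"activity": "Rendering frame 42",
//		"log": "ERROR: render process ran out of memory\n"
//	}
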
// onTaskFailed decides whether a failed task should be soft- or hard-failed.
// Note that this means the task may NOT go to the status mentioned in the
// `update` parameter, but to `soft-failed` instead.
func (f *Flamenco) onTaskFailed(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
update api.TaskUpdate,
) error {
// Sanity check.
if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed {
panic("onTaskFailed should only be called with a task update that indicates task failure")
}
// Bookkeeping of failure.
numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker)
if err != nil {
return fmt.Errorf("adding worker to failure list of task: %w", err)
}
logger = logger.With().Str("taskType", task.Type).Logger()
wasBlocklisted, shouldFailJob, err := f.maybeBlocklistWorker(ctx, logger, worker, task)
if err != nil {
return fmt.Errorf("blocklisting worker: %w", err)
}
if shouldFailJob {
// There are no more workers left to finish the job.
return f.failJobAfterCatastrophicTaskFailure(ctx, logger, worker, task)
}
if wasBlocklisted {
// Requeue all tasks of this job & task type that were hard-failed before by this worker.
reason := fmt.Sprintf("worker %s was blocked from tasks of type %q", worker.Name, task.Type)
err := f.stateMachine.RequeueFailedTasksOfWorkerOfJob(ctx, worker, task.Job, reason)
if err != nil {
return err
}
}
// Determine whether this is soft or hard failure.
threshold := f.config.Get().TaskFailAfterSoftFailCount
logger = logger.With().
Int("failedByWorkerCount", numFailed).
Int("threshold", threshold).
Logger()
if numFailed >= threshold {
return f.hardFailTask(ctx, logger, worker, task, numFailed)
}
numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
if err != nil {
return err
}
// If the number of workers capable of running the failed task again is 1,
// there is no worker left besides the one that just failed it: at this point
// in the code that worker has not been registered as a failer of this task
// yet, and is thus still counted. In that case, just fail the job itself.
if numWorkers <= 1 {
return f.failJobAfterCatastrophicTaskFailure(ctx, logger, worker, task)
}
return f.softFailTask(ctx, logger, worker, task, numFailed)
}
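
// Worked example of the decision above, assuming a hypothetical configuration
// with TaskFailAfterSoftFailCount = 3 and several capable workers remaining:
//
//	1st failing worker: numFailed = 1 < 3 → task goes to `soft-failed`
//	2nd failing worker: numFailed = 2 < 3 → task stays `soft-failed`
//	3rd failing worker: numFailed = 3 >= 3 → task is hard-failed
//
// Independently of the count, if at most one capable worker remains
// (numWorkers <= 1) the entire job is failed instead.
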
// maybeBlocklistWorker potentially blocklists the Worker, and checks whether
// there are any workers left to run tasks of this type.
//
// Returns whether the worker was blocklisted, and whether the entire job
// should be failed (in case this was the last worker that could have worked
// on this task).
func (f *Flamenco) maybeBlocklistWorker(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
) (wasBlocklisted, shouldFailJob bool, err error) {
numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type)
if err != nil {
return false, false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err)
}
// The received task update hasn't been persisted in the database yet,
// so we should count that too.
numFailures++
threshold := f.config.Get().BlocklistThreshold
if numFailures < threshold {
logger.Info().
Int("numFailedTasks", numFailures).
Int("threshold", threshold).
Msg("not enough failed tasks to blocklist worker")
return false, false, nil
}
// Blocklist the Worker.
if err := f.blocklistWorker(ctx, logger, worker, task); err != nil {
return true, false, err
}
// Signal that the entire job should fail if there are no workers left for this task type.
numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
return true, numWorkers == 0, err
}
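
// Worked example of the check above, with a hypothetical BlocklistThreshold
// of 3: the worker has two persisted failures of this task type on this job,
// and a third failure just arrived. CountTaskFailuresOfWorker still returns
// 2, as the new failure is not in the database yet; numFailures++ brings the
// count to 3, the threshold is reached, and the worker is blocklisted for
// this task type on this job.
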
func (f *Flamenco) blocklistWorker(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
) error {
logger.Warn().
Str("job", task.Job.UUID).
Msg("blocklisting worker")
err := f.persist.AddWorkerToJobBlocklist(ctx, task.Job, worker, task.Type)
if err != nil {
return fmt.Errorf("adding worker to blocklist: %w", err)
}
return nil
}

func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) {
// See which workers are left to run tasks of this type on this job.
workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type)
if err != nil {
return 0, fmt.Errorf("fetching workers available to run tasks of type %q on job %q: %w",
task.Type, task.Job.UUID, err)
}
// Remove (from the list of available workers) those who failed this task before.
failers, err := f.persist.FetchTaskFailureList(ctx, task)
if err != nil {
return 0, fmt.Errorf("fetching failure list of task %q: %w", task.UUID, err)
}
for _, failure := range failers {
delete(workersLeft, failure.UUID)
}
return len(workersLeft), nil
}
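
// The loop above computes a set difference: workersLeft acts as a set keyed
// by worker UUID, from which every worker that already failed this task is
// removed. Assuming for illustration a map[string]bool (only the keys matter
// for this logic; the actual value type is defined by the persistence layer):
//
//	workersLeft := map[string]bool{"uuid-a": true, "uuid-b": true}
//	delete(workersLeft, "uuid-a") // "uuid-a" failed this task before
//	len(workersLeft)              // 1 worker genuinely left to retry
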
// failJobAfterCatastrophicTaskFailure fails the entire job.
// This function is meant to be called when a task failure causes the worker
// to be blocked, leaving no workers to run tasks of this type.
func (f *Flamenco) failJobAfterCatastrophicTaskFailure(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
) error {
taskLog := fmt.Sprintf(
"Task failed by worker %s, Manager will fail the entire job as there are no more workers left for tasks of type %q.",
worker.Identifier(), task.Type,
)
if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
logger.Error().Err(err).Msg("error writing failure notice to task log")
}
if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
logger.Error().
Err(err).
Str("newStatus", string(api.TaskStatusFailed)).
Msg("error changing task status")
}
newJobStatus := api.JobStatusFailed
logger.Info().
Str("job", task.Job.UUID).
Str("newJobStatus", string(newJobStatus)).
Msg("no more workers left to run tasks of this type, failing the entire job")
reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type)
return f.stateMachine.JobStatusChange(ctx, task.Job, newJobStatus, reason)
}

func (f *Flamenco) hardFailTask(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
numFailed int,
) error {
// Add the failure to the task log.
pluralizer := pluralize.NewClient()
taskLog := fmt.Sprintf(
"Task failed by %s, Manager will mark it as hard failure",
pluralizer.Pluralize("worker", numFailed, true),
)
if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
logger.Error().Err(err).Msg("error writing failure notice to task log")
}
// Mark the task as failed.
logger.Info().Str("newTaskStatus", string(api.TaskStatusFailed)).
Msg("too many workers failed this task, hard-failing it")
return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed)
}

func (f *Flamenco) softFailTask(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
numFailed int,
) error {
threshold := f.config.Get().TaskFailAfterSoftFailCount
failsToThreshold := threshold - numFailed
// Add the failure to the task log.
pluralizer := pluralize.NewClient()
taskLog := fmt.Sprintf(
"Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.",
pluralizer.Pluralize("worker", numFailed, true),
failsToThreshold,
pluralizer.Pluralize("failure", failsToThreshold, false),
)
if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
logger.Error().Err(err).Msg("error writing failure notice to task log")
}
// Mark the task as soft-failed.
logger.Info().Str("newTaskStatus", string(api.TaskStatusSoftFailed)).
Msg("worker failed this task, soft-failing to give another worker a try")
return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusSoftFailed)
}
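
// The Pluralize calls in hardFailTask and softFailTask above come from
// github.com/gertd/go-pluralize. Pluralize(word, count, inclusive) pluralizes
// the word based on the count, and prefixes the count itself when inclusive
// is true:
//
//	pluralizer := pluralize.NewClient()
//	pluralizer.Pluralize("worker", 3, true)   // "3 workers"
//	pluralizer.Pluralize("worker", 1, true)   // "1 worker"
//	pluralizer.Pluralize("failure", 2, false) // "failures"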