
Change the package base name of the Go code from `git.blender.org/flamenco` to `projects.blender.org/studio/flamenco`. The old location, `git.blender.org`, has not been in use since the [migration to Gitea][1]. The new package names now reflect the actual location where Flamenco is hosted.

[1]: https://code.blender.org/2023/02/new-blender-development-infrastructure/
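As a sketch of what the rename means for Go code (the snippet below is an illustrative example, not a file from this commit), an import of the old base name such as:

```go
package example

import (
	"git.blender.org/flamenco/internal/uuid"
	"git.blender.org/flamenco/pkg/api"
)

// TaskFailedWithValidID exists only to exercise the imports; it is not Flamenco code.
func TaskFailedWithValidID(taskID string, status api.TaskStatus) bool {
	return uuid.IsValid(taskID) && status == api.TaskStatusFailed
}
```

now imports the same packages from the new base name:

```go
import (
	"projects.blender.org/studio/flamenco/internal/uuid"
	"projects.blender.org/studio/flamenco/pkg/api"
)
```

with the `module` directive in `go.mod` updated to `projects.blender.org/studio/flamenco` so that these import paths resolve.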
package api_impl

// SPDX-License-Identifier: GPL-3.0-or-later

import (
	"context"
	"errors"
	"fmt"
	"net/http"

	"github.com/gertd/go-pluralize"
	"github.com/labstack/echo/v4"
	"github.com/rs/zerolog"

	"projects.blender.org/studio/flamenco/internal/manager/persistence"
	"projects.blender.org/studio/flamenco/internal/uuid"
	"projects.blender.org/studio/flamenco/pkg/api"
)

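// TaskUpdate handles a task update sent by a Worker. It validates the task ID,
// checks that the task exists and is assigned to this Worker, then processes
// the update and records that the Worker was seen.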
func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
	logger := requestLogger(e)
	worker := requestWorkerOrPanic(e)

	if !uuid.IsValid(taskID) {
		logger.Debug().Msg("invalid task ID received")
		return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
	}
	logger = logger.With().Str("taskID", taskID).Logger()

	// Fetch the task, to see if this worker is even allowed to send us updates.
	ctx := e.Request().Context()
	dbTask, err := f.persist.FetchTask(ctx, taskID)
	if err != nil {
		logger.Warn().Err(err).Msg("cannot fetch task")
		if errors.Is(err, persistence.ErrTaskNotFound) {
			return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
		}
		return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
	}
	if dbTask == nil {
		panic("task could not be fetched, but database gave no error either")
	}

	// Decode the request body.
	var taskUpdate api.TaskUpdate
	if err := e.Bind(&taskUpdate); err != nil {
		logger.Warn().Err(err).Msg("bad request received")
		return sendAPIError(e, http.StatusBadRequest, "invalid format")
	}
	if dbTask.WorkerID == nil {
		logger.Warn().
			Msg("worker trying to update task that's not assigned to any worker")
		return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
	}
	if *dbTask.WorkerID != worker.ID {
		logger.Warn().Msg("worker trying to update task that's assigned to another worker")
		return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
	}

	// Status 'soft-failed' should never be sent by a Worker. Distinguishing
	// between soft and hard failures is up to the Manager.
	// Workers should always just send 'failed' instead.
	if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed {
		logger.Warn().Str("status", string(*taskUpdate.TaskStatus)).
			Msg("worker trying to update task to not-allowed status")
		return sendAPIError(e, http.StatusBadRequest,
			"task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
	}

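	// Note that the update below runs in a background context from bgContext()
	// rather than in the request context, presumably so that it can complete
	// even when the Worker's HTTP request goes away prematurely.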
	bgCtx, bgCtxCancel := bgContext()
	defer bgCtxCancel()

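	// All three updates are performed before any error is checked, so a failure
	// of one does not prevent the others from being attempted.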
	taskUpdateErr := f.doTaskUpdate(bgCtx, logger, worker, dbTask, taskUpdate)
	workerUpdateErr := f.workerPingedTask(logger, dbTask)
	workerSeenErr := f.workerSeen(logger, worker)

	if taskUpdateErr != nil {
		return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
	}
	if workerUpdateErr != nil {
		return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
	}
	if workerSeenErr != nil {
		return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr)
	}

	return e.NoContent(http.StatusNoContent)
}

// doTaskUpdate actually updates the task and its log file.
func (f *Flamenco) doTaskUpdate(
	ctx context.Context,
	logger zerolog.Logger,
	w *persistence.Worker,
	dbTask *persistence.Task,
	update api.TaskUpdate,
) error {
	if dbTask.Job == nil {
		logger.Panic().Msg("dbTask.Job is nil, unable to continue")
	}

	var dbErrActivity error

	if update.Activity != nil {
		dbTask.Activity = *update.Activity
		// The state machine will also save the task, including its activity, but
		// relying on that here would create too tight a coupling.
		dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask)
	}

	// Write the log first, because that's likely to contain the cause of the task
	// state change. Any subsequent task logs, for example generated by the
	// Manager in response to a status change, should be logged after that.
	if update.Log != nil {
		// Errors writing the log to disk are already logged by logStorage, and can be safely ignored here.
		_ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
	}

	if update.TaskStatus == nil {
		return dbErrActivity
	}

	oldTaskStatus := dbTask.Status
	var err error
	if *update.TaskStatus == api.TaskStatusFailed {
		// Failure is more complex than just going to the failed state.
		err = f.onTaskFailed(ctx, logger, w, dbTask, update)
	} else {
		// Just go to the given state.
		err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
	}

	if err != nil {
		logger.Error().Err(err).
			Str("newTaskStatus", string(*update.TaskStatus)).
			Str("oldTaskStatus", string(oldTaskStatus)).
			Msg("error changing task status")
		return fmt.Errorf("changing status of task %s to %q: %w",
			dbTask.UUID, *update.TaskStatus, err)
	}

	return nil
}

// onTaskFailed decides whether a task is soft- or hard-failed. Note that this
// means that the task may NOT go to the status mentioned in the `update`
// parameter, but go to `soft-failed` instead.
func (f *Flamenco) onTaskFailed(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
	update api.TaskUpdate,
) error {
	// Sanity check.
	if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed {
		panic("onTaskFailed should only be called with a task update that indicates task failure")
	}

	// Bookkeeping of failure.
	numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker)
	if err != nil {
		return fmt.Errorf("adding worker to failure list of task: %w", err)
	}

	logger = logger.With().Str("taskType", task.Type).Logger()
	wasBlacklisted, shouldFailJob, err := f.maybeBlocklistWorker(ctx, logger, worker, task)
	if err != nil {
		return fmt.Errorf("block-listing worker: %w", err)
	}
	if shouldFailJob {
		// There are no more workers left to finish the job.
		return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task)
	}
	if wasBlacklisted {
		// Requeue all tasks of this job & task type that were hard-failed before by this worker.
		reason := fmt.Sprintf("worker %s was blocked from tasks of type %q", worker.Name, task.Type)
		err := f.stateMachine.RequeueFailedTasksOfWorkerOfJob(ctx, worker, task.Job, reason)
		if err != nil {
			return err
		}
	}

	// Determine whether this is a soft or hard failure.
	threshold := f.config.Get().TaskFailAfterSoftFailCount
	logger = logger.With().
		Int("failedByWorkerCount", numFailed).
		Int("threshold", threshold).
		Logger()

	if numFailed >= threshold {
		return f.hardFailTask(ctx, logger, worker, task, numFailed)
	}

	numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
	if err != nil {
		return err
	}

	// If the number of workers capable of running the failed task is 1, there is
	// no worker left besides the one that actually failed it. At this point in
	// the code that worker hasn't been registered as failing this task yet, and
	// is thus still counted. In that case, just fail the job itself.
	if numWorkers <= 1 {
		return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task)
	}
	return f.softFailTask(ctx, logger, worker, task, numFailed)
}

// maybeBlocklistWorker potentially block-lists the Worker, and checks whether
// there are any workers left to run tasks of this type.
//
// Returns whether the worker was blacklisted, and whether the entire job should
// be failed (in case this was the last worker that could have worked on this
// task).
func (f *Flamenco) maybeBlocklistWorker(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) (wasBlacklisted, shouldFailJob bool, err error) {
	numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type)
	if err != nil {
		return false, false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err)
	}
	// The received task update hasn't been persisted in the database yet,
	// so we should count that too.
	numFailures++

	threshold := f.config.Get().BlocklistThreshold
	if numFailures < threshold {
		logger.Info().
			Int("numFailedTasks", numFailures).
			Int("threshold", threshold).
			Msg("not enough failed tasks to blocklist worker")
		return false, false, nil
	}

	// Blocklist the Worker.
	if err := f.blocklistWorker(ctx, logger, worker, task); err != nil {
		return true, false, err
	}

	// Return hard-failure if there are no workers left for this task type.
	numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
	return true, numWorkers == 0, err
}

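// blocklistWorker adds the Worker to the block list of this task's job, for
// this task's type.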
func (f *Flamenco) blocklistWorker(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	logger.Warn().
		Str("job", task.Job.UUID).
		Msg("block-listing worker")
	err := f.persist.AddWorkerToJobBlocklist(ctx, task.Job, worker, task.Type)
	if err != nil {
		return fmt.Errorf("adding worker to block list: %w", err)
	}
	return nil
}

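// numWorkersCapableOfRunningTask counts the workers that can still run tasks
// of this type on this job, excluding workers that failed this task before.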
func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) {
	// See which workers are left to run tasks of this type, on this job.
	workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type)
	if err != nil {
		return 0, fmt.Errorf("fetching workers available to run tasks of type %q on job %q: %w",
			task.Type, task.Job.UUID, err)
	}

	// Remove (from the list of available workers) those who failed this task before.
	failers, err := f.persist.FetchTaskFailureList(ctx, task)
	if err != nil {
		return 0, fmt.Errorf("fetching failure list of task %q: %w", task.UUID, err)
	}
	for _, failure := range failers {
		delete(workersLeft, failure.UUID)
	}

	return len(workersLeft), nil
}

// failJobAfterCatastroficTaskFailure fails the entire job.
// This function is meant to be called when a task failure causes the worker to
// be blocked, leaving no more workers to run tasks of this type.
func (f *Flamenco) failJobAfterCatastroficTaskFailure(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	taskLog := fmt.Sprintf(
		"Task failed by worker %s, Manager will fail the entire job as there are no more workers left for tasks of type %q.",
		worker.Identifier(), task.Type,
	)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		logger.Error().Err(err).Msg("error writing failure notice to task log")
	}

	if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
		logger.Error().
			Err(err).
			Str("newStatus", string(api.TaskStatusFailed)).
			Msg("error changing task status")
	}

	newJobStatus := api.JobStatusFailed
	logger.Info().
		Str("job", task.Job.UUID).
		Str("newJobStatus", string(newJobStatus)).
		Msg("no more workers left to run tasks of this type, failing the entire job")
	reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type)
	return f.stateMachine.JobStatusChange(ctx, task.Job, newJobStatus, reason)
}

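// hardFailTask is called when enough workers failed this task to reach the
// hard-failure threshold. It writes a note to the task log and transitions the
// task to 'failed'.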
func (f *Flamenco) hardFailTask(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
	numFailed int,
) error {
	// Add the failure to the task log.
	pluralizer := pluralize.NewClient()
	taskLog := fmt.Sprintf(
		"Task failed by %s, Manager will mark it as hard failure",
		pluralizer.Pluralize("worker", numFailed, true),
	)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		logger.Error().Err(err).Msg("error writing failure notice to task log")
	}

	// Mark the task as failed.
	logger.Info().Str("newTaskStatus", string(api.TaskStatusFailed)).
		Msg("too many workers failed this task, hard-failing it")
	return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed)
}

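// softFailTask is called while the task is still below the hard-failure
// threshold. It writes a note to the task log and transitions the task to
// 'soft-failed' so that another worker can give it a try.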
func (f *Flamenco) softFailTask(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
	numFailed int,
) error {
	threshold := f.config.Get().TaskFailAfterSoftFailCount
	failsToThreshold := threshold - numFailed

	// Add the failure to the task log.
	pluralizer := pluralize.NewClient()
	taskLog := fmt.Sprintf(
		"Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.",
		pluralizer.Pluralize("worker", numFailed, true),
		failsToThreshold,
		pluralizer.Pluralize("failure", failsToThreshold, false),
	)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		logger.Error().Err(err).Msg("error writing failure notice to task log")
	}

	// Mark the task as soft-failed.
	logger.Info().Str("newTaskStatus", string(api.TaskStatusSoftFailed)).
		Msg("worker failed this task, soft-failing to give another worker a try")
	return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusSoftFailed)
}