package api_impl

// SPDX-License-Identifier: GPL-3.0-or-later

import (
	"context"
	"errors"
	"fmt"
	"net/http"

	"github.com/gertd/go-pluralize"
	"github.com/labstack/echo/v4"
	"github.com/rs/zerolog"

	"git.blender.org/flamenco/internal/manager/persistence"
	"git.blender.org/flamenco/internal/uuid"
	"git.blender.org/flamenco/pkg/api"
)

// TaskUpdate is called by a Worker to report progress on, or failure of, a task assigned to it.
func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
	logger := requestLogger(e)
	worker := requestWorkerOrPanic(e)

	if !uuid.IsValid(taskID) {
		logger.Debug().Msg("invalid task ID received")
		return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
	}
	logger = logger.With().Str("taskID", taskID).Logger()

	// Fetch the task, to see if this worker is even allowed to send us updates.
	ctx := e.Request().Context()
	dbTask, err := f.persist.FetchTask(ctx, taskID)
	if err != nil {
		logger.Warn().Err(err).Msg("cannot fetch task")
		if errors.Is(err, persistence.ErrTaskNotFound) {
			return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
		}
		return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
	}
	if dbTask == nil {
		panic("task could not be fetched, but database gave no error either")
	}

	// Decode the request body.
	var taskUpdate api.TaskUpdate
	if err := e.Bind(&taskUpdate); err != nil {
		logger.Warn().Err(err).Msg("bad request received")
		return sendAPIError(e, http.StatusBadRequest, "invalid format")
	}

	if dbTask.WorkerID == nil {
		logger.Warn().
			Msg("worker trying to update task that's not assigned to any worker")
		return sendAPIError(e, http.StatusConflict,
			"task %+v is not assigned to any worker, so also not to you", taskID)
	}
	if *dbTask.WorkerID != worker.ID {
		logger.Warn().Msg("worker trying to update task that's assigned to another worker")
		return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
	}

	// Status 'soft-failed' should never be sent by a Worker. Distinguishing
	// between soft and hard failures is up to the Manager.
	// Workers should always just send 'failed' instead.
	if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed {
		logger.Warn().Str("status", string(*taskUpdate.TaskStatus)).
			Msg("worker trying to update task to not-allowed status")
		return sendAPIError(e, http.StatusBadRequest,
			"task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
	}

	bgCtx, bgCtxCancel := bgContext()
	defer bgCtxCancel()

	taskUpdateErr := f.doTaskUpdate(bgCtx, logger, worker, dbTask, taskUpdate)
	workerUpdateErr := f.workerPingedTask(logger, dbTask)
	workerSeenErr := f.workerSeen(logger, worker)

	if taskUpdateErr != nil {
		return sendAPIError(e, http.StatusInternalServerError,
			"unable to handle task update: %v", taskUpdateErr)
	}
	if workerUpdateErr != nil {
		return sendAPIError(e, http.StatusInternalServerError,
			"unable to handle worker update: %v", workerUpdateErr)
	}
	if workerSeenErr != nil {
		return sendAPIError(e, http.StatusInternalServerError,
			"unable to handle worker 'last seen' update: %v", workerSeenErr)
	}

	return e.NoContent(http.StatusNoContent)
}

// doTaskUpdate actually updates the task and its log file.
func (f *Flamenco) doTaskUpdate(
	ctx context.Context,
	logger zerolog.Logger,
	w *persistence.Worker,
	dbTask *persistence.Task,
	update api.TaskUpdate,
) error {
	if dbTask.Job == nil {
		logger.Panic().Msg("dbTask.Job is nil, unable to continue")
	}

	var dbErrActivity error
	if update.Activity != nil {
		dbTask.Activity = *update.Activity
		// The state machine will also save the task, including its activity, but
		// relying on that here would create tight coupling.
		dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask)
	}

	// Write the log first, because that's likely to contain the cause of the task
	// state change. Any subsequent task logs, for example generated by the
	// Manager in response to a status change, should be logged after that.
	if update.Log != nil {
		// Errors writing the log to disk are already logged by logStorage, and
		// can be safely ignored here.
		_ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
	}

	if update.TaskStatus == nil {
		return dbErrActivity
	}

	oldTaskStatus := dbTask.Status
	var err error
	if *update.TaskStatus == api.TaskStatusFailed {
		// Failure is more complex than just going to the failed state.
		err = f.onTaskFailed(ctx, logger, w, dbTask, update)
	} else {
		// Just go to the given state.
		err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
	}

	if err != nil {
		logger.Error().Err(err).
			Str("newTaskStatus", string(*update.TaskStatus)).
			Str("oldTaskStatus", string(oldTaskStatus)).
			Msg("error changing task status")
		return fmt.Errorf("changing status of task %s to %q: %w",
			dbTask.UUID, *update.TaskStatus, err)
	}
	return nil
}

// onTaskFailed decides whether a task is soft- or hard-failed. Note that this
// means that the task may NOT go to the status mentioned in the `update`
// parameter, but go to `soft-failed` instead.
func (f *Flamenco) onTaskFailed(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
	update api.TaskUpdate,
) error {
	// Sanity check.
	if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed {
		panic("onTaskFailed should only be called with a task update that indicates task failure")
	}

	// Bookkeeping of failure.
	numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker)
	if err != nil {
		return fmt.Errorf("adding worker to failure list of task: %w", err)
	}

	logger = logger.With().Str("taskType", task.Type).Logger()

	wasBlocklisted, shouldFailJob, err := f.maybeBlocklistWorker(ctx, logger, worker, task)
	if err != nil {
		return fmt.Errorf("block-listing worker: %w", err)
	}
	if shouldFailJob {
		// There are no more workers left to finish the job.
		return f.failJobAfterCatastrophicTaskFailure(ctx, logger, worker, task)
	}
	if wasBlocklisted {
		// Requeue all tasks of this job & task type that were hard-failed before by this worker.
		reason := fmt.Sprintf("worker %s was blocked from tasks of type %q", worker.Name, task.Type)
		err := f.stateMachine.RequeueFailedTasksOfWorkerOfJob(ctx, worker, task.Job, reason)
		if err != nil {
			return err
		}
	}

	// Determine whether this is a soft or a hard failure.
	threshold := f.config.Get().TaskFailAfterSoftFailCount
	logger = logger.With().
		Int("failedByWorkerCount", numFailed).
		Int("threshold", threshold).
		Logger()
	if numFailed < threshold {
		return f.softFailTask(ctx, logger, worker, task, numFailed)
	}
	return f.hardFailTask(ctx, logger, worker, task, numFailed)
}

// maybeBlocklistWorker potentially block-lists the Worker, and checks whether
// there are any workers left to run tasks of this type.
//
// Returns whether the worker was blocklisted, and whether the entire job should
// be failed (in case this was the last worker that could have worked on this
// task).
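//
// A rough sketch of the possible outcomes, assuming a hypothetical
// BlocklistThreshold of 3 (the real value comes from the Manager
// configuration):
//
//	failures by this worker (incl. the current one) < 3  → (false, false, nil)
//	failures ≥ 3, other capable workers remain           → (true, false, nil)
//	failures ≥ 3, no capable workers left                → (true, true, nil)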
func (f *Flamenco) maybeBlocklistWorker(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) (wasBlocklisted, shouldFailJob bool, err error) {
	numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type)
	if err != nil {
		return false, false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w",
			task.Job.UUID, task.Type, err)
	}
	// The received task update hasn't been persisted in the database yet,
	// so we should count that too.
	numFailures++

	threshold := f.config.Get().BlocklistThreshold
	if numFailures < threshold {
		logger.Info().
			Int("numFailedTasks", numFailures).
			Int("threshold", threshold).
			Msg("not enough failed tasks to blocklist worker")
		return false, false, nil
	}

	// Blocklist the Worker.
	if err := f.blocklistWorker(ctx, logger, worker, task); err != nil {
		return true, false, err
	}

	// Return hard-failure if there are no workers left for this task type.
	numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
	return true, numWorkers == 0, err
}

// blocklistWorker blocks the worker from running tasks of this type on this job.
func (f *Flamenco) blocklistWorker(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	logger.Warn().
		Str("job", task.Job.UUID).
		Msg("block-listing worker")

	err := f.persist.AddWorkerToJobBlocklist(ctx, task.Job, worker, task.Type)
	if err != nil {
		return fmt.Errorf("adding worker to block list: %w", err)
	}
	return nil
}

// numWorkersCapableOfRunningTask returns the number of workers that could still
// run tasks of this type on this job, excluding those that failed this task before.
func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) {
	// See which workers are left to run tasks of this type, on this job.
	workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type)
	if err != nil {
		return 0, fmt.Errorf("fetching workers available to run tasks of type %q on job %q: %w",
			task.Type, task.Job.UUID, err)
	}

	// Remove (from the list of available workers) those who failed this task before.
	failers, err := f.persist.FetchTaskFailureList(ctx, task)
	if err != nil {
		return 0, fmt.Errorf("fetching failure list of task %q: %w", task.UUID, err)
	}
	for _, failure := range failers {
		delete(workersLeft, failure.UUID)
	}

	return len(workersLeft), nil
}

// failJobAfterCatastrophicTaskFailure fails the entire job.
// This function is meant to be called when a task failure causes the worker to
// be blocked, leaving no workers to run tasks of this type any more.
func (f *Flamenco) failJobAfterCatastrophicTaskFailure(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	taskLog := fmt.Sprintf(
		"Task failed by worker %s, Manager will fail the entire job as there are no more workers left for tasks of type %q.",
		worker.Identifier(), task.Type,
	)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		logger.Error().Err(err).Msg("error writing failure notice to task log")
	}

	if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
		logger.Error().
			Err(err).
			Str("newStatus", string(api.TaskStatusFailed)).
			Msg("error changing task status")
	}

	newJobStatus := api.JobStatusFailed
	logger.Info().
		Str("job", task.Job.UUID).
		Str("newJobStatus", string(newJobStatus)).
Msg("no more workers left to run tasks of this type, failing the entire job") reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type) return f.stateMachine.JobStatusChange(ctx, task.Job, newJobStatus, reason) } func (f *Flamenco) hardFailTask( ctx context.Context, logger zerolog.Logger, worker *persistence.Worker, task *persistence.Task, numFailed int, ) error { // Add the failure to the task log. pluralizer := pluralize.NewClient() taskLog := fmt.Sprintf( "Task failed by %s, Manager will mark it as hard failure", pluralizer.Pluralize("worker", numFailed, true), ) if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil { logger.Error().Err(err).Msg("error writing failure notice to task log") } // Mark the task as failed. logger.Info().Str("newTaskStatus", string(api.TaskStatusFailed)). Msg("too many workers failed this task, hard-failing it") return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed) } func (f *Flamenco) softFailTask( ctx context.Context, logger zerolog.Logger, worker *persistence.Worker, task *persistence.Task, numFailed int, ) error { threshold := f.config.Get().TaskFailAfterSoftFailCount failsToThreshold := threshold - numFailed // Add the failure to the task log. pluralizer := pluralize.NewClient() taskLog := fmt.Sprintf( "Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.", pluralizer.Pluralize("worker", numFailed, true), failsToThreshold, pluralizer.Pluralize("failure", failsToThreshold, false), ) if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil { logger.Error().Err(err).Msg("error writing failure notice to task log") } // Mark the task as soft-failed. logger.Info().Str("newTaskStatus", string(api.TaskStatusSoftFailed)). Msg("worker failed this task, soft-failing to give another worker a try") return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusSoftFailed) }