
Replace old used-to-be-GORM data structures (#104305) with sqlc-generated structs. This also makes it possible to use more specific structs, tailored to the specific queries, which increases efficiency. This commit mostly deals with workers, including the sleep schedule and the task scheduler. Functional changes are kept to a minimum, as the API still serves the same data. Because this work covers so much of Flamenco's code, it has been split up into multiple commits. Each commit brings Flamenco to a state where it compiles and unit tests pass; only the result of the final commit has actually been tested properly. Ref: #104343
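For readers unfamiliar with the difference, here is a minimal sketch of the GORM-to-sqlc change. All names below (package, table, fields, query) are illustrative assumptions, not Flamenco's actual schema or generated code: with a shared GORM model, every query hydrates the full struct, whereas sqlc generates a dedicated result struct per query, so a query can select only the columns it needs.

package persistence

import (
	"context"
	"database/sql"
	"time"
)

// Old GORM-era model: every query hydrated the full struct.
// (Hypothetical fields, for illustration only.)
type Worker struct {
	ID       uint
	UUID     string
	Name     string
	Status   string
	LastSeen time.Time
	// ...plus every other column, fetched on each lookup.
}

// With sqlc, the query is written as annotated SQL:
//
//	-- name: FetchWorkerStatus :one
//	SELECT uuid, status FROM workers WHERE uuid = ?;
//
// and sqlc generates a struct with exactly the selected columns, plus a
// type-safe method. Shape only, hand-written here (real sqlc output uses
// a DBTX interface instead of *sql.DB):
type Queries struct{ db *sql.DB }

type FetchWorkerStatusRow struct {
	UUID   string
	Status string
}

func (q *Queries) FetchWorkerStatus(ctx context.Context, uuid string) (FetchWorkerStatusRow, error) {
	row := q.db.QueryRowContext(ctx, "SELECT uuid, status FROM workers WHERE uuid = ?", uuid)
	var r FetchWorkerStatusRow
	err := row.Scan(&r.UUID, &r.Status)
	return r, err
}
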
package api_impl

// SPDX-License-Identifier: GPL-3.0-or-later

import (
	"context"
	"errors"
	"fmt"
	"net/http"

	"github.com/gertd/go-pluralize"
	"github.com/labstack/echo/v4"
	"github.com/rs/zerolog"

	"projects.blender.org/studio/flamenco/internal/manager/persistence"
	"projects.blender.org/studio/flamenco/internal/uuid"
	"projects.blender.org/studio/flamenco/pkg/api"
)

func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
	logger := requestLogger(e)
	worker := requestWorkerOrPanic(e)

	if !uuid.IsValid(taskID) {
		logger.Debug().Msg("invalid task ID received")
		return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
	}
	logger = logger.With().Str("taskID", taskID).Logger()

	// Fetch the task, to see if this worker is even allowed to send us updates.
	ctx := e.Request().Context()
	dbTask, err := f.persist.FetchTask(ctx, taskID)
	if err != nil {
		logger.Warn().Err(err).Msg("cannot fetch task")
		if errors.Is(err, persistence.ErrTaskNotFound) {
			return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
		}
		return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
	}
	if dbTask == nil {
		panic("task could not be fetched, but database gave no error either")
	}

	// Decode the request body.
	var taskUpdate api.TaskUpdate
	if err := e.Bind(&taskUpdate); err != nil {
		logger.Warn().Err(err).Msg("bad request received")
		return sendAPIError(e, http.StatusBadRequest, "invalid format")
	}
	if dbTask.WorkerID == nil {
		logger.Warn().
			Msg("worker trying to update task that's not assigned to any worker")
		return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
	}
	if *dbTask.WorkerID != uint(worker.ID) {
		logger.Warn().Msg("worker trying to update task that's assigned to another worker")
		return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
	}

	// Status 'soft-failed' should never be sent by a Worker. Distinguishing
	// between soft and hard failures is up to the Manager.
	// Workers should always just send 'failed' instead.
	if taskUpdate.TaskStatus != nil && *taskUpdate.TaskStatus == api.TaskStatusSoftFailed {
		logger.Warn().Str("status", string(*taskUpdate.TaskStatus)).
			Msg("worker trying to update task to not-allowed status")
		return sendAPIError(e, http.StatusBadRequest,
			"task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
	}
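
	// Switch to a context that is independent of the HTTP request, so that
	// handling the update isn't aborted when the connection to the Worker is
	// lost mid-request.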
	bgCtx, bgCtxCancel := bgContext()
	defer bgCtxCancel()

	taskUpdateErr := f.doTaskUpdate(bgCtx, logger, worker, dbTask, taskUpdate)
	workerUpdateErr := f.workerPingedTask(logger, dbTask)
	workerSeenErr := f.workerSeen(logger, worker)
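
	// All three updates above are attempted regardless of each other's results;
	// only the first error is reported back to the Worker.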
	if taskUpdateErr != nil {
		return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
	}
	if workerUpdateErr != nil {
		return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
	}
	if workerSeenErr != nil {
		return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr)
	}

	return e.NoContent(http.StatusNoContent)
}

// doTaskUpdate actually updates the task and its log file.
func (f *Flamenco) doTaskUpdate(
	ctx context.Context,
	logger zerolog.Logger,
	w *persistence.Worker,
	dbTask *persistence.Task,
	update api.TaskUpdate,
) error {
	if dbTask.Job == nil {
		logger.Panic().Msg("dbTask.Job is nil, unable to continue")
	}

	var dbErrActivity error

	if update.Activity != nil {
		dbTask.Activity = *update.Activity
		// The state machine will also save the task, including its activity, but
		// relying on that here would create strong coupling.
		dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask)
	}

	// Write the log first, because that's likely to contain the cause of the task
	// state change. Any subsequent task logs, for example generated by the
	// Manager in response to a status change, should be logged after that.
	if update.Log != nil {
		// Errors writing the log to disk are already logged by logStorage, and can
		// be safely ignored here.
		_ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
	}

	if update.TaskStatus == nil {
		return dbErrActivity
	}

	oldTaskStatus := dbTask.Status
	var err error
	if *update.TaskStatus == api.TaskStatusFailed {
		// Failure is more complex than just going to the failed state.
		err = f.onTaskFailed(ctx, logger, w, dbTask, update)
	} else {
		// Just go to the given state.
		err = f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
	}

	if err != nil {
		logger.Error().Err(err).
			Str("newTaskStatus", string(*update.TaskStatus)).
			Str("oldTaskStatus", string(oldTaskStatus)).
			Msg("error changing task status")
		return fmt.Errorf("changing status of task %s to %q: %w",
			dbTask.UUID, *update.TaskStatus, err)
	}

	return nil
}

// onTaskFailed decides whether a task is soft- or hard-failed. Note that this
// means the task may NOT end up in the status mentioned in the `update`
// parameter, but in `soft-failed` instead.
func (f *Flamenco) onTaskFailed(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
	update api.TaskUpdate,
) error {
	// Sanity check.
	if update.TaskStatus == nil || *update.TaskStatus != api.TaskStatusFailed {
		panic("onTaskFailed should only be called with a task update that indicates task failure")
	}

	// Bookkeeping of the failure.
	numFailed, err := f.persist.AddWorkerToTaskFailedList(ctx, task, worker)
	if err != nil {
		return fmt.Errorf("adding worker to failure list of task: %w", err)
	}

	logger = logger.With().Str("taskType", task.Type).Logger()
	wasBlocklisted, shouldFailJob, err := f.maybeBlocklistWorker(ctx, logger, worker, task)
	if err != nil {
		return fmt.Errorf("block-listing worker: %w", err)
	}
	if shouldFailJob {
		// There are no more workers left to finish the job.
		return f.failJobAfterCatastrophicTaskFailure(ctx, logger, worker, task)
	}
	if wasBlocklisted {
		// Requeue all tasks of this job & task type that were hard-failed before by this worker.
		reason := fmt.Sprintf("worker %s was blocked from tasks of type %q", worker.Name, task.Type)
		err := f.stateMachine.RequeueFailedTasksOfWorkerOfJob(ctx, worker, task.Job, reason)
		if err != nil {
			return err
		}
	}

	// Determine whether this is a soft or a hard failure.
	threshold := f.config.Get().TaskFailAfterSoftFailCount
	logger = logger.With().
		Int("failedByWorkerCount", numFailed).
		Int("threshold", threshold).
		Logger()

	if numFailed >= threshold {
		return f.hardFailTask(ctx, logger, worker, task, numFailed)
	}

	numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
	if err != nil {
		return err
	}

	// If the number of workers capable of running the failed task is 1, there is
	// no worker besides the one that actually failed it: at this point in the
	// code that worker hasn't been registered as failing this task yet, and thus
	// is still counted. In that case, just fail the job itself.
	if numWorkers <= 1 {
		return f.failJobAfterCatastrophicTaskFailure(ctx, logger, worker, task)
	}
	return f.softFailTask(ctx, logger, worker, task, numFailed)
}

// maybeBlocklistWorker potentially block-lists the Worker, and checks whether
// there are any workers left to run tasks of this type.
//
// Returns whether the worker was block-listed, and whether the entire job
// should be failed (in case this was the last worker that could have worked on
// this task).
func (f *Flamenco) maybeBlocklistWorker(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) (wasBlocklisted, shouldFailJob bool, err error) {
	numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type)
	if err != nil {
		return false, false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err)
	}
	// The received task update hasn't been persisted in the database yet,
	// so count that failure too.
	numFailures++

	threshold := f.config.Get().BlocklistThreshold
	if numFailures < threshold {
		logger.Info().
			Int("numFailedTasks", numFailures).
			Int("threshold", threshold).
			Msg("not enough failed tasks to blocklist worker")
		return false, false, nil
	}

	// Block-list the Worker.
	if err := f.blocklistWorker(ctx, logger, worker, task); err != nil {
		return true, false, err
	}

	// Return hard-failure if there are no workers left for this task type.
	numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
	return true, numWorkers == 0, err
}
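
// blocklistWorker adds the Worker to this job's block list for the failed
// task's type.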
func (f *Flamenco) blocklistWorker(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	logger.Warn().
		Str("job", task.Job.UUID).
		Msg("block-listing worker")
	err := f.persist.AddWorkerToJobBlocklist(ctx, task.Job, worker, task.Type)
	if err != nil {
		return fmt.Errorf("adding worker to block list: %w", err)
	}
	return nil
}
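
// numWorkersCapableOfRunningTask returns the number of workers that can still
// run tasks of this type on this job, not counting workers that already failed
// this particular task.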
func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) {
	// See which workers are left to run tasks of this type, on this job.
	workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type)
	if err != nil {
		return 0, fmt.Errorf("fetching workers available to run tasks of type %q on job %q: %w",
			task.Job.UUID, task.Type, err)
	}

	// Remove (from the list of available workers) those who failed this task before.
	failers, err := f.persist.FetchTaskFailureList(ctx, task)
	if err != nil {
		return 0, fmt.Errorf("fetching failure list of task %q: %w", task.UUID, err)
	}
	for _, failure := range failers {
		delete(workersLeft, failure.UUID)
	}

	return len(workersLeft), nil
}

// failJobAfterCatastrophicTaskFailure fails the entire job.
// This function is meant to be called when a task failure causes the Worker to
// be blocked, leaving no workers to run tasks of this type.
func (f *Flamenco) failJobAfterCatastrophicTaskFailure(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	taskLog := fmt.Sprintf(
		"Task failed by worker %s, Manager will fail the entire job as there are no more workers left for tasks of type %q.",
		worker.Identifier(), task.Type,
	)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		logger.Error().Err(err).Msg("error writing failure notice to task log")
	}

	if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
		logger.Error().
			Err(err).
			Str("newStatus", string(api.TaskStatusFailed)).
			Msg("error changing task status")
	}

	newJobStatus := api.JobStatusFailed
	logger.Info().
		Str("job", task.Job.UUID).
		Str("newJobStatus", string(newJobStatus)).
		Msg("no more workers left to run tasks of this type, failing the entire job")
	reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type)
	return f.stateMachine.JobStatusChange(ctx, task.Job, newJobStatus, reason)
}
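
// hardFailTask fails the task for good, after it was failed by too many
// different workers.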
func (f *Flamenco) hardFailTask(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
	numFailed int,
) error {
	// Add the failure to the task log.
	pluralizer := pluralize.NewClient()
	taskLog := fmt.Sprintf(
		"Task failed by %s, Manager will mark it as hard failure",
		pluralizer.Pluralize("worker", numFailed, true),
	)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		logger.Error().Err(err).Msg("error writing failure notice to task log")
	}

	// Mark the task as failed.
	logger.Info().Str("newTaskStatus", string(api.TaskStatusFailed)).
		Msg("too many workers failed this task, hard-failing it")
	return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed)
}
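
// softFailTask soft-fails the task, so that another worker can give it a try.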
func (f *Flamenco) softFailTask(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
	numFailed int,
) error {
	threshold := f.config.Get().TaskFailAfterSoftFailCount
	failsToThreshold := threshold - numFailed

	// Add the failure to the task log.
	pluralizer := pluralize.NewClient()
	taskLog := fmt.Sprintf(
		"Task failed by %s, Manager will mark it as soft failure. %d more %s will cause hard failure.",
		pluralizer.Pluralize("worker", numFailed, true),
		failsToThreshold,
		pluralizer.Pluralize("failure", failsToThreshold, false),
	)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		logger.Error().Err(err).Msg("error writing failure notice to task log")
	}

	// Mark the task as soft-failed.
	logger.Info().Str("newTaskStatus", string(api.TaskStatusSoftFailed)).
		Msg("worker failed this task, soft-failing to give another worker a try")
	return f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusSoftFailed)
}