Sybren A. Stüvel 9a5bbb4131 Manager: implement persistence layer interface for task status machine
Implement the functions used by the task status machine in the DB
persistence layer.
2022-02-25 14:34:29 +01:00

395 lines
14 KiB
Go

package task_state_machine
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
)
// taskFailJobPercentage is the percentage of a job's tasks that need to fail to
// trigger failure of the entire job.
const taskFailJobPercentage = 10 // Integer from 0 to 100.
// StateMachine handles task and job status changes.
type StateMachine struct {
persist PersistenceService
}
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks gitlab.com/blender/flamenco-ng-poc/internal/manager/task_state_machine PersistenceService
type PersistenceService interface {
SaveTask(ctx context.Context, task *persistence.Task) error
SaveJobStatus(ctx context.Context, j *persistence.Job) error
JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error)
CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (numInStatus, numTotal int, err error)
// UpdateJobsTaskStatuses updates the status & activity of the tasks of `job`.
UpdateJobsTaskStatuses(ctx context.Context, job *persistence.Job,
taskStatus api.TaskStatus, activity string) error
// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
// limited to those tasks with status in `statusesToUpdate`.
UpdateJobsTaskStatusesConditional(ctx context.Context, job *persistence.Job,
statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error
}
// PersistenceService should be a subset of persistence.DB
var _ PersistenceService = (*persistence.DB)(nil)
func NewStateMachine(persist PersistenceService) *StateMachine {
return &StateMachine{
persist: persist,
}
}
// TaskStatusChange updates the task's status to the new one.
// `task` is expected to still have its original status, and have a filled `Job` pointer.
func (sm *StateMachine) TaskStatusChange(
ctx context.Context,
task *persistence.Task,
newTaskStatus api.TaskStatus,
) error {
job := task.Job
if job == nil {
log.Panic().Str("task", task.UUID).Msg("task without job, cannot handle this")
return nil // Will not run because of the panic.
}
oldTaskStatus := task.Status
task.Status = newTaskStatus
logger := log.With().
Str("task", task.UUID).
Str("job", job.UUID).
Str("taskStatusOld", string(oldTaskStatus)).
Str("taskStatusNew", string(newTaskStatus)).
Logger()
logger.Debug().Msg("task state changed")
if err := sm.persist.SaveTask(ctx, task); err != nil {
return fmt.Errorf("error saving task to database: %w", err)
}
if err := sm.updateJobAfterTaskStatusChange(ctx, task, oldTaskStatus); err != nil {
return fmt.Errorf("error updating job after task status change: %w", err)
}
return nil
}
// updateJobAfterTaskStatusChange updates the job status based on the status of
// this task and other tasks in the job.
func (sm *StateMachine) updateJobAfterTaskStatusChange(
ctx context.Context, task *persistence.Task, oldTaskStatus api.TaskStatus,
) error {
job := task.Job
logger := log.With().
Str("job", job.UUID).
Str("task", task.UUID).
Str("taskStatusOld", string(oldTaskStatus)).
Str("taskStatusNew", string(task.Status)).
Logger()
// If the job has status 'ifStatus', move it to status 'thenStatus'.
jobStatusIfAThenB := func(ifStatus, thenStatus api.JobStatus) error {
if job.Status != ifStatus {
return nil
}
logger.Info().
Str("jobStatusOld", string(ifStatus)).
Str("jobStatusNew", string(thenStatus)).
Msg("Job will change status because one of its task changed status")
return sm.JobStatusChange(ctx, job, thenStatus)
}
// Every 'case' in this switch MUST return. Just for sanity's sake.
switch task.Status {
case api.TaskStatusQueued:
// Re-queueing a task on a completed job should re-queue the job too.
return jobStatusIfAThenB(api.JobStatusCompleted, api.JobStatusRequeued)
case api.TaskStatusCancelRequested:
// Requesting cancellation of a single task has no influence on the job itself.
return nil
case api.TaskStatusPaused:
// Pausing a task has no impact on the job.
return nil
case api.TaskStatusCanceled:
// Only trigger cancellation/failure of the job if that was actually requested.
// A user can also cancel a single task from the web UI or API, in which
// case the job should just keep running.
if job.Status != api.JobStatusCancelRequested {
return nil
}
// This could be the last 'cancel-requested' task to go to 'canceled'.
hasCancelReq, err := sm.persist.JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested)
if err != nil {
return err
}
if !hasCancelReq {
logger.Info().Msg("last task of job went from cancel-requested to canceled")
return sm.JobStatusChange(ctx, job, api.JobStatusCanceled)
}
return nil
case api.TaskStatusFailed:
// Count the number of failed tasks. If it is over the threshold, fail the job.
numFailed, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed)
if err != nil {
return err
}
failedPercentage := int(float64(numFailed) / float64(numTotal) * 100)
failLogger := logger.With().
Int("taskNumTotal", numTotal).
Int("taskNumFailed", numFailed).
Int("failedPercentage", failedPercentage).
Int("threshold", taskFailJobPercentage).
Logger()
if failedPercentage >= taskFailJobPercentage {
failLogger.Info().Msg("failing job because too many of its tasks failed")
return sm.JobStatusChange(ctx, job, api.JobStatusFailed)
}
// If the job didn't fail, this failure indicates that at least the job is active.
failLogger.Info().Msg("task failed, but not enough to fail the job")
return jobStatusIfAThenB(api.JobStatusQueued, api.JobStatusActive)
case api.TaskStatusActive, api.TaskStatusSoftFailed:
switch job.Status {
case api.JobStatusActive, api.JobStatusCancelRequested:
// Do nothing, job is already in the desired status.
return nil
default:
logger.Info().Msg("job became active because one of its task changed status")
return sm.JobStatusChange(ctx, job, api.JobStatusActive)
}
case api.TaskStatusCompleted:
numComplete, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
if err != nil {
return err
}
if numComplete == numTotal {
logger.Info().Msg("all tasks of job are completed, job is completed")
return sm.JobStatusChange(ctx, job, api.JobStatusCompleted)
}
logger.Info().
Int("taskNumTotal", numTotal).
Int("taskNumComplete", numComplete).
Msg("task completed; there are more tasks to do")
return jobStatusIfAThenB(api.JobStatusQueued, api.JobStatusActive)
default:
logger.Warn().Msg("task obtained status that Flamenco did not expect")
return nil
}
}
func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error {
// Job status changes can trigger task status changes, which can trigger the
// next job status change. Keep looping over these job status changes until
// there is no more change left to do.
var err error
for newJobStatus != "" && newJobStatus != job.Status {
oldJobStatus := job.Status
job.Status = newJobStatus
logger := log.With().
Str("job", job.UUID).
Str("jobStatusOld", string(oldJobStatus)).
Str("jobStatusNew", string(newJobStatus)).
Logger()
logger.Info().Msg("job status changed")
// Persist the new job status.
err = sm.persist.SaveJobStatus(ctx, job)
if err != nil {
return fmt.Errorf("error saving job status change %q to %q to database: %w",
oldJobStatus, newJobStatus, err)
}
// Handle the status change.
newJobStatus, err = sm.updateTasksAfterJobStatusChange(ctx, logger, job, oldJobStatus)
if err != nil {
return fmt.Errorf("error updating job's tasks after job status change: %w", err)
}
}
return nil
}
// updateTasksAfterJobStatusChange updates the status of its tasks based on the
// new status of this job.
//
// NOTE: this function assumes that the job already has its new status.
//
// Returns the new state the job should go into after this change, or an empty
// string if there is no subsequent change necessary.
func (sm *StateMachine) updateTasksAfterJobStatusChange(
ctx context.Context,
logger zerolog.Logger,
job *persistence.Job,
oldJobStatus api.JobStatus,
) (api.JobStatus, error) {
// Every case in this switch MUST return, for sanity sake.
switch job.Status {
case api.JobStatusCompleted, api.JobStatusCanceled:
// Nothing to do; this will happen as a response to all tasks receiving this status.
return "", nil
case api.JobStatusActive:
// Nothing to do; this happens when a task gets started, which has nothing to
// do with other tasks in the job.
return "", nil
case api.JobStatusCancelRequested, api.JobStatusFailed:
return sm.cancelTasks(ctx, logger, job)
case api.JobStatusRequeued:
return sm.requeueTasks(ctx, logger, job, oldJobStatus)
case api.JobStatusQueued:
return sm.checkTaskCompletion(ctx, logger, job)
default:
logger.Warn().Msg("unknown job status change, ignoring")
return "", nil
}
}
// Directly cancel any task that might run in the future.
//
// Returns the next job status, if a status change is required.
func (sm *StateMachine) cancelTasks(
ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
logger.Info().Msg("cancelling tasks of job")
// Any task that is running or might run in the future should get cancelled.
taskStatusesToCancel := []api.TaskStatus{
api.TaskStatusActive,
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
}
err := sm.persist.UpdateJobsTaskStatusesConditional(
ctx, job, taskStatusesToCancel, api.TaskStatusCanceled,
fmt.Sprintf("Manager cancelled this task because the job got status %q.", job.Status),
)
if err != nil {
return "", fmt.Errorf("error cancelling tasks of job %s: %w", job.UUID, err)
}
// If cancellation was requested, it has now happened, so the job can transition.
if job.Status == api.JobStatusCancelRequested {
logger.Info().Msg("all tasks of job cancelled, job can go to 'cancelled' status")
return api.JobStatusCanceled, nil
}
// This could mean cancellation was triggered by failure of the job, in which
// case the job is already in the correct status.
return "", nil
}
// requeueTasks re-queues all tasks of the job.
//
// This function assumes that the current job status is "requeued".
//
// Returns the new job status, if this status transition should be followed by
// another one.
func (sm *StateMachine) requeueTasks(
ctx context.Context, logger zerolog.Logger, job *persistence.Job, oldJobStatus api.JobStatus,
) (api.JobStatus, error) {
var err error
if job.Status != api.JobStatusRequeued {
logger.Warn().Msg("unexpected job status in StateMachine::requeueTasks()")
}
switch oldJobStatus {
case api.JobStatusUnderConstruction:
// Nothing to do, the job compiler has just finished its work; the tasks have
// already been set to 'queued' status.
logger.Debug().Msg("ignoring job status change")
return "", nil
case api.JobStatusCompleted:
// Re-queue all tasks.
err = sm.persist.UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
default:
// Re-queue only the non-completed tasks.
statusesToUpdate := []api.TaskStatus{
api.TaskStatusCancelRequested,
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
}
err = sm.persist.UpdateJobsTaskStatusesConditional(ctx, job,
statusesToUpdate, api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
}
if err != nil {
return "", fmt.Errorf("error queueing tasks of job %s: %w", job.UUID, err)
}
// TODO: also reset the 'failed by workers' blacklist.
// The appropriate tasks have been requeued, so now the job can go from "requeued" to "queued".
return api.JobStatusQueued, nil
}
// checkTaskCompletion returns "completed" as next job status when all tasks of
// the job are completed.
//
// Returns the new job status, if this status transition should be followed by
// another one.
func (sm *StateMachine) checkTaskCompletion(
ctx context.Context, logger zerolog.Logger, job *persistence.Job,
) (api.JobStatus, error) {
numCompleted, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
if err != nil {
return "", fmt.Errorf("checking task completion of job %s: %w", job.UUID, err)
}
if numCompleted < numTotal {
logger.Debug().
Int("numTasksCompleted", numCompleted).
Int("numTasksTotal", numTotal).
Msg("not all tasks of job are completed")
return "", nil
}
logger.Info().Msg("job has all tasks completed, transition job to 'completed'")
return api.JobStatusCompleted, nil
}