
No functional changes. Reviewed-on: https://projects.blender.org/studio/flamenco/pulls/104373
865 lines
24 KiB
Go
865 lines
24 KiB
Go
package persistence
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"database/sql/driver"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
|
|
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
type Job = sqlc.Job
|
|
type Task = sqlc.Task
|
|
|
|
// TaskJobWorker represents a task, with identifieres for its job and the worker it's assigned to.
|
|
type TaskJobWorker struct {
|
|
Task Task
|
|
JobUUID string
|
|
WorkerUUID string
|
|
}
|
|
|
|
// TaskJob represents a task, with identifier for its job.
|
|
type TaskJob struct {
|
|
Task Task
|
|
JobUUID string
|
|
IsActive bool // Whether the worker assigned to this task is actually working on it.
|
|
}
|
|
|
|
type StringInterfaceMap map[string]interface{}
|
|
type StringStringMap map[string]string
|
|
|
|
// Commands is the schema used for (un)marshalling sqlc.Task.Commands.
|
|
type Commands []Command
|
|
|
|
type Command struct {
|
|
Name string `json:"name"`
|
|
Parameters StringInterfaceMap `json:"parameters"`
|
|
}
|
|
|
|
func (c Commands) Value() (driver.Value, error) {
|
|
return json.Marshal(c)
|
|
}
|
|
func (c *Commands) Scan(value interface{}) error {
|
|
b, ok := value.([]byte)
|
|
if !ok {
|
|
return errors.New("type assertion to []byte failed")
|
|
}
|
|
return json.Unmarshal(b, &c)
|
|
}
|
|
|
|
func (js StringInterfaceMap) Value() (driver.Value, error) {
|
|
return json.Marshal(js)
|
|
}
|
|
func (js *StringInterfaceMap) Scan(value interface{}) error {
|
|
b, ok := value.([]byte)
|
|
if !ok {
|
|
return errors.New("type assertion to []byte failed")
|
|
}
|
|
return json.Unmarshal(b, &js)
|
|
}
|
|
|
|
func (js StringStringMap) Value() (driver.Value, error) {
|
|
return json.Marshal(js)
|
|
}
|
|
func (js *StringStringMap) Scan(value interface{}) error {
|
|
b, ok := value.([]byte)
|
|
if !ok {
|
|
return errors.New("type assertion to []byte failed")
|
|
}
|
|
return json.Unmarshal(b, &js)
|
|
}
|
|
|
|
// TaskFailure keeps track of which Worker failed which Task.
|
|
type TaskFailure = sqlc.TaskFailure
|
|
|
|
// StoreJob stores an AuthoredJob and its tasks, and saves it to the database.
|
|
// The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status.
|
|
func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
|
|
|
|
// Run all queries in a single transaction.
|
|
qtx, err := db.queriesWithTX()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer qtx.rollback()
|
|
|
|
// Serialise the embedded JSON.
|
|
settings, err := json.Marshal(authoredJob.Settings)
|
|
if err != nil {
|
|
return fmt.Errorf("converting job settings to JSON: %w", err)
|
|
}
|
|
metadata, err := json.Marshal(authoredJob.Metadata)
|
|
if err != nil {
|
|
return fmt.Errorf("converting job metadata to JSON: %w", err)
|
|
}
|
|
|
|
// Create the job itself.
|
|
params := sqlc.CreateJobParams{
|
|
CreatedAt: db.now(),
|
|
UUID: authoredJob.JobID,
|
|
Name: authoredJob.Name,
|
|
JobType: authoredJob.JobType,
|
|
Priority: int64(authoredJob.Priority),
|
|
Status: authoredJob.Status,
|
|
Settings: settings,
|
|
Metadata: metadata,
|
|
StorageShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID,
|
|
}
|
|
|
|
if authoredJob.WorkerTagUUID != "" {
|
|
workerTag, err := qtx.queries.FetchWorkerTagByUUID(ctx, authoredJob.WorkerTagUUID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return fmt.Errorf("no worker tag %q found", authoredJob.WorkerTagUUID)
|
|
case err != nil:
|
|
return fmt.Errorf("could not find worker tag %q: %w", authoredJob.WorkerTagUUID, err)
|
|
}
|
|
params.WorkerTagID = sql.NullInt64{Int64: workerTag.ID, Valid: true}
|
|
}
|
|
|
|
log.Debug().
|
|
Str("job", params.UUID).
|
|
Str("type", params.JobType).
|
|
Str("name", params.Name).
|
|
Str("status", string(params.Status)).
|
|
Msg("persistence: storing authored job")
|
|
|
|
jobID, err := qtx.queries.CreateJob(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "storing job")
|
|
}
|
|
|
|
err = db.storeAuthoredJobTask(ctx, qtx, jobID, &authoredJob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return qtx.commit()
|
|
}
|
|
|
|
// StoreAuthoredJobTask is a low-level function that is only used for recreating an existing job's tasks.
|
|
// It stores `authoredJob`'s tasks, but attaches them to the already-persisted `job`.
|
|
func (db *DB) StoreAuthoredJobTask(
|
|
ctx context.Context,
|
|
job *Job,
|
|
authoredJob *job_compilers.AuthoredJob,
|
|
) error {
|
|
qtx, err := db.queriesWithTX()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer qtx.rollback()
|
|
|
|
err = db.storeAuthoredJobTask(ctx, qtx, int64(job.ID), authoredJob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return qtx.commit()
|
|
}
|
|
|
|
// storeAuthoredJobTask stores the tasks of the authored job.
|
|
// Note that this function does NOT commit the database transaction. That is up
|
|
// to the caller.
|
|
func (db *DB) storeAuthoredJobTask(
|
|
ctx context.Context,
|
|
qtx *queriesTX,
|
|
jobID int64,
|
|
authoredJob *job_compilers.AuthoredJob,
|
|
) error {
|
|
type TaskInfo struct {
|
|
ID int64
|
|
UUID string
|
|
Name string
|
|
}
|
|
|
|
// Give every task the same creation timestamp.
|
|
now := db.now()
|
|
|
|
uuidToTask := make(map[string]TaskInfo)
|
|
for taskIndex, authoredTask := range authoredJob.Tasks {
|
|
// Marshal commands to JSON.
|
|
var commands []Command
|
|
for _, authoredCommand := range authoredTask.Commands {
|
|
commands = append(commands, Command{
|
|
Name: authoredCommand.Name,
|
|
Parameters: StringInterfaceMap(authoredCommand.Parameters),
|
|
})
|
|
}
|
|
commandsJSON, err := json.Marshal(commands)
|
|
if err != nil {
|
|
return fmt.Errorf("could not convert commands of task %q to JSON: %w",
|
|
authoredTask.Name, err)
|
|
}
|
|
|
|
taskParams := sqlc.CreateTaskParams{
|
|
CreatedAt: now,
|
|
Name: authoredTask.Name,
|
|
Type: authoredTask.Type,
|
|
UUID: authoredTask.UUID,
|
|
JobID: jobID,
|
|
IndexInJob: int64(taskIndex + 1), // indexInJob is base-1.
|
|
Priority: int64(authoredTask.Priority),
|
|
Status: api.TaskStatusQueued,
|
|
Commands: commandsJSON,
|
|
// dependencies are stored below.
|
|
}
|
|
|
|
log.Debug().
|
|
Str("task", taskParams.UUID).
|
|
Str("type", taskParams.Type).
|
|
Str("name", taskParams.Name).
|
|
Str("status", string(taskParams.Status)).
|
|
Msg("persistence: storing authored task")
|
|
|
|
taskID, err := qtx.queries.CreateTask(ctx, taskParams)
|
|
if err != nil {
|
|
return taskError(err, "storing task: %v", err)
|
|
}
|
|
|
|
uuidToTask[authoredTask.UUID] = TaskInfo{
|
|
ID: taskID,
|
|
UUID: taskParams.UUID,
|
|
Name: taskParams.Name,
|
|
}
|
|
}
|
|
|
|
// Store the dependencies between tasks.
|
|
for _, authoredTask := range authoredJob.Tasks {
|
|
if len(authoredTask.Dependencies) == 0 {
|
|
continue
|
|
}
|
|
|
|
taskInfo, ok := uuidToTask[authoredTask.UUID]
|
|
if !ok {
|
|
return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
|
|
}
|
|
|
|
deps := make([]*TaskInfo, len(authoredTask.Dependencies))
|
|
for idx, authoredDep := range authoredTask.Dependencies {
|
|
depTask, ok := uuidToTask[authoredDep.UUID]
|
|
if !ok {
|
|
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", authoredDep.UUID)
|
|
}
|
|
|
|
err := qtx.queries.StoreTaskDependency(ctx, sqlc.StoreTaskDependencyParams{
|
|
TaskID: taskInfo.ID,
|
|
DependencyID: depTask.ID,
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "error storing task %q depending on task %q", authoredTask.UUID, depTask.UUID)
|
|
}
|
|
|
|
deps[idx] = &depTask
|
|
}
|
|
|
|
if log.Debug().Enabled() {
|
|
depNames := make([]string, len(deps))
|
|
for i, dep := range deps {
|
|
depNames[i] = dep.Name
|
|
}
|
|
log.Debug().
|
|
Str("task", taskInfo.UUID).
|
|
Str("name", taskInfo.Name).
|
|
Strs("dependencies", depNames).
|
|
Msg("persistence: storing authored task dependencies")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// FetchJob fetches a single job, without fetching its tasks.
|
|
func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
|
|
queries := db.queries()
|
|
|
|
job, err := queries.FetchJob(ctx, jobUUID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return nil, ErrJobNotFound
|
|
case err != nil:
|
|
return nil, jobError(err, "fetching job")
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
// FetchJob fetches a single job by its database ID, without fetching its tasks.
|
|
func (db *DB) FetchJobByID(ctx context.Context, jobID int64) (*Job, error) {
|
|
queries := db.queries()
|
|
|
|
job, err := queries.FetchJobByID(ctx, jobID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return nil, ErrJobNotFound
|
|
case err != nil:
|
|
return nil, jobError(err, "fetching job")
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
func (db *DB) FetchJobs(ctx context.Context) ([]*Job, error) {
|
|
queries := db.queries()
|
|
|
|
sqlcJobs, err := queries.FetchJobs(ctx)
|
|
if err != nil {
|
|
return nil, jobError(err, "fetching all jobs")
|
|
}
|
|
|
|
// TODO: just return []Job instead of converting the array.
|
|
jobPointers := make([]*Job, len(sqlcJobs))
|
|
for index := range sqlcJobs {
|
|
jobPointers[index] = &sqlcJobs[index]
|
|
}
|
|
|
|
return jobPointers, nil
|
|
}
|
|
|
|
// FetchJobShamanCheckoutID fetches the job's Shaman Checkout ID.
|
|
func (db *DB) FetchJobShamanCheckoutID(ctx context.Context, jobUUID string) (string, error) {
|
|
queries := db.queries()
|
|
|
|
checkoutID, err := queries.FetchJobShamanCheckoutID(ctx, jobUUID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return "", ErrJobNotFound
|
|
case err != nil:
|
|
return "", jobError(err, "fetching job")
|
|
}
|
|
return checkoutID, nil
|
|
}
|
|
|
|
// DeleteJob deletes a job from the database.
|
|
// The deletion cascades to its tasks and other job-related tables.
|
|
func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error {
|
|
// As a safety measure, refuse to delete jobs unless foreign key constraints are active.
|
|
fkEnabled, err := db.areForeignKeysEnabled(ctx)
|
|
switch {
|
|
case err != nil:
|
|
return err
|
|
case !fkEnabled:
|
|
return ErrDeletingWithoutFK
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
if err := queries.DeleteJob(ctx, jobUUID); err != nil {
|
|
return jobError(err, "deleting job")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RequestJobDeletion sets the job's "DeletionRequestedAt" field to "now".
|
|
func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
// Update the given job itself, so we don't have to re-fetch it from the database.
|
|
j.DeleteRequestedAt = db.nowNullable()
|
|
|
|
params := sqlc.RequestJobDeletionParams{
|
|
Now: j.DeleteRequestedAt,
|
|
JobID: int64(j.ID),
|
|
}
|
|
|
|
log.Trace().
|
|
Str("job", j.UUID).
|
|
Time("deletedAt", params.Now.Time).
|
|
Msg("database: marking job as deletion-requested")
|
|
if err := queries.RequestJobDeletion(ctx, params); err != nil {
|
|
return jobError(err, "queueing job for deletion")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RequestJobMassDeletion sets multiple job's "DeletionRequestedAt" field to "now".
|
|
// The list of affected job UUIDs is returned.
|
|
func (db *DB) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) {
|
|
queries := db.queries()
|
|
|
|
// In order to be able to report which jobs were affected, first fetch the
|
|
// list of jobs, then update them.
|
|
uuids, err := queries.FetchJobUUIDsUpdatedBefore(ctx, sql.NullTime{
|
|
Time: lastUpdatedMax,
|
|
Valid: true,
|
|
})
|
|
switch {
|
|
case err != nil:
|
|
return nil, jobError(err, "fetching jobs by last-modified timestamp")
|
|
case len(uuids) == 0:
|
|
return nil, ErrJobNotFound
|
|
}
|
|
|
|
// Update the selected jobs.
|
|
params := sqlc.RequestMassJobDeletionParams{
|
|
Now: db.nowNullable(),
|
|
UUIDs: uuids,
|
|
}
|
|
if err := queries.RequestMassJobDeletion(ctx, params); err != nil {
|
|
return nil, jobError(err, "marking jobs as deletion-requested")
|
|
}
|
|
|
|
return uuids, nil
|
|
}
|
|
|
|
func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) {
|
|
queries := db.queries()
|
|
|
|
uuids, err := queries.FetchJobsDeletionRequested(ctx)
|
|
if err != nil {
|
|
return nil, jobError(err, "fetching jobs marked for deletion")
|
|
}
|
|
return uuids, nil
|
|
}
|
|
|
|
func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) {
|
|
queries := db.queries()
|
|
|
|
jobs, err := queries.FetchJobsInStatus(ctx, jobStatuses)
|
|
if err != nil {
|
|
return nil, jobError(err, "fetching jobs in status %q", jobStatuses)
|
|
}
|
|
|
|
// TODO: just return []Job instead of converting the array.
|
|
pointers := make([]*Job, len(jobs))
|
|
for index := range jobs {
|
|
pointers[index] = &jobs[index]
|
|
}
|
|
|
|
return pointers, nil
|
|
}
|
|
|
|
// SaveJobStatus saves the job's Status and Activity fields.
|
|
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
params := sqlc.SaveJobStatusParams{
|
|
Now: db.nowNullable(),
|
|
ID: int64(j.ID),
|
|
Status: j.Status,
|
|
Activity: j.Activity,
|
|
}
|
|
|
|
err := queries.SaveJobStatus(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "saving job status")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SaveJobPriority saves the job's Priority field.
|
|
func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
params := sqlc.SaveJobPriorityParams{
|
|
Now: db.nowNullable(),
|
|
ID: int64(j.ID),
|
|
Priority: int64(j.Priority),
|
|
}
|
|
|
|
err := queries.SaveJobPriority(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "saving job priority")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SaveJobStorageInfo saves the job's Storage field.
|
|
// NOTE: this function does NOT update the job's `UpdatedAt` field. This is
|
|
// necessary for `cmd/shaman-checkout-id-setter` to do its work quietly.
|
|
func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
params := sqlc.SaveJobStorageInfoParams{
|
|
ID: int64(j.ID),
|
|
StorageShamanCheckoutID: j.StorageShamanCheckoutID,
|
|
}
|
|
|
|
err := queries.SaveJobStorageInfo(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "saving job storage")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) FetchTask(ctx context.Context, taskUUID string) (TaskJobWorker, error) {
|
|
queries := db.queries()
|
|
|
|
taskRow, err := queries.FetchTask(ctx, taskUUID)
|
|
if err != nil {
|
|
return TaskJobWorker{}, taskError(err, "fetching task %s", taskUUID)
|
|
}
|
|
|
|
taskJobWorker := TaskJobWorker{
|
|
Task: taskRow.Task,
|
|
JobUUID: taskRow.JobUUID.String,
|
|
WorkerUUID: taskRow.WorkerUUID.String,
|
|
}
|
|
|
|
return taskJobWorker, nil
|
|
}
|
|
|
|
// FetchTaskJobUUID fetches the job UUID of the given task.
|
|
func (db *DB) FetchTaskJobUUID(ctx context.Context, taskUUID string) (string, error) {
|
|
queries := db.queries()
|
|
|
|
jobUUID, err := queries.FetchTaskJobUUID(ctx, taskUUID)
|
|
if err != nil {
|
|
return "", taskError(err, "fetching job UUID of task %s", taskUUID)
|
|
}
|
|
if !jobUUID.Valid {
|
|
return "", PersistenceError{Message: fmt.Sprintf("unable to find job of task %s", taskUUID)}
|
|
}
|
|
return jobUUID.String, nil
|
|
}
|
|
|
|
// SaveTask updates a task that already exists in the database.
|
|
// This function is not used by the Flamenco API, only by unit tests.
|
|
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
|
|
if t.ID == 0 {
|
|
panic(fmt.Errorf("cannot use this function to insert a task"))
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
commandsJSON, err := json.Marshal(t.Commands)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot convert commands to JSON: %w", err)
|
|
}
|
|
|
|
param := sqlc.UpdateTaskParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Name: t.Name,
|
|
Type: t.Type,
|
|
Priority: t.Priority,
|
|
Status: t.Status,
|
|
Commands: commandsJSON,
|
|
Activity: t.Activity,
|
|
ID: t.ID,
|
|
WorkerID: t.WorkerID,
|
|
LastTouchedAt: t.LastTouchedAt,
|
|
}
|
|
|
|
err = queries.UpdateTask(ctx, param)
|
|
if err != nil {
|
|
return taskError(err, "updating task")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error {
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateTaskStatus(ctx, sqlc.UpdateTaskStatusParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Status: t.Status,
|
|
ID: int64(t.ID),
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "saving task status")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateTaskActivity(ctx, sqlc.UpdateTaskActivityParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Activity: t.Activity,
|
|
ID: int64(t.ID),
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "saving task activity")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TaskAssignToWorker assigns the given task to the given worker.
|
|
// This function is only used by unit tests. During normal operation, Flamenco
|
|
// uses the code in task_scheduler.go to assign tasks to workers.
|
|
func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error {
|
|
queries := db.queries()
|
|
|
|
t.WorkerID = sql.NullInt64{Int64: w.ID, Valid: true}
|
|
|
|
err := queries.TaskAssignToWorker(ctx, sqlc.TaskAssignToWorkerParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
WorkerID: t.WorkerID,
|
|
ID: t.ID,
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "assigning task %s to worker %s", t.UUID, w.UUID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, taskStatus api.TaskStatus) ([]TaskJob, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfWorkerInStatus(ctx, sqlc.FetchTasksOfWorkerInStatusParams{
|
|
WorkerID: sql.NullInt64{
|
|
Int64: int64(worker.ID),
|
|
Valid: true,
|
|
},
|
|
TaskStatus: taskStatus,
|
|
})
|
|
if err != nil {
|
|
return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus)
|
|
}
|
|
|
|
result := make([]TaskJob, len(rows))
|
|
for i := range rows {
|
|
result[i].Task = rows[i].Task
|
|
result[i].JobUUID = rows[i].JobUUID
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worker, taskStatus api.TaskStatus, jobUUID string) ([]*Task, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfWorkerInStatusOfJob(ctx, sqlc.FetchTasksOfWorkerInStatusOfJobParams{
|
|
WorkerID: sql.NullInt64{
|
|
Int64: int64(worker.ID),
|
|
Valid: true,
|
|
},
|
|
JobUUID: jobUUID,
|
|
TaskStatus: taskStatus,
|
|
})
|
|
if err != nil {
|
|
return nil, taskError(err, "finding tasks of worker %s in status %q and job %s", worker.UUID, taskStatus, jobUUID)
|
|
}
|
|
|
|
// TODO: just return []Task instead of creating an array of pointers.
|
|
result := make([]*Task, len(rows))
|
|
for i := range rows {
|
|
result[i] = &rows[i].Task
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
|
|
queries := db.queries()
|
|
|
|
count, err := queries.JobCountTasksInStatus(ctx, sqlc.JobCountTasksInStatusParams{
|
|
JobID: int64(job.ID),
|
|
TaskStatus: taskStatus,
|
|
})
|
|
if err != nil {
|
|
return false, taskError(err, "counting tasks of job %s in status %q", job.UUID, taskStatus)
|
|
}
|
|
|
|
return count > 0, nil
|
|
}
|
|
|
|
// CountTasksOfJobInStatus counts the number of tasks in the job.
|
|
// It returns two counts, one is the number of tasks in the given statuses, the
|
|
// other is the total number of tasks of the job.
|
|
func (db *DB) CountTasksOfJobInStatus(
|
|
ctx context.Context,
|
|
job *Job,
|
|
taskStatuses ...api.TaskStatus,
|
|
) (numInStatus, numTotal int, err error) {
|
|
queries := db.queries()
|
|
|
|
results, err := queries.JobCountTaskStatuses(ctx, int64(job.ID))
|
|
if err != nil {
|
|
return 0, 0, jobError(err, "count tasks of job %s in status %q", job.UUID, taskStatuses)
|
|
}
|
|
|
|
// Create lookup table for which statuses to count.
|
|
countStatus := map[api.TaskStatus]bool{}
|
|
for _, status := range taskStatuses {
|
|
countStatus[status] = true
|
|
}
|
|
|
|
// Count the number of tasks per status.
|
|
for _, result := range results {
|
|
if countStatus[api.TaskStatus(result.Status)] {
|
|
numInStatus += int(result.NumTasks)
|
|
}
|
|
numTotal += int(result.NumTasks)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// FetchTaskIDsOfJob returns all tasks of the given job.
|
|
func (db *DB) FetchTasksOfJob(ctx context.Context, job *Job) ([]TaskJobWorker, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfJob(ctx, int64(job.ID))
|
|
if err != nil {
|
|
return nil, taskError(err, "fetching tasks of job %s", job.UUID)
|
|
}
|
|
|
|
result := make([]TaskJobWorker, len(rows))
|
|
for i := range rows {
|
|
result[i].Task = rows[i].Task
|
|
result[i].JobUUID = job.UUID
|
|
result[i].WorkerUUID = rows[i].WorkerUUID.String
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// FetchTasksOfJobInStatus returns those tasks of the given job that have any of the given statuses.
|
|
func (db *DB) FetchTasksOfJobInStatus(ctx context.Context, job *Job, taskStatuses ...api.TaskStatus) ([]TaskJobWorker, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfJobInStatus(ctx, sqlc.FetchTasksOfJobInStatusParams{
|
|
JobID: int64(job.ID),
|
|
TaskStatus: taskStatuses,
|
|
})
|
|
if err != nil {
|
|
return nil, taskError(err, "fetching tasks of job %s in status %q", job.UUID, taskStatuses)
|
|
}
|
|
|
|
result := make([]TaskJobWorker, len(rows))
|
|
for i := range rows {
|
|
result[i].Task = rows[i].Task
|
|
result[i].JobUUID = job.UUID
|
|
result[i].WorkerUUID = rows[i].WorkerUUID.String
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// UpdateJobsTaskStatuses updates the status & activity of all tasks of `job`.
|
|
func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job,
|
|
taskStatus api.TaskStatus, activity string) error {
|
|
|
|
if taskStatus == "" {
|
|
return taskError(nil, "empty status not allowed")
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateJobsTaskStatuses(ctx, sqlc.UpdateJobsTaskStatusesParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Status: taskStatus,
|
|
Activity: activity,
|
|
JobID: int64(job.ID),
|
|
})
|
|
|
|
if err != nil {
|
|
return taskError(err, "updating status of all tasks of job %s", job.UUID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
|
|
// limited to those tasks with status in `statusesToUpdate`.
|
|
func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job,
|
|
statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error {
|
|
|
|
if taskStatus == "" {
|
|
return taskError(nil, "empty status not allowed")
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateJobsTaskStatusesConditional(ctx, sqlc.UpdateJobsTaskStatusesConditionalParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Status: taskStatus,
|
|
Activity: activity,
|
|
JobID: int64(job.ID),
|
|
StatusesToUpdate: statusesToUpdate,
|
|
})
|
|
|
|
if err != nil {
|
|
return taskError(err, "updating status of all tasks in status %v of job %s", statusesToUpdate, job.UUID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection.
|
|
func (db *DB) TaskTouchedByWorker(ctx context.Context, taskUUID string) error {
|
|
queries := db.queries()
|
|
|
|
now := db.nowNullable()
|
|
err := queries.TaskTouchedByWorker(ctx, sqlc.TaskTouchedByWorkerParams{
|
|
UpdatedAt: now,
|
|
LastTouchedAt: now,
|
|
UUID: taskUUID,
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "saving task 'last touched at'")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AddWorkerToTaskFailedList records that the given worker failed the given task.
|
|
// This information is not used directly by the task scheduler. It's used to
|
|
// determine whether there are any workers left to perform this task, and thus
|
|
// whether it should be hard- or soft-failed.
|
|
//
|
|
// Calling this multiple times with the same task/worker is a no-op.
|
|
//
|
|
// Returns the new number of workers that failed this task.
|
|
func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) (numFailed int, err error) {
|
|
queries := db.queries()
|
|
|
|
err = queries.AddWorkerToTaskFailedList(ctx, sqlc.AddWorkerToTaskFailedListParams{
|
|
CreatedAt: db.nowNullable().Time,
|
|
TaskID: int64(t.ID),
|
|
WorkerID: int64(w.ID),
|
|
})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
numFailed64, err := queries.CountWorkersFailingTask(ctx, int64(t.ID))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Integer literals are of type `int`, so that's just a bit nicer to work with
|
|
// than `int64`.
|
|
if numFailed64 > math.MaxInt32 {
|
|
log.Warn().Int64("numFailed", numFailed64).Msg("number of failed workers is crazy high, something is wrong here")
|
|
return math.MaxInt32, nil
|
|
}
|
|
return int(numFailed64), nil
|
|
}
|
|
|
|
// ClearFailureListOfTask clears the list of workers that failed this task.
|
|
func (db *DB) ClearFailureListOfTask(ctx context.Context, t *Task) error {
|
|
queries := db.queries()
|
|
|
|
return queries.ClearFailureListOfTask(ctx, int64(t.ID))
|
|
}
|
|
|
|
// ClearFailureListOfJob en-mass, for all tasks of this job, clears the list of
|
|
// workers that failed those tasks.
|
|
func (db *DB) ClearFailureListOfJob(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
return queries.ClearFailureListOfJob(ctx, int64(j.ID))
|
|
}
|
|
|
|
func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, error) {
|
|
queries := db.queries()
|
|
|
|
failureList, err := queries.FetchTaskFailureList(ctx, int64(t.ID))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
workers := make([]*Worker, len(failureList))
|
|
for idx := range failureList {
|
|
workers[idx] = &failureList[idx].Worker
|
|
}
|
|
return workers, nil
|
|
}
|