
Replace old used-to-be-GORM datastructures (#104305) with sqlc-generated structs. This also makes it possible to use more specific structs that are more tailored to the specific queries, increasing efficiency. This commit deals with the remaining areas, like the job deleter, task timeout checker, and task state machine. And anything else to get things running again. Functional changes are kept to a minimum, as the API still serves the same data. Because this work covers so much of Flamenco's code, it's been split up into different commits. Each commit brings Flamenco to a state where it compiles and unit tests pass. Only the result of the final commit has actually been tested properly. Ref: #104343
865 lines
24 KiB
Go
865 lines
24 KiB
Go
package persistence
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"database/sql/driver"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
|
|
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
type Job = sqlc.Job
|
|
type Task = sqlc.Task
|
|
|
|
// TaskJobWorker represents a task, with identifieres for its job and the worker it's assigned to.
|
|
type TaskJobWorker struct {
|
|
Task Task
|
|
JobUUID string
|
|
WorkerUUID string
|
|
}
|
|
|
|
// TaskJob represents a task, with identifier for its job.
|
|
type TaskJob struct {
|
|
Task Task
|
|
JobUUID string
|
|
IsActive bool // Whether the worker assigned to this task is actually working on it.
|
|
}
|
|
|
|
// StringInterfaceMap is a string-keyed map with arbitrary values. It is stored
// in the database as JSON, see its Value() and Scan() methods.
type StringInterfaceMap map[string]interface{}

// StringStringMap is a string-to-string map. It is stored in the database as
// JSON, see its Value() and Scan() methods.
type StringStringMap map[string]string

// Commands is the schema used for (un)marshalling sqlc.Task.Commands.
type Commands []Command

// Command is a single named command with its parameters, as stored in the JSON
// of a task's commands.
type Command struct {
	Name       string             `json:"name"`
	Parameters StringInterfaceMap `json:"parameters"`
}

// Value implements driver.Valuer by encoding the commands as JSON.
func (c Commands) Value() (driver.Value, error) {
	return json.Marshal(c)
}

// Scan implements sql.Scanner by decoding JSON into the receiver.
// See scanJSONColumn for the accepted source types.
func (c *Commands) Scan(value interface{}) error {
	return scanJSONColumn(value, c)
}

// Value implements driver.Valuer by encoding the map as JSON.
func (js StringInterfaceMap) Value() (driver.Value, error) {
	return json.Marshal(js)
}

// Scan implements sql.Scanner by decoding JSON into the receiver.
// See scanJSONColumn for the accepted source types.
func (js *StringInterfaceMap) Scan(value interface{}) error {
	return scanJSONColumn(value, js)
}

// Value implements driver.Valuer by encoding the map as JSON.
func (js StringStringMap) Value() (driver.Value, error) {
	return json.Marshal(js)
}

// Scan implements sql.Scanner by decoding JSON into the receiver.
// See scanJSONColumn for the accepted source types.
func (js *StringStringMap) Scan(value interface{}) error {
	return scanJSONColumn(value, js)
}

// scanJSONColumn decodes a JSON database value into `dst`.
//
// `value` must be a []byte or a string; any other type (including nil, i.e.
// SQL NULL) results in an error. `dst` must be a non-nil pointer to the value
// to decode into.
//
// `dst` is passed to json.Unmarshal as-is, and NOT as a pointer to that
// pointer. With the extra level of indirection a JSON `null` would only reset
// a temporary pointer, leaving the destination with its old contents, and a
// nil destination would be silently ignored instead of reported as an error.
func scanJSONColumn(value interface{}, dst interface{}) error {
	var raw []byte
	switch v := value.(type) {
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return errors.New("type assertion to []byte failed")
	}
	return json.Unmarshal(raw, dst)
}
|
|
|
|
// TaskFailure keeps track of which Worker failed which Task.
|
|
type TaskFailure = sqlc.TaskFailure
|
|
|
|
// StoreJob stores an AuthoredJob and its tasks, and saves it to the database.
|
|
// The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status.
|
|
func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
|
|
|
|
// Run all queries in a single transaction.
|
|
qtx, err := db.queriesWithTX()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer qtx.rollback()
|
|
|
|
// Serialise the embedded JSON.
|
|
settings, err := json.Marshal(authoredJob.Settings)
|
|
if err != nil {
|
|
return fmt.Errorf("converting job settings to JSON: %w", err)
|
|
}
|
|
metadata, err := json.Marshal(authoredJob.Metadata)
|
|
if err != nil {
|
|
return fmt.Errorf("converting job metadata to JSON: %w", err)
|
|
}
|
|
|
|
// Create the job itself.
|
|
params := sqlc.CreateJobParams{
|
|
CreatedAt: db.now(),
|
|
UUID: authoredJob.JobID,
|
|
Name: authoredJob.Name,
|
|
JobType: authoredJob.JobType,
|
|
Priority: int64(authoredJob.Priority),
|
|
Status: authoredJob.Status,
|
|
Settings: settings,
|
|
Metadata: metadata,
|
|
StorageShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID,
|
|
}
|
|
|
|
if authoredJob.WorkerTagUUID != "" {
|
|
workerTag, err := qtx.queries.FetchWorkerTagByUUID(ctx, authoredJob.WorkerTagUUID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return fmt.Errorf("no worker tag %q found", authoredJob.WorkerTagUUID)
|
|
case err != nil:
|
|
return fmt.Errorf("could not find worker tag %q: %w", authoredJob.WorkerTagUUID, err)
|
|
}
|
|
params.WorkerTagID = sql.NullInt64{Int64: workerTag.ID, Valid: true}
|
|
}
|
|
|
|
log.Debug().
|
|
Str("job", params.UUID).
|
|
Str("type", params.JobType).
|
|
Str("name", params.Name).
|
|
Str("status", string(params.Status)).
|
|
Msg("persistence: storing authored job")
|
|
|
|
jobID, err := qtx.queries.CreateJob(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "storing job")
|
|
}
|
|
|
|
err = db.storeAuthoredJobTaks(ctx, qtx, jobID, &authoredJob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return qtx.commit()
|
|
}
|
|
|
|
// StoreAuthoredJobTaks is a low-level function that is only used for recreating an existing job's tasks.
|
|
// It stores `authoredJob`'s tasks, but attaches them to the already-persisted `job`.
|
|
func (db *DB) StoreAuthoredJobTaks(
|
|
ctx context.Context,
|
|
job *Job,
|
|
authoredJob *job_compilers.AuthoredJob,
|
|
) error {
|
|
qtx, err := db.queriesWithTX()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer qtx.rollback()
|
|
|
|
err = db.storeAuthoredJobTaks(ctx, qtx, int64(job.ID), authoredJob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return qtx.commit()
|
|
}
|
|
|
|
// storeAuthoredJobTaks stores the tasks of the authored job.
|
|
// Note that this function does NOT commit the database transaction. That is up
|
|
// to the caller.
|
|
func (db *DB) storeAuthoredJobTaks(
|
|
ctx context.Context,
|
|
qtx *queriesTX,
|
|
jobID int64,
|
|
authoredJob *job_compilers.AuthoredJob,
|
|
) error {
|
|
type TaskInfo struct {
|
|
ID int64
|
|
UUID string
|
|
Name string
|
|
}
|
|
|
|
// Give every task the same creation timestamp.
|
|
now := db.now()
|
|
|
|
uuidToTask := make(map[string]TaskInfo)
|
|
for taskIndex, authoredTask := range authoredJob.Tasks {
|
|
// Marshal commands to JSON.
|
|
var commands []Command
|
|
for _, authoredCommand := range authoredTask.Commands {
|
|
commands = append(commands, Command{
|
|
Name: authoredCommand.Name,
|
|
Parameters: StringInterfaceMap(authoredCommand.Parameters),
|
|
})
|
|
}
|
|
commandsJSON, err := json.Marshal(commands)
|
|
if err != nil {
|
|
return fmt.Errorf("could not convert commands of task %q to JSON: %w",
|
|
authoredTask.Name, err)
|
|
}
|
|
|
|
taskParams := sqlc.CreateTaskParams{
|
|
CreatedAt: now,
|
|
Name: authoredTask.Name,
|
|
Type: authoredTask.Type,
|
|
UUID: authoredTask.UUID,
|
|
JobID: jobID,
|
|
IndexInJob: int64(taskIndex + 1), // indexInJob is base-1.
|
|
Priority: int64(authoredTask.Priority),
|
|
Status: api.TaskStatusQueued,
|
|
Commands: commandsJSON,
|
|
// dependencies are stored below.
|
|
}
|
|
|
|
log.Debug().
|
|
Str("task", taskParams.UUID).
|
|
Str("type", taskParams.Type).
|
|
Str("name", taskParams.Name).
|
|
Str("status", string(taskParams.Status)).
|
|
Msg("persistence: storing authored task")
|
|
|
|
taskID, err := qtx.queries.CreateTask(ctx, taskParams)
|
|
if err != nil {
|
|
return taskError(err, "storing task: %v", err)
|
|
}
|
|
|
|
uuidToTask[authoredTask.UUID] = TaskInfo{
|
|
ID: taskID,
|
|
UUID: taskParams.UUID,
|
|
Name: taskParams.Name,
|
|
}
|
|
}
|
|
|
|
// Store the dependencies between tasks.
|
|
for _, authoredTask := range authoredJob.Tasks {
|
|
if len(authoredTask.Dependencies) == 0 {
|
|
continue
|
|
}
|
|
|
|
taskInfo, ok := uuidToTask[authoredTask.UUID]
|
|
if !ok {
|
|
return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
|
|
}
|
|
|
|
deps := make([]*TaskInfo, len(authoredTask.Dependencies))
|
|
for idx, authoredDep := range authoredTask.Dependencies {
|
|
depTask, ok := uuidToTask[authoredDep.UUID]
|
|
if !ok {
|
|
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", authoredDep.UUID)
|
|
}
|
|
|
|
err := qtx.queries.StoreTaskDependency(ctx, sqlc.StoreTaskDependencyParams{
|
|
TaskID: taskInfo.ID,
|
|
DependencyID: depTask.ID,
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "error storing task %q depending on task %q", authoredTask.UUID, depTask.UUID)
|
|
}
|
|
|
|
deps[idx] = &depTask
|
|
}
|
|
|
|
if log.Debug().Enabled() {
|
|
depNames := make([]string, len(deps))
|
|
for i, dep := range deps {
|
|
depNames[i] = dep.Name
|
|
}
|
|
log.Debug().
|
|
Str("task", taskInfo.UUID).
|
|
Str("name", taskInfo.Name).
|
|
Strs("dependencies", depNames).
|
|
Msg("persistence: storing authored task dependencies")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// FetchJob fetches a single job, without fetching its tasks.
|
|
func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
|
|
queries := db.queries()
|
|
|
|
job, err := queries.FetchJob(ctx, jobUUID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return nil, ErrJobNotFound
|
|
case err != nil:
|
|
return nil, jobError(err, "fetching job")
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
// FetchJob fetches a single job by its database ID, without fetching its tasks.
|
|
func (db *DB) FetchJobByID(ctx context.Context, jobID int64) (*Job, error) {
|
|
queries := db.queries()
|
|
|
|
job, err := queries.FetchJobByID(ctx, jobID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return nil, ErrJobNotFound
|
|
case err != nil:
|
|
return nil, jobError(err, "fetching job")
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
func (db *DB) FetchJobs(ctx context.Context) ([]*Job, error) {
|
|
queries := db.queries()
|
|
|
|
sqlcJobs, err := queries.FetchJobs(ctx)
|
|
if err != nil {
|
|
return nil, jobError(err, "fetching all jobs")
|
|
}
|
|
|
|
// TODO: just return []Job instead of converting the array.
|
|
jobPointers := make([]*Job, len(sqlcJobs))
|
|
for index := range sqlcJobs {
|
|
jobPointers[index] = &sqlcJobs[index]
|
|
}
|
|
|
|
return jobPointers, nil
|
|
}
|
|
|
|
// FetchJobShamanCheckoutID fetches the job's Shaman Checkout ID.
|
|
func (db *DB) FetchJobShamanCheckoutID(ctx context.Context, jobUUID string) (string, error) {
|
|
queries := db.queries()
|
|
|
|
checkoutID, err := queries.FetchJobShamanCheckoutID(ctx, jobUUID)
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
return "", ErrJobNotFound
|
|
case err != nil:
|
|
return "", jobError(err, "fetching job")
|
|
}
|
|
return checkoutID, nil
|
|
}
|
|
|
|
// DeleteJob deletes a job from the database.
|
|
// The deletion cascades to its tasks and other job-related tables.
|
|
func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error {
|
|
// As a safety measure, refuse to delete jobs unless foreign key constraints are active.
|
|
fkEnabled, err := db.areForeignKeysEnabled(ctx)
|
|
switch {
|
|
case err != nil:
|
|
return err
|
|
case !fkEnabled:
|
|
return ErrDeletingWithoutFK
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
if err := queries.DeleteJob(ctx, jobUUID); err != nil {
|
|
return jobError(err, "deleting job")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RequestJobDeletion sets the job's "DeletionRequestedAt" field to "now".
|
|
func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
// Update the given job itself, so we don't have to re-fetch it from the database.
|
|
j.DeleteRequestedAt = db.nowNullable()
|
|
|
|
params := sqlc.RequestJobDeletionParams{
|
|
Now: j.DeleteRequestedAt,
|
|
JobID: int64(j.ID),
|
|
}
|
|
|
|
log.Trace().
|
|
Str("job", j.UUID).
|
|
Time("deletedAt", params.Now.Time).
|
|
Msg("database: marking job as deletion-requested")
|
|
if err := queries.RequestJobDeletion(ctx, params); err != nil {
|
|
return jobError(err, "queueing job for deletion")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RequestJobMassDeletion sets multiple job's "DeletionRequestedAt" field to "now".
|
|
// The list of affected job UUIDs is returned.
|
|
func (db *DB) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) {
|
|
queries := db.queries()
|
|
|
|
// In order to be able to report which jobs were affected, first fetch the
|
|
// list of jobs, then update them.
|
|
uuids, err := queries.FetchJobUUIDsUpdatedBefore(ctx, sql.NullTime{
|
|
Time: lastUpdatedMax,
|
|
Valid: true,
|
|
})
|
|
switch {
|
|
case err != nil:
|
|
return nil, jobError(err, "fetching jobs by last-modified timestamp")
|
|
case len(uuids) == 0:
|
|
return nil, ErrJobNotFound
|
|
}
|
|
|
|
// Update the selected jobs.
|
|
params := sqlc.RequestMassJobDeletionParams{
|
|
Now: db.nowNullable(),
|
|
UUIDs: uuids,
|
|
}
|
|
if err := queries.RequestMassJobDeletion(ctx, params); err != nil {
|
|
return nil, jobError(err, "marking jobs as deletion-requested")
|
|
}
|
|
|
|
return uuids, nil
|
|
}
|
|
|
|
func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) {
|
|
queries := db.queries()
|
|
|
|
uuids, err := queries.FetchJobsDeletionRequested(ctx)
|
|
if err != nil {
|
|
return nil, jobError(err, "fetching jobs marked for deletion")
|
|
}
|
|
return uuids, nil
|
|
}
|
|
|
|
func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) {
|
|
queries := db.queries()
|
|
|
|
jobs, err := queries.FetchJobsInStatus(ctx, jobStatuses)
|
|
if err != nil {
|
|
return nil, jobError(err, "fetching jobs in status %q", jobStatuses)
|
|
}
|
|
|
|
// TODO: just return []Job instead of converting the array.
|
|
pointers := make([]*Job, len(jobs))
|
|
for index := range jobs {
|
|
pointers[index] = &jobs[index]
|
|
}
|
|
|
|
return pointers, nil
|
|
}
|
|
|
|
// SaveJobStatus saves the job's Status and Activity fields.
|
|
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
params := sqlc.SaveJobStatusParams{
|
|
Now: db.nowNullable(),
|
|
ID: int64(j.ID),
|
|
Status: j.Status,
|
|
Activity: j.Activity,
|
|
}
|
|
|
|
err := queries.SaveJobStatus(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "saving job status")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SaveJobPriority saves the job's Priority field.
|
|
func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
params := sqlc.SaveJobPriorityParams{
|
|
Now: db.nowNullable(),
|
|
ID: int64(j.ID),
|
|
Priority: int64(j.Priority),
|
|
}
|
|
|
|
err := queries.SaveJobPriority(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "saving job priority")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SaveJobStorageInfo saves the job's Storage field.
|
|
// NOTE: this function does NOT update the job's `UpdatedAt` field. This is
|
|
// necessary for `cmd/shaman-checkout-id-setter` to do its work quietly.
|
|
func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
params := sqlc.SaveJobStorageInfoParams{
|
|
ID: int64(j.ID),
|
|
StorageShamanCheckoutID: j.StorageShamanCheckoutID,
|
|
}
|
|
|
|
err := queries.SaveJobStorageInfo(ctx, params)
|
|
if err != nil {
|
|
return jobError(err, "saving job storage")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) FetchTask(ctx context.Context, taskUUID string) (TaskJobWorker, error) {
|
|
queries := db.queries()
|
|
|
|
taskRow, err := queries.FetchTask(ctx, taskUUID)
|
|
if err != nil {
|
|
return TaskJobWorker{}, taskError(err, "fetching task %s", taskUUID)
|
|
}
|
|
|
|
taskJobWorker := TaskJobWorker{
|
|
Task: taskRow.Task,
|
|
JobUUID: taskRow.JobUUID.String,
|
|
WorkerUUID: taskRow.WorkerUUID.String,
|
|
}
|
|
|
|
return taskJobWorker, nil
|
|
}
|
|
|
|
// FetchTaskJobUUID fetches the job UUID of the given task.
|
|
func (db *DB) FetchTaskJobUUID(ctx context.Context, taskUUID string) (string, error) {
|
|
queries := db.queries()
|
|
|
|
jobUUID, err := queries.FetchTaskJobUUID(ctx, taskUUID)
|
|
if err != nil {
|
|
return "", taskError(err, "fetching job UUID of task %s", taskUUID)
|
|
}
|
|
if !jobUUID.Valid {
|
|
return "", PersistenceError{Message: fmt.Sprintf("unable to find job of task %s", taskUUID)}
|
|
}
|
|
return jobUUID.String, nil
|
|
}
|
|
|
|
// SaveTask updates a task that already exists in the database.
|
|
// This function is not used by the Flamenco API, only by unit tests.
|
|
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
|
|
if t.ID == 0 {
|
|
panic(fmt.Errorf("cannot use this function to insert a task"))
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
commandsJSON, err := json.Marshal(t.Commands)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot convert commands to JSON: %w", err)
|
|
}
|
|
|
|
param := sqlc.UpdateTaskParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Name: t.Name,
|
|
Type: t.Type,
|
|
Priority: t.Priority,
|
|
Status: t.Status,
|
|
Commands: commandsJSON,
|
|
Activity: t.Activity,
|
|
ID: t.ID,
|
|
WorkerID: t.WorkerID,
|
|
LastTouchedAt: t.LastTouchedAt,
|
|
}
|
|
|
|
err = queries.UpdateTask(ctx, param)
|
|
if err != nil {
|
|
return taskError(err, "updating task")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error {
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateTaskStatus(ctx, sqlc.UpdateTaskStatusParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Status: t.Status,
|
|
ID: int64(t.ID),
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "saving task status")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateTaskActivity(ctx, sqlc.UpdateTaskActivityParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Activity: t.Activity,
|
|
ID: int64(t.ID),
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "saving task activity")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TaskAssignToWorker assigns the given task to the given worker.
|
|
// This function is only used by unit tests. During normal operation, Flamenco
|
|
// uses the code in task_scheduler.go to assign tasks to workers.
|
|
func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error {
|
|
queries := db.queries()
|
|
|
|
t.WorkerID = sql.NullInt64{Int64: w.ID, Valid: true}
|
|
|
|
err := queries.TaskAssignToWorker(ctx, sqlc.TaskAssignToWorkerParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
WorkerID: t.WorkerID,
|
|
ID: t.ID,
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "assigning task %s to worker %s", t.UUID, w.UUID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, taskStatus api.TaskStatus) ([]TaskJob, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfWorkerInStatus(ctx, sqlc.FetchTasksOfWorkerInStatusParams{
|
|
WorkerID: sql.NullInt64{
|
|
Int64: int64(worker.ID),
|
|
Valid: true,
|
|
},
|
|
TaskStatus: taskStatus,
|
|
})
|
|
if err != nil {
|
|
return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus)
|
|
}
|
|
|
|
result := make([]TaskJob, len(rows))
|
|
for i := range rows {
|
|
result[i].Task = rows[i].Task
|
|
result[i].JobUUID = rows[i].JobUUID
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worker, taskStatus api.TaskStatus, jobUUID string) ([]*Task, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfWorkerInStatusOfJob(ctx, sqlc.FetchTasksOfWorkerInStatusOfJobParams{
|
|
WorkerID: sql.NullInt64{
|
|
Int64: int64(worker.ID),
|
|
Valid: true,
|
|
},
|
|
JobUUID: jobUUID,
|
|
TaskStatus: taskStatus,
|
|
})
|
|
if err != nil {
|
|
return nil, taskError(err, "finding tasks of worker %s in status %q and job %s", worker.UUID, taskStatus, jobUUID)
|
|
}
|
|
|
|
// TODO: just return []Task instead of creating an array of pointers.
|
|
result := make([]*Task, len(rows))
|
|
for i := range rows {
|
|
result[i] = &rows[i].Task
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
|
|
queries := db.queries()
|
|
|
|
count, err := queries.JobCountTasksInStatus(ctx, sqlc.JobCountTasksInStatusParams{
|
|
JobID: int64(job.ID),
|
|
TaskStatus: taskStatus,
|
|
})
|
|
if err != nil {
|
|
return false, taskError(err, "counting tasks of job %s in status %q", job.UUID, taskStatus)
|
|
}
|
|
|
|
return count > 0, nil
|
|
}
|
|
|
|
// CountTasksOfJobInStatus counts the number of tasks in the job.
|
|
// It returns two counts, one is the number of tasks in the given statuses, the
|
|
// other is the total number of tasks of the job.
|
|
func (db *DB) CountTasksOfJobInStatus(
|
|
ctx context.Context,
|
|
job *Job,
|
|
taskStatuses ...api.TaskStatus,
|
|
) (numInStatus, numTotal int, err error) {
|
|
queries := db.queries()
|
|
|
|
results, err := queries.JobCountTaskStatuses(ctx, int64(job.ID))
|
|
if err != nil {
|
|
return 0, 0, jobError(err, "count tasks of job %s in status %q", job.UUID, taskStatuses)
|
|
}
|
|
|
|
// Create lookup table for which statuses to count.
|
|
countStatus := map[api.TaskStatus]bool{}
|
|
for _, status := range taskStatuses {
|
|
countStatus[status] = true
|
|
}
|
|
|
|
// Count the number of tasks per status.
|
|
for _, result := range results {
|
|
if countStatus[api.TaskStatus(result.Status)] {
|
|
numInStatus += int(result.NumTasks)
|
|
}
|
|
numTotal += int(result.NumTasks)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// FetchTaskIDsOfJob returns all tasks of the given job.
|
|
func (db *DB) FetchTasksOfJob(ctx context.Context, job *Job) ([]TaskJobWorker, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfJob(ctx, int64(job.ID))
|
|
if err != nil {
|
|
return nil, taskError(err, "fetching tasks of job %s", job.UUID)
|
|
}
|
|
|
|
result := make([]TaskJobWorker, len(rows))
|
|
for i := range rows {
|
|
result[i].Task = rows[i].Task
|
|
result[i].JobUUID = job.UUID
|
|
result[i].WorkerUUID = rows[i].WorkerUUID.String
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// FetchTasksOfJobInStatus returns those tasks of the given job that have any of the given statuses.
|
|
func (db *DB) FetchTasksOfJobInStatus(ctx context.Context, job *Job, taskStatuses ...api.TaskStatus) ([]TaskJobWorker, error) {
|
|
queries := db.queries()
|
|
|
|
rows, err := queries.FetchTasksOfJobInStatus(ctx, sqlc.FetchTasksOfJobInStatusParams{
|
|
JobID: int64(job.ID),
|
|
TaskStatus: taskStatuses,
|
|
})
|
|
if err != nil {
|
|
return nil, taskError(err, "fetching tasks of job %s in status %q", job.UUID, taskStatuses)
|
|
}
|
|
|
|
result := make([]TaskJobWorker, len(rows))
|
|
for i := range rows {
|
|
result[i].Task = rows[i].Task
|
|
result[i].JobUUID = job.UUID
|
|
result[i].WorkerUUID = rows[i].WorkerUUID.String
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// UpdateJobsTaskStatuses updates the status & activity of all tasks of `job`.
|
|
func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job,
|
|
taskStatus api.TaskStatus, activity string) error {
|
|
|
|
if taskStatus == "" {
|
|
return taskError(nil, "empty status not allowed")
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateJobsTaskStatuses(ctx, sqlc.UpdateJobsTaskStatusesParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Status: taskStatus,
|
|
Activity: activity,
|
|
JobID: int64(job.ID),
|
|
})
|
|
|
|
if err != nil {
|
|
return taskError(err, "updating status of all tasks of job %s", job.UUID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
|
|
// limited to those tasks with status in `statusesToUpdate`.
|
|
func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job,
|
|
statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error {
|
|
|
|
if taskStatus == "" {
|
|
return taskError(nil, "empty status not allowed")
|
|
}
|
|
|
|
queries := db.queries()
|
|
|
|
err := queries.UpdateJobsTaskStatusesConditional(ctx, sqlc.UpdateJobsTaskStatusesConditionalParams{
|
|
UpdatedAt: db.nowNullable(),
|
|
Status: taskStatus,
|
|
Activity: activity,
|
|
JobID: int64(job.ID),
|
|
StatusesToUpdate: statusesToUpdate,
|
|
})
|
|
|
|
if err != nil {
|
|
return taskError(err, "updating status of all tasks in status %v of job %s", statusesToUpdate, job.UUID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection.
|
|
func (db *DB) TaskTouchedByWorker(ctx context.Context, taskUUID string) error {
|
|
queries := db.queries()
|
|
|
|
now := db.nowNullable()
|
|
err := queries.TaskTouchedByWorker(ctx, sqlc.TaskTouchedByWorkerParams{
|
|
UpdatedAt: now,
|
|
LastTouchedAt: now,
|
|
UUID: taskUUID,
|
|
})
|
|
if err != nil {
|
|
return taskError(err, "saving task 'last touched at'")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AddWorkerToTaskFailedList records that the given worker failed the given task.
|
|
// This information is not used directly by the task scheduler. It's used to
|
|
// determine whether there are any workers left to perform this task, and thus
|
|
// whether it should be hard- or soft-failed.
|
|
//
|
|
// Calling this multiple times with the same task/worker is a no-op.
|
|
//
|
|
// Returns the new number of workers that failed this task.
|
|
func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) (numFailed int, err error) {
|
|
queries := db.queries()
|
|
|
|
err = queries.AddWorkerToTaskFailedList(ctx, sqlc.AddWorkerToTaskFailedListParams{
|
|
CreatedAt: db.nowNullable().Time,
|
|
TaskID: int64(t.ID),
|
|
WorkerID: int64(w.ID),
|
|
})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
numFailed64, err := queries.CountWorkersFailingTask(ctx, int64(t.ID))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Integer literals are of type `int`, so that's just a bit nicer to work with
|
|
// than `int64`.
|
|
if numFailed64 > math.MaxInt32 {
|
|
log.Warn().Int64("numFailed", numFailed64).Msg("number of failed workers is crazy high, something is wrong here")
|
|
return math.MaxInt32, nil
|
|
}
|
|
return int(numFailed64), nil
|
|
}
|
|
|
|
// ClearFailureListOfTask clears the list of workers that failed this task.
|
|
func (db *DB) ClearFailureListOfTask(ctx context.Context, t *Task) error {
|
|
queries := db.queries()
|
|
|
|
return queries.ClearFailureListOfTask(ctx, int64(t.ID))
|
|
}
|
|
|
|
// ClearFailureListOfJob en-mass, for all tasks of this job, clears the list of
|
|
// workers that failed those tasks.
|
|
func (db *DB) ClearFailureListOfJob(ctx context.Context, j *Job) error {
|
|
queries := db.queries()
|
|
|
|
return queries.ClearFailureListOfJob(ctx, int64(j.ID))
|
|
}
|
|
|
|
func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, error) {
|
|
queries := db.queries()
|
|
|
|
failureList, err := queries.FetchTaskFailureList(ctx, int64(t.ID))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
workers := make([]*Worker, len(failureList))
|
|
for idx := range failureList {
|
|
workers[idx] = &failureList[idx].Worker
|
|
}
|
|
return workers, nil
|
|
}
|