
Replace old used-to-be-GORM data structures (#104305) with sqlc-generated structs. This also makes it possible to use more specific structs that are more tailored to the specific queries, increasing efficiency. This commit deals with the remaining areas, like the job deleter, task timeout checker, and task state machine. And anything else to get things running again. Functional changes are kept to a minimum, as the API still serves the same data. Because this work covers so much of Flamenco's code, it's been split up into different commits. Each commit brings Flamenco to a state where it compiles and unit tests pass. Only the result of the final commit has actually been tested properly. Ref: #104343
260 lines
8.3 KiB
Go
Generated
260 lines
8.3 KiB
Go
Generated
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.26.0
|
|
// source: query_task_scheduler.sql
|
|
|
|
package sqlc
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"strings"
|
|
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
// assignTaskToWorker is the SQL statement executed by AssignTaskToWorker.
//
// Parameter ?1 is the worker ID, ?2 is the timestamp written to both
// last_touched_at and updated_at, and ?3 is the ID of the task to update.
const assignTaskToWorker = `-- name: AssignTaskToWorker :exec
UPDATE tasks
SET worker_id=?1, last_touched_at=?2, updated_at=?2
WHERE tasks.id=?3
`
|
|
|
|
// AssignTaskToWorkerParams holds the arguments of the AssignTaskToWorker query.
type AssignTaskToWorkerParams struct {
	// WorkerID is bound to ?1 and stored in tasks.worker_id.
	WorkerID sql.NullInt64

	// Now is bound to ?2 and stored in both tasks.last_touched_at and
	// tasks.updated_at.
	Now sql.NullTime

	// TaskID is bound to ?3 and selects the task row to update by its ID.
	TaskID int64
}
|
|
|
|
func (q *Queries) AssignTaskToWorker(ctx context.Context, arg AssignTaskToWorkerParams) error {
|
|
_, err := q.db.ExecContext(ctx, assignTaskToWorker, arg.WorkerID, arg.Now, arg.TaskID)
|
|
return err
|
|
}
|
|
|
|
// fetchAssignedAndRunnableTaskOfWorker is the SQL query template used by
// FetchAssignedAndRunnableTaskOfWorker.
//
// The "/*SLICE:active_job_statuses*/?" marker is not valid as-is for a list of
// values; it is replaced at call time with one "?" placeholder per job status.
const fetchAssignedAndRunnableTaskOfWorker = `-- name: FetchAssignedAndRunnableTaskOfWorker :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.uuid as jobuuid, jobs.priority as job_priority, jobs.job_type as job_type
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE tasks.status=?1
AND tasks.worker_id=?2
AND jobs.status IN (/*SLICE:active_job_statuses*/?)
LIMIT 1
`
|
|
|
|
type FetchAssignedAndRunnableTaskOfWorkerParams struct {
|
|
ActiveTaskStatus api.TaskStatus
|
|
WorkerID sql.NullInt64
|
|
ActiveJobStatuses []api.JobStatus
|
|
}
|
|
|
|
type FetchAssignedAndRunnableTaskOfWorkerRow struct {
|
|
Task Task
|
|
JobUUID string
|
|
JobPriority int64
|
|
JobType string
|
|
}
|
|
|
|
// Fetch a task that's assigned to this worker, and is in a runnable state.
|
|
func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg FetchAssignedAndRunnableTaskOfWorkerParams) (FetchAssignedAndRunnableTaskOfWorkerRow, error) {
|
|
query := fetchAssignedAndRunnableTaskOfWorker
|
|
var queryParams []interface{}
|
|
queryParams = append(queryParams, arg.ActiveTaskStatus)
|
|
queryParams = append(queryParams, arg.WorkerID)
|
|
if len(arg.ActiveJobStatuses) > 0 {
|
|
for _, v := range arg.ActiveJobStatuses {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:active_job_statuses*/?", strings.Repeat(",?", len(arg.ActiveJobStatuses))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:active_job_statuses*/?", "NULL", 1)
|
|
}
|
|
row := q.db.QueryRowContext(ctx, query, queryParams...)
|
|
var i FetchAssignedAndRunnableTaskOfWorkerRow
|
|
err := row.Scan(
|
|
&i.Task.ID,
|
|
&i.Task.CreatedAt,
|
|
&i.Task.UpdatedAt,
|
|
&i.Task.UUID,
|
|
&i.Task.Name,
|
|
&i.Task.Type,
|
|
&i.Task.JobID,
|
|
&i.Task.IndexInJob,
|
|
&i.Task.Priority,
|
|
&i.Task.Status,
|
|
&i.Task.WorkerID,
|
|
&i.Task.LastTouchedAt,
|
|
&i.Task.Commands,
|
|
&i.Task.Activity,
|
|
&i.JobUUID,
|
|
&i.JobPriority,
|
|
&i.JobType,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// fetchWorkerTask is the SQL query executed by FetchWorkerTask.
//
// Parameter ?1 is the "active" task status, ?2 the "active" job status, and ?3
// the worker ID. Rows for which both statuses match sort first (is_active
// DESC), followed by the most recently updated tasks.
const fetchWorkerTask = `-- name: FetchWorkerTask :one
SELECT
tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity,
jobs.uuid as jobuuid,
CAST(tasks.status = ?1 AND jobs.status = ?2 AS BOOLEAN) as is_active
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE
tasks.worker_id = ?3
ORDER BY
is_active DESC,
tasks.updated_at DESC
LIMIT 1
`
|
|
|
|
type FetchWorkerTaskParams struct {
|
|
TaskStatusActive api.TaskStatus
|
|
JobStatusActive api.JobStatus
|
|
WorkerID sql.NullInt64
|
|
}
|
|
|
|
type FetchWorkerTaskRow struct {
|
|
Task Task
|
|
JobUUID string
|
|
IsActive bool
|
|
}
|
|
|
|
// Find the currently-active task assigned to a Worker. If not found, find the last task this Worker worked on.
|
|
func (q *Queries) FetchWorkerTask(ctx context.Context, arg FetchWorkerTaskParams) (FetchWorkerTaskRow, error) {
|
|
row := q.db.QueryRowContext(ctx, fetchWorkerTask, arg.TaskStatusActive, arg.JobStatusActive, arg.WorkerID)
|
|
var i FetchWorkerTaskRow
|
|
err := row.Scan(
|
|
&i.Task.ID,
|
|
&i.Task.CreatedAt,
|
|
&i.Task.UpdatedAt,
|
|
&i.Task.UUID,
|
|
&i.Task.Name,
|
|
&i.Task.Type,
|
|
&i.Task.JobID,
|
|
&i.Task.IndexInJob,
|
|
&i.Task.Priority,
|
|
&i.Task.Status,
|
|
&i.Task.WorkerID,
|
|
&i.Task.LastTouchedAt,
|
|
&i.Task.Commands,
|
|
&i.Task.Activity,
|
|
&i.JobUUID,
|
|
&i.IsActive,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// findRunnableTask is the SQL query template used by FindRunnableTask.
//
// Parameter ?1 is the worker ID and ?2 the "completed" task status. The four
// "/*SLICE:...*/?" markers are replaced at call time with one "?" placeholder
// per value of the corresponding slice argument.
const findRunnableTask = `-- name: FindRunnableTask :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.uuid as jobuuid, jobs.priority as job_priority, jobs.job_type as job_type
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
LEFT JOIN task_failures TF ON tasks.id = TF.task_id AND TF.worker_id=?1
WHERE TF.worker_id IS NULL -- Not failed by this worker before.
AND tasks.id NOT IN (
-- Find all tasks IDs that have incomplete dependencies. These are not runnable.
SELECT tasks_incomplete.id
FROM tasks AS tasks_incomplete
INNER JOIN task_dependencies td ON tasks_incomplete.id = td.task_id
INNER JOIN tasks dep ON dep.id = td.dependency_id
WHERE dep.status != ?2
)
AND tasks.type NOT IN (
SELECT task_type
FROM job_blocks
WHERE job_blocks.worker_id = ?1
AND job_blocks.job_id = jobs.id
)
AND (
jobs.worker_tag_id IS NULL
OR jobs.worker_tag_id IN (/*SLICE:worker_tags*/?))
AND tasks.status IN (/*SLICE:schedulable_task_statuses*/?)
AND jobs.status IN (/*SLICE:schedulable_job_statuses*/?)
AND tasks.type IN (/*SLICE:supported_task_types*/?)
ORDER BY jobs.priority DESC, tasks.priority DESC
`
|
|
|
|
type FindRunnableTaskParams struct {
|
|
WorkerID int64
|
|
TaskStatusCompleted api.TaskStatus
|
|
WorkerTags []sql.NullInt64
|
|
SchedulableTaskStatuses []api.TaskStatus
|
|
SchedulableJobStatuses []api.JobStatus
|
|
SupportedTaskTypes []string
|
|
}
|
|
|
|
type FindRunnableTaskRow struct {
|
|
Task Task
|
|
JobUUID string
|
|
JobPriority int64
|
|
JobType string
|
|
}
|
|
|
|
// Find a task to be run by a worker. This is the core of the task scheduler.
|
|
//
|
|
// Note that this query doesn't check for the assigned worker. Tasks that have a
|
|
// 'schedulable' status might have been assigned to a worker, representing the
|
|
// last worker to touch it -- it's not meant to indicate "ownership" of the
|
|
// task.
|
|
//
|
|
// The order in the WHERE clause is important, slices should come last. See
|
|
// https://github.com/sqlc-dev/sqlc/issues/2452 for more info.
|
|
func (q *Queries) FindRunnableTask(ctx context.Context, arg FindRunnableTaskParams) (FindRunnableTaskRow, error) {
|
|
query := findRunnableTask
|
|
var queryParams []interface{}
|
|
queryParams = append(queryParams, arg.WorkerID)
|
|
queryParams = append(queryParams, arg.TaskStatusCompleted)
|
|
if len(arg.WorkerTags) > 0 {
|
|
for _, v := range arg.WorkerTags {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:worker_tags*/?", strings.Repeat(",?", len(arg.WorkerTags))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:worker_tags*/?", "NULL", 1)
|
|
}
|
|
if len(arg.SchedulableTaskStatuses) > 0 {
|
|
for _, v := range arg.SchedulableTaskStatuses {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:schedulable_task_statuses*/?", strings.Repeat(",?", len(arg.SchedulableTaskStatuses))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:schedulable_task_statuses*/?", "NULL", 1)
|
|
}
|
|
if len(arg.SchedulableJobStatuses) > 0 {
|
|
for _, v := range arg.SchedulableJobStatuses {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:schedulable_job_statuses*/?", strings.Repeat(",?", len(arg.SchedulableJobStatuses))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:schedulable_job_statuses*/?", "NULL", 1)
|
|
}
|
|
if len(arg.SupportedTaskTypes) > 0 {
|
|
for _, v := range arg.SupportedTaskTypes {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:supported_task_types*/?", strings.Repeat(",?", len(arg.SupportedTaskTypes))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:supported_task_types*/?", "NULL", 1)
|
|
}
|
|
row := q.db.QueryRowContext(ctx, query, queryParams...)
|
|
var i FindRunnableTaskRow
|
|
err := row.Scan(
|
|
&i.Task.ID,
|
|
&i.Task.CreatedAt,
|
|
&i.Task.UpdatedAt,
|
|
&i.Task.UUID,
|
|
&i.Task.Name,
|
|
&i.Task.Type,
|
|
&i.Task.JobID,
|
|
&i.Task.IndexInJob,
|
|
&i.Task.Priority,
|
|
&i.Task.Status,
|
|
&i.Task.WorkerID,
|
|
&i.Task.LastTouchedAt,
|
|
&i.Task.Commands,
|
|
&i.Task.Activity,
|
|
&i.JobUUID,
|
|
&i.JobPriority,
|
|
&i.JobType,
|
|
)
|
|
return i, err
|
|
}
|