flamenco/internal/manager/persistence/sqlc/query_task_scheduler.sql.go
Sybren A. Stüvel bfe47ea394 Manager: convert task scheduler from gorm to sqlc
Convert the task scheduler from gorm to sqlc. This makes the query
considerably easier to read.

No functional changes intended.
2024-06-30 21:18:08 +02:00

192 lines
6.4 KiB
Go

// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.26.0
// source: query_task_scheduler.sql
package sqlc
import (
"context"
"database/sql"
"strings"
)
// assignTaskToWorker is the SQL statement executed by AssignTaskToWorker.
// It updates the tasks row whose id equals ?3: worker_id is set to ?1, and
// both last_touched_at and updated_at are set to the same value, ?2.
const assignTaskToWorker = `-- name: AssignTaskToWorker :exec
UPDATE tasks
SET worker_id=?1, last_touched_at=?2, updated_at=?2
WHERE tasks.id=?3
`
// AssignTaskToWorkerParams holds the values that AssignTaskToWorker binds to
// its UPDATE statement.
type AssignTaskToWorkerParams struct {
// WorkerID is bound to ?1, the value written to tasks.worker_id.
WorkerID sql.NullInt64
// Now is bound to ?2, the value written to both tasks.last_touched_at and
// tasks.updated_at.
Now sql.NullTime
// TaskID is bound to ?3, the id of the task to update.
TaskID int64
}
// AssignTaskToWorker executes the assignTaskToWorker UPDATE statement, binding
// arg.WorkerID, arg.Now and arg.TaskID to its parameters, in that order.
//
// The statement's result is discarded; only the error from executing it is
// returned.
func (q *Queries) AssignTaskToWorker(ctx context.Context, arg AssignTaskToWorkerParams) error {
	if _, err := q.db.ExecContext(ctx, assignTaskToWorker, arg.WorkerID, arg.Now, arg.TaskID); err != nil {
		return err
	}
	return nil
}
// fetchAssignedAndRunnableTaskOfWorker is the SQL query executed by
// FetchAssignedAndRunnableTaskOfWorker. It selects at most one task (LIMIT 1)
// whose status equals ?1 and whose worker_id equals ?2, and whose job has a
// status in the given list.
//
// The "/*SLICE:active_job_statuses*/?" marker is rewritten by
// FetchAssignedAndRunnableTaskOfWorker before the query runs: it becomes one
// "?" per job status, or NULL when no statuses are given.
const fetchAssignedAndRunnableTaskOfWorker = `-- name: FetchAssignedAndRunnableTaskOfWorker :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE tasks.status=?1
AND tasks.worker_id=?2
AND jobs.status IN (/*SLICE:active_job_statuses*/?)
LIMIT 1
`
// FetchAssignedAndRunnableTaskOfWorkerParams holds the values that
// FetchAssignedAndRunnableTaskOfWorker binds to its query.
type FetchAssignedAndRunnableTaskOfWorkerParams struct {
// ActiveTaskStatus is bound to ?1 and compared against tasks.status.
ActiveTaskStatus string
// WorkerID is bound to ?2 and compared against tasks.worker_id.
WorkerID sql.NullInt64
// ActiveJobStatuses lists the values jobs.status is matched against. When
// it is empty, the query compares jobs.status against NULL instead.
ActiveJobStatuses []string
}
// FetchAssignedAndRunnableTaskOfWorkerRow is the result row of
// FetchAssignedAndRunnableTaskOfWorker.
type FetchAssignedAndRunnableTaskOfWorkerRow struct {
// Task receives the columns selected from the tasks table.
Task Task
}
// FetchAssignedAndRunnableTaskOfWorker fetches a task that is assigned to this
// worker and is in a runnable state.
//
// The task must have status arg.ActiveTaskStatus and worker arg.WorkerID, and
// its job must have one of the statuses in arg.ActiveJobStatuses. When that
// slice is empty, the job status is compared against NULL instead.
//
// The returned error is whatever scanning the single result row reports
// (presumably sql.ErrNoRows when nothing matches -- confirm against the
// database handle in use). The row is returned as-is even when err is non-nil.
func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg FetchAssignedAndRunnableTaskOfWorkerParams) (FetchAssignedAndRunnableTaskOfWorkerRow, error) {
	const jobStatusMarker = "/*SLICE:active_job_statuses*/?"

	// The two fixed parameters come first, followed by one bind value per job
	// status, matching the order of the placeholders in the query.
	bindArgs := []interface{}{arg.ActiveTaskStatus, arg.WorkerID}
	for _, jobStatus := range arg.ActiveJobStatuses {
		bindArgs = append(bindArgs, jobStatus)
	}

	// Expand the slice marker into "?,?,...,?", or NULL for an empty slice.
	placeholders := "NULL"
	if count := len(arg.ActiveJobStatuses); count > 0 {
		placeholders = strings.Repeat(",?", count)[1:]
	}
	sqlText := strings.Replace(fetchAssignedAndRunnableTaskOfWorker, jobStatusMarker, placeholders, 1)

	var result FetchAssignedAndRunnableTaskOfWorkerRow
	err := q.db.QueryRowContext(ctx, sqlText, bindArgs...).Scan(
		&result.Task.ID,
		&result.Task.CreatedAt,
		&result.Task.UpdatedAt,
		&result.Task.UUID,
		&result.Task.Name,
		&result.Task.Type,
		&result.Task.JobID,
		&result.Task.Priority,
		&result.Task.Status,
		&result.Task.WorkerID,
		&result.Task.LastTouchedAt,
		&result.Task.Commands,
		&result.Task.Activity,
	)
	return result, err
}
// findRunnableTask is the SQL query executed by FindRunnableTask. It selects
// tasks, joined to their jobs, ordered by job priority and then task priority
// (both descending), keeping only tasks for which all of the following hold:
//   - there is no task_failures row for the task and worker ?1;
//   - the task has no dependency whose status differs from ?2;
//   - the task's type is not listed in job_blocks for worker ?1 and the
//     task's job;
//   - the job's worker_tag_id is NULL or is one of the given worker tags;
//   - the task status, the job status and the task type are each in their
//     respective given list.
//
// Each "/*SLICE:...*/?" marker is rewritten by FindRunnableTask before the
// query runs. The query has no LIMIT clause; FindRunnableTask reads a single
// row from the result.
const findRunnableTask = `-- name: FindRunnableTask :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
LEFT JOIN task_failures TF ON tasks.id = TF.task_id AND TF.worker_id=?1
WHERE TF.worker_id IS NULL -- Not failed by this worker before.
AND tasks.id NOT IN (
-- Find all tasks IDs that have incomplete dependencies. These are not runnable.
SELECT tasks_incomplete.id
FROM tasks AS tasks_incomplete
INNER JOIN task_dependencies td ON tasks_incomplete.id = td.task_id
INNER JOIN tasks dep ON dep.id = td.dependency_id
WHERE dep.status != ?2
)
AND tasks.type NOT IN (
SELECT task_type
FROM job_blocks
WHERE job_blocks.worker_id = ?1
AND job_blocks.job_id = jobs.id
)
AND (
jobs.worker_tag_id IS NULL
OR jobs.worker_tag_id IN (/*SLICE:worker_tags*/?))
AND tasks.status IN (/*SLICE:schedulable_task_statuses*/?)
AND jobs.status IN (/*SLICE:schedulable_job_statuses*/?)
AND tasks.type IN (/*SLICE:supported_task_types*/?)
ORDER BY jobs.priority DESC, tasks.priority DESC
`
// FindRunnableTaskParams holds the values that FindRunnableTask binds to its
// query.
type FindRunnableTaskParams struct {
// WorkerID is bound to ?1. The query uses it to exclude tasks that have a
// task_failures row for this worker, and task types listed in job_blocks
// for this worker.
WorkerID int64
// TaskStatusCompleted is bound to ?2. A task with a dependency whose status
// differs from this value is excluded.
TaskStatusCompleted string
// WorkerTags lists the values jobs.worker_tag_id may have, in addition to
// NULL.
WorkerTags []sql.NullInt64
// SchedulableTaskStatuses lists the values tasks.status is matched against.
SchedulableTaskStatuses []string
// SchedulableJobStatuses lists the values jobs.status is matched against.
SchedulableJobStatuses []string
// SupportedTaskTypes lists the values tasks.type is matched against.
SupportedTaskTypes []string
}
// FindRunnableTaskRow is the result row of FindRunnableTask.
type FindRunnableTaskRow struct {
// Task receives the columns selected from the tasks table.
Task Task
}
// FindRunnableTask finds a task to be run by a worker. This is the core of the
// task scheduler.
//
// The query does not check which worker a task is assigned to. A task with a
// 'schedulable' status may already have a worker assigned; that represents the
// last worker to touch it and is not meant to indicate "ownership" of the
// task.
//
// Candidates are ordered by job priority, then task priority (both
// descending), and a single row is read from the result.
//
// The order in the WHERE clause is important: slices should come last, and the
// bind values below are appended in the same order as the slice markers appear
// in the query. See https://github.com/sqlc-dev/sqlc/issues/2452 for more info.
func (q *Queries) FindRunnableTask(ctx context.Context, arg FindRunnableTaskParams) (FindRunnableTaskRow, error) {
	sqlText := findRunnableTask

	// expandSlice replaces the first occurrence of a slice marker with one "?"
	// per element ("?,?,...,?"), or with NULL when the slice is empty.
	expandSlice := func(marker string, count int) {
		placeholders := "NULL"
		if count > 0 {
			placeholders = strings.Repeat(",?", count)[1:]
		}
		sqlText = strings.Replace(sqlText, marker, placeholders, 1)
	}

	// The two numbered parameters (?1, ?2) come first.
	bindArgs := []interface{}{arg.WorkerID, arg.TaskStatusCompleted}

	for _, tag := range arg.WorkerTags {
		bindArgs = append(bindArgs, tag)
	}
	expandSlice("/*SLICE:worker_tags*/?", len(arg.WorkerTags))

	for _, taskStatus := range arg.SchedulableTaskStatuses {
		bindArgs = append(bindArgs, taskStatus)
	}
	expandSlice("/*SLICE:schedulable_task_statuses*/?", len(arg.SchedulableTaskStatuses))

	for _, jobStatus := range arg.SchedulableJobStatuses {
		bindArgs = append(bindArgs, jobStatus)
	}
	expandSlice("/*SLICE:schedulable_job_statuses*/?", len(arg.SchedulableJobStatuses))

	for _, taskType := range arg.SupportedTaskTypes {
		bindArgs = append(bindArgs, taskType)
	}
	expandSlice("/*SLICE:supported_task_types*/?", len(arg.SupportedTaskTypes))

	var result FindRunnableTaskRow
	err := q.db.QueryRowContext(ctx, sqlText, bindArgs...).Scan(
		&result.Task.ID,
		&result.Task.CreatedAt,
		&result.Task.UpdatedAt,
		&result.Task.UUID,
		&result.Task.Name,
		&result.Task.Type,
		&result.Task.JobID,
		&result.Task.Priority,
		&result.Task.Status,
		&result.Task.WorkerID,
		&result.Task.LastTouchedAt,
		&result.Task.Commands,
		&result.Task.Activity,
	)
	return result, err
}