// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.26.0
// source: query_task_scheduler.sql
|
|
|
|
package sqlc
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"strings"
|
|
)
|
|
|
|
// assignTaskToWorker is the SQL statement executed by Queries.AssignTaskToWorker.
// It updates the rows of tasks whose id equals ?3: worker_id is set to ?1, and
// last_touched_at and updated_at are both set to ?2.
const assignTaskToWorker = `-- name: AssignTaskToWorker :exec
UPDATE tasks
SET worker_id=?1, last_touched_at=?2, updated_at=?2
WHERE tasks.id=?3
`
|
|
|
|
// AssignTaskToWorkerParams holds the arguments bound by Queries.AssignTaskToWorker.
type AssignTaskToWorkerParams struct {
	// WorkerID is bound to ?1 and written to tasks.worker_id.
	WorkerID sql.NullInt64
	// Now is bound to ?2 and written to both tasks.last_touched_at and
	// tasks.updated_at.
	Now sql.NullTime
	// TaskID is bound to ?3 and matched against tasks.id.
	TaskID int64
}
|
|
|
|
func (q *Queries) AssignTaskToWorker(ctx context.Context, arg AssignTaskToWorkerParams) error {
|
|
_, err := q.db.ExecContext(ctx, assignTaskToWorker, arg.WorkerID, arg.Now, arg.TaskID)
|
|
return err
|
|
}
|
|
|
|
// fetchAssignedAndRunnableTaskOfWorker is the SQL template used by
// Queries.FetchAssignedAndRunnableTaskOfWorker. It selects at most one task
// with status ?1 and worker_id ?2 whose job has one of the given statuses.
//
// The "/*SLICE:active_job_statuses*/?" marker is not valid as-is for a
// multi-value list; the calling method rewrites it into the right number of
// "?" placeholders (or NULL) before the query is run.
const fetchAssignedAndRunnableTaskOfWorker = `-- name: FetchAssignedAndRunnableTaskOfWorker :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE tasks.status=?1
AND tasks.worker_id=?2
AND jobs.status IN (/*SLICE:active_job_statuses*/?)
LIMIT 1
`
|
|
|
|
// FetchAssignedAndRunnableTaskOfWorkerParams holds the arguments bound by
// Queries.FetchAssignedAndRunnableTaskOfWorker.
type FetchAssignedAndRunnableTaskOfWorkerParams struct {
	// ActiveTaskStatus is bound to ?1 and compared with tasks.status.
	ActiveTaskStatus string
	// WorkerID is bound to ?2 and compared with tasks.worker_id.
	WorkerID sql.NullInt64
	// ActiveJobStatuses is expanded into the IN (...) list that jobs.status
	// is tested against. An empty slice makes the list "NULL".
	ActiveJobStatuses []string
}
|
|
|
|
type FetchAssignedAndRunnableTaskOfWorkerRow struct {
|
|
Task Task
|
|
}
|
|
|
|
// Fetch a task that's assigned to this worker, and is in a runnable state.
|
|
func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg FetchAssignedAndRunnableTaskOfWorkerParams) (FetchAssignedAndRunnableTaskOfWorkerRow, error) {
|
|
query := fetchAssignedAndRunnableTaskOfWorker
|
|
var queryParams []interface{}
|
|
queryParams = append(queryParams, arg.ActiveTaskStatus)
|
|
queryParams = append(queryParams, arg.WorkerID)
|
|
if len(arg.ActiveJobStatuses) > 0 {
|
|
for _, v := range arg.ActiveJobStatuses {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:active_job_statuses*/?", strings.Repeat(",?", len(arg.ActiveJobStatuses))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:active_job_statuses*/?", "NULL", 1)
|
|
}
|
|
row := q.db.QueryRowContext(ctx, query, queryParams...)
|
|
var i FetchAssignedAndRunnableTaskOfWorkerRow
|
|
err := row.Scan(
|
|
&i.Task.ID,
|
|
&i.Task.CreatedAt,
|
|
&i.Task.UpdatedAt,
|
|
&i.Task.UUID,
|
|
&i.Task.Name,
|
|
&i.Task.Type,
|
|
&i.Task.JobID,
|
|
&i.Task.Priority,
|
|
&i.Task.Status,
|
|
&i.Task.WorkerID,
|
|
&i.Task.LastTouchedAt,
|
|
&i.Task.Commands,
|
|
&i.Task.Activity,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// fetchWorkerTask is the SQL statement executed by Queries.FetchWorkerTask.
// It selects all task and job columns for tasks whose worker_id equals ?3,
// plus a computed is_active column (task status equals ?1 AND job status
// equals ?2). Rows are ordered by is_active descending, then by
// tasks.updated_at descending, and only the first row is returned.
const fetchWorkerTask = `-- name: FetchWorkerTask :one
SELECT
tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity,
jobs.id, jobs.created_at, jobs.updated_at, jobs.uuid, jobs.name, jobs.job_type, jobs.priority, jobs.status, jobs.activity, jobs.settings, jobs.metadata, jobs.delete_requested_at, jobs.storage_shaman_checkout_id, jobs.worker_tag_id,
(tasks.status = ?1 AND jobs.status = ?2) as is_active
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE
tasks.worker_id = ?3
ORDER BY
is_active DESC,
tasks.updated_at DESC
LIMIT 1
`
|
|
|
|
// FetchWorkerTaskParams holds the arguments bound by Queries.FetchWorkerTask.
type FetchWorkerTaskParams struct {
	// TaskStatusActive is bound to ?1 and compared with tasks.status when
	// computing is_active.
	TaskStatusActive string
	// JobStatusActive is bound to ?2 and compared with jobs.status when
	// computing is_active.
	JobStatusActive string
	// WorkerID is bound to ?3 and compared with tasks.worker_id.
	WorkerID sql.NullInt64
}
|
|
|
|
type FetchWorkerTaskRow struct {
|
|
Task Task
|
|
Job Job
|
|
IsActive interface{}
|
|
}
|
|
|
|
// Find the currently-active task assigned to a Worker. If not found, find the last task this Worker worked on.
|
|
func (q *Queries) FetchWorkerTask(ctx context.Context, arg FetchWorkerTaskParams) (FetchWorkerTaskRow, error) {
|
|
row := q.db.QueryRowContext(ctx, fetchWorkerTask, arg.TaskStatusActive, arg.JobStatusActive, arg.WorkerID)
|
|
var i FetchWorkerTaskRow
|
|
err := row.Scan(
|
|
&i.Task.ID,
|
|
&i.Task.CreatedAt,
|
|
&i.Task.UpdatedAt,
|
|
&i.Task.UUID,
|
|
&i.Task.Name,
|
|
&i.Task.Type,
|
|
&i.Task.JobID,
|
|
&i.Task.Priority,
|
|
&i.Task.Status,
|
|
&i.Task.WorkerID,
|
|
&i.Task.LastTouchedAt,
|
|
&i.Task.Commands,
|
|
&i.Task.Activity,
|
|
&i.Job.ID,
|
|
&i.Job.CreatedAt,
|
|
&i.Job.UpdatedAt,
|
|
&i.Job.UUID,
|
|
&i.Job.Name,
|
|
&i.Job.JobType,
|
|
&i.Job.Priority,
|
|
&i.Job.Status,
|
|
&i.Job.Activity,
|
|
&i.Job.Settings,
|
|
&i.Job.Metadata,
|
|
&i.Job.DeleteRequestedAt,
|
|
&i.Job.StorageShamanCheckoutID,
|
|
&i.Job.WorkerTagID,
|
|
&i.IsActive,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// findRunnableTask is the SQL template used by Queries.FindRunnableTask.
//
// ?1 is the worker ID (used for both the task_failures join and the
// job_blocks sub-select) and ?2 is the status a dependency must have to count
// as complete. The four "/*SLICE:...*/?" markers are rewritten by the calling
// method into "?" placeholder lists (or NULL) before the query is run; they
// appear after all numbered parameters.
const findRunnableTask = `-- name: FindRunnableTask :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
LEFT JOIN task_failures TF ON tasks.id = TF.task_id AND TF.worker_id=?1
WHERE TF.worker_id IS NULL -- Not failed by this worker before.
AND tasks.id NOT IN (
-- Find all tasks IDs that have incomplete dependencies. These are not runnable.
SELECT tasks_incomplete.id
FROM tasks AS tasks_incomplete
INNER JOIN task_dependencies td ON tasks_incomplete.id = td.task_id
INNER JOIN tasks dep ON dep.id = td.dependency_id
WHERE dep.status != ?2
)
AND tasks.type NOT IN (
SELECT task_type
FROM job_blocks
WHERE job_blocks.worker_id = ?1
AND job_blocks.job_id = jobs.id
)
AND (
jobs.worker_tag_id IS NULL
OR jobs.worker_tag_id IN (/*SLICE:worker_tags*/?))
AND tasks.status IN (/*SLICE:schedulable_task_statuses*/?)
AND jobs.status IN (/*SLICE:schedulable_job_statuses*/?)
AND tasks.type IN (/*SLICE:supported_task_types*/?)
ORDER BY jobs.priority DESC, tasks.priority DESC
`
|
|
|
|
// FindRunnableTaskParams holds the arguments bound by Queries.FindRunnableTask.
type FindRunnableTaskParams struct {
	// WorkerID is bound to ?1; it is compared with task_failures.worker_id
	// and job_blocks.worker_id.
	WorkerID int64
	// TaskStatusCompleted is bound to ?2; a task with a dependency whose
	// status differs from this value is excluded.
	TaskStatusCompleted string
	// WorkerTags is expanded into the IN (...) list that jobs.worker_tag_id
	// is tested against. Jobs with a NULL worker_tag_id are accepted
	// regardless of this list.
	WorkerTags []sql.NullInt64
	// SchedulableTaskStatuses is expanded into the IN (...) list for
	// tasks.status.
	SchedulableTaskStatuses []string
	// SchedulableJobStatuses is expanded into the IN (...) list for
	// jobs.status.
	SchedulableJobStatuses []string
	// SupportedTaskTypes is expanded into the IN (...) list for tasks.type.
	SupportedTaskTypes []string
}
|
|
|
|
type FindRunnableTaskRow struct {
|
|
Task Task
|
|
}
|
|
|
|
// Find a task to be run by a worker. This is the core of the task scheduler.
|
|
//
|
|
// Note that this query doesn't check for the assigned worker. Tasks that have a
|
|
// 'schedulable' status might have been assigned to a worker, representing the
|
|
// last worker to touch it -- it's not meant to indicate "ownership" of the
|
|
// task.
|
|
//
|
|
// The order in the WHERE clause is important, slices should come last. See
|
|
// https://github.com/sqlc-dev/sqlc/issues/2452 for more info.
|
|
func (q *Queries) FindRunnableTask(ctx context.Context, arg FindRunnableTaskParams) (FindRunnableTaskRow, error) {
|
|
query := findRunnableTask
|
|
var queryParams []interface{}
|
|
queryParams = append(queryParams, arg.WorkerID)
|
|
queryParams = append(queryParams, arg.TaskStatusCompleted)
|
|
if len(arg.WorkerTags) > 0 {
|
|
for _, v := range arg.WorkerTags {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:worker_tags*/?", strings.Repeat(",?", len(arg.WorkerTags))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:worker_tags*/?", "NULL", 1)
|
|
}
|
|
if len(arg.SchedulableTaskStatuses) > 0 {
|
|
for _, v := range arg.SchedulableTaskStatuses {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:schedulable_task_statuses*/?", strings.Repeat(",?", len(arg.SchedulableTaskStatuses))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:schedulable_task_statuses*/?", "NULL", 1)
|
|
}
|
|
if len(arg.SchedulableJobStatuses) > 0 {
|
|
for _, v := range arg.SchedulableJobStatuses {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:schedulable_job_statuses*/?", strings.Repeat(",?", len(arg.SchedulableJobStatuses))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:schedulable_job_statuses*/?", "NULL", 1)
|
|
}
|
|
if len(arg.SupportedTaskTypes) > 0 {
|
|
for _, v := range arg.SupportedTaskTypes {
|
|
queryParams = append(queryParams, v)
|
|
}
|
|
query = strings.Replace(query, "/*SLICE:supported_task_types*/?", strings.Repeat(",?", len(arg.SupportedTaskTypes))[1:], 1)
|
|
} else {
|
|
query = strings.Replace(query, "/*SLICE:supported_task_types*/?", "NULL", 1)
|
|
}
|
|
row := q.db.QueryRowContext(ctx, query, queryParams...)
|
|
var i FindRunnableTaskRow
|
|
err := row.Scan(
|
|
&i.Task.ID,
|
|
&i.Task.CreatedAt,
|
|
&i.Task.UpdatedAt,
|
|
&i.Task.UUID,
|
|
&i.Task.Name,
|
|
&i.Task.Type,
|
|
&i.Task.JobID,
|
|
&i.Task.Priority,
|
|
&i.Task.Status,
|
|
&i.Task.WorkerID,
|
|
&i.Task.LastTouchedAt,
|
|
&i.Task.Commands,
|
|
&i.Task.Activity,
|
|
)
|
|
return i, err
|
|
}
|