Manager: convert job blocklist management queries to sqlc

Convert most of the job blocklist queries from GORM to sqlc. The management
functions (add worker, remove worker, clear list, fetch list) have been
converted.
This commit is contained in:
Sybren A. Stüvel 2024-08-02 12:50:17 +02:00
parent e758f8c79d
commit 1f7cab0ef2
3 changed files with 195 additions and 48 deletions

View File

@ -7,7 +7,7 @@ import (
"math" "math"
"time" "time"
"gorm.io/gorm/clause" "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
) )
// JobBlock keeps track of which Worker is not allowed to run which task type on which job. // JobBlock keeps track of which Worker is not allowed to run which task type on which job.
@ -28,66 +28,76 @@ type JobBlock struct {
// AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler. // AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler.
func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error { func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error {
entry := JobBlock{ if job.ID == 0 {
Job: job, panic("Cannot add worker to job blocklist with zero job ID")
Worker: worker, }
if worker.ID == 0 {
panic("Cannot add worker to job blocklist with zero worker ID")
}
if taskType == "" {
panic("Cannot add worker to job blocklist with empty task type")
}
queries, err := db.queries()
if err != nil {
return err
}
return queries.AddWorkerToJobBlocklist(ctx, sqlc.AddWorkerToJobBlocklistParams{
CreatedAt: db.now().Time,
JobID: int64(job.ID),
WorkerID: int64(worker.ID),
TaskType: taskType, TaskType: taskType,
} })
tx := db.gormDB.WithContext(ctx).
Clauses(clause.OnConflict{DoNothing: true}).
Create(&entry)
return tx.Error
} }
// FetchJobBlocklist fetches the blocklist for the given job.
// Workers are fetched too, and embedded in the returned list.
func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) { func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) {
entries := []JobBlock{} queries, err := db.queries()
if err != nil {
return nil, err
}
tx := db.gormDB.WithContext(ctx). rows, err := queries.FetchJobBlocklist(ctx, jobUUID)
Model(JobBlock{}). if err != nil {
Joins("inner join jobs on jobs.id = job_blocks.job_id"). return nil, err
Joins("Worker"). }
Where("jobs.uuid = ?", jobUUID).
Order("Worker.name"). entries := make([]JobBlock, len(rows))
Scan(&entries) for idx, row := range rows {
return entries, tx.Error entries[idx].ID = uint(row.JobBlock.ID)
entries[idx].CreatedAt = row.JobBlock.CreatedAt
entries[idx].TaskType = row.JobBlock.TaskType
entries[idx].JobID = uint(row.JobBlock.JobID)
entries[idx].WorkerID = uint(row.JobBlock.WorkerID)
worker := convertSqlcWorker(row.Worker)
entries[idx].Worker = &worker
}
return entries, nil
} }
// ClearJobBlocklist removes the entire blocklist of this job. // ClearJobBlocklist removes the entire blocklist of this job.
func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error { func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error {
tx := db.gormDB.WithContext(ctx). queries, err := db.queries()
Where("job_id = ?", job.ID). if err != nil {
Delete(JobBlock{}) return err
return tx.Error }
return queries.ClearJobBlocklist(ctx, job.UUID)
} }
func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error { func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error {
// Find the job ID. queries, err := db.queries()
job := Job{} if err != nil {
tx := db.gormDB.WithContext(ctx). return err
Select("id").
Where("uuid = ?", jobUUID).
Find(&job)
if tx.Error != nil {
return jobError(tx.Error, "fetching job with uuid=%q", jobUUID)
} }
return queries.RemoveFromJobBlocklist(ctx, sqlc.RemoveFromJobBlocklistParams{
// Find the worker ID. JobUUID: jobUUID,
worker := Worker{} WorkerUUID: workerUUID,
tx = db.gormDB.WithContext(ctx). TaskType: taskType,
Select("id"). })
Where("uuid = ?", workerUUID).
Find(&worker)
if tx.Error != nil {
return workerError(tx.Error, "fetching worker with uuid=%q", workerUUID)
}
// Remove the blocklist entry.
tx = db.gormDB.WithContext(ctx).
Where("job_id = ?", job.ID).
Where("worker_id = ?", worker.ID).
Where("task_type = ?", taskType).
Delete(JobBlock{})
return tx.Error
} }
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.

View File

@ -244,3 +244,28 @@ ON CONFLICT DO UPDATE
-- name: GetLastRenderedJobUUID :one -- name: GetLastRenderedJobUUID :one
SELECT uuid FROM jobs SELECT uuid FROM jobs
INNER JOIN last_rendereds LR ON jobs.id = LR.job_id; INNER JOIN last_rendereds LR ON jobs.id = LR.job_id;
-- name: AddWorkerToJobBlocklist :exec
-- Add a worker to a job's blocklist.
INSERT INTO job_blocks (created_at, job_id, worker_id, task_type)
VALUES (@created_at, @job_id, @worker_id, @task_type)
ON CONFLICT DO NOTHING;
-- name: FetchJobBlocklist :many
SELECT sqlc.embed(job_blocks), sqlc.embed(workers)
FROM job_blocks
INNER JOIN jobs ON jobs.id = job_blocks.job_id
INNER JOIN workers on workers.id = job_blocks.worker_id
WHERE jobs.uuid = @jobuuid
ORDER BY workers.name;
-- name: ClearJobBlocklist :exec
DELETE FROM job_blocks
WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid);
-- name: RemoveFromJobBlocklist :exec
DELETE FROM job_blocks
WHERE
job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid)
AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=@workeruuid)
AND job_blocks.task_type = @task_type;

View File

@ -13,6 +13,30 @@ import (
"time" "time"
) )
const addWorkerToJobBlocklist = `-- name: AddWorkerToJobBlocklist :exec
INSERT INTO job_blocks (created_at, job_id, worker_id, task_type)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT DO NOTHING
`
type AddWorkerToJobBlocklistParams struct {
CreatedAt time.Time
JobID int64
WorkerID int64
TaskType string
}
// Add a worker to a job's blocklist.
func (q *Queries) AddWorkerToJobBlocklist(ctx context.Context, arg AddWorkerToJobBlocklistParams) error {
_, err := q.db.ExecContext(ctx, addWorkerToJobBlocklist,
arg.CreatedAt,
arg.JobID,
arg.WorkerID,
arg.TaskType,
)
return err
}
const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec
INSERT INTO task_failures (created_at, task_id, worker_id) INSERT INTO task_failures (created_at, task_id, worker_id)
VALUES (?1, ?2, ?3) VALUES (?1, ?2, ?3)
@ -50,6 +74,16 @@ func (q *Queries) ClearFailureListOfTask(ctx context.Context, taskID int64) erro
return err return err
} }
const clearJobBlocklist = `-- name: ClearJobBlocklist :exec
DELETE FROM job_blocks
WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1)
`
func (q *Queries) ClearJobBlocklist(ctx context.Context, jobuuid string) error {
_, err := q.db.ExecContext(ctx, clearJobBlocklist, jobuuid)
return err
}
const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one
SELECT count(*) as num_failed FROM task_failures SELECT count(*) as num_failed FROM task_failures
WHERE task_id=?1 WHERE task_id=?1
@ -217,6 +251,65 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) {
return i, err return i, err
} }
const fetchJobBlocklist = `-- name: FetchJobBlocklist :many
SELECT job_blocks.id, job_blocks.created_at, job_blocks.job_id, job_blocks.worker_id, job_blocks.task_type, workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart
FROM job_blocks
INNER JOIN jobs ON jobs.id = job_blocks.job_id
INNER JOIN workers on workers.id = job_blocks.worker_id
WHERE jobs.uuid = ?1
ORDER BY workers.name
`
type FetchJobBlocklistRow struct {
JobBlock JobBlock
Worker Worker
}
func (q *Queries) FetchJobBlocklist(ctx context.Context, jobuuid string) ([]FetchJobBlocklistRow, error) {
rows, err := q.db.QueryContext(ctx, fetchJobBlocklist, jobuuid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []FetchJobBlocklistRow
for rows.Next() {
var i FetchJobBlocklistRow
if err := rows.Scan(
&i.JobBlock.ID,
&i.JobBlock.CreatedAt,
&i.JobBlock.JobID,
&i.JobBlock.WorkerID,
&i.JobBlock.TaskType,
&i.Worker.ID,
&i.Worker.CreatedAt,
&i.Worker.UpdatedAt,
&i.Worker.UUID,
&i.Worker.Secret,
&i.Worker.Name,
&i.Worker.Address,
&i.Worker.Platform,
&i.Worker.Software,
&i.Worker.Status,
&i.Worker.LastSeenAt,
&i.Worker.StatusRequested,
&i.Worker.LazyStatusRequest,
&i.Worker.SupportedTaskTypes,
&i.Worker.DeletedAt,
&i.Worker.CanRestart,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const fetchJobByID = `-- name: FetchJobByID :one const fetchJobByID = `-- name: FetchJobByID :one
SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs
WHERE id = ? LIMIT 1 WHERE id = ? LIMIT 1
@ -758,6 +851,25 @@ func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksIn
return num_tasks, err return num_tasks, err
} }
const removeFromJobBlocklist = `-- name: RemoveFromJobBlocklist :exec
DELETE FROM job_blocks
WHERE
job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1)
AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=?2)
AND job_blocks.task_type = ?3
`
type RemoveFromJobBlocklistParams struct {
JobUUID string
WorkerUUID string
TaskType string
}
func (q *Queries) RemoveFromJobBlocklist(ctx context.Context, arg RemoveFromJobBlocklistParams) error {
_, err := q.db.ExecContext(ctx, removeFromJobBlocklist, arg.JobUUID, arg.WorkerUUID, arg.TaskType)
return err
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET UPDATE jobs SET
updated_at = ?1, updated_at = ?1,