From 1f7cab0ef210031f58e610b4b8734c6dd1d75e57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 2 Aug 2024 12:50:17 +0200 Subject: [PATCH] Manager: convert job blocklist management queries to sqlc Convert most of the job blocklist queries from GORM to sqlc. The management functions (add worker, remove worker, clear list, fetch list) have been converted. --- .../manager/persistence/jobs_blocklist.go | 106 +++++++++-------- .../manager/persistence/sqlc/query_jobs.sql | 25 ++++ .../persistence/sqlc/query_jobs.sql.go | 112 ++++++++++++++++++ 3 files changed, 195 insertions(+), 48 deletions(-) diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index da041ecf..1121c523 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -7,7 +7,7 @@ import ( "math" "time" - "gorm.io/gorm/clause" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) // JobBlock keeps track of which Worker is not allowed to run which task type on which job. @@ -28,66 +28,76 @@ type JobBlock struct { // AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler. func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error { - entry := JobBlock{ - Job: job, - Worker: worker, - TaskType: taskType, + if job.ID == 0 { + panic("Cannot add worker to job blocklist with zero job ID") } - tx := db.gormDB.WithContext(ctx). - Clauses(clause.OnConflict{DoNothing: true}). - Create(&entry) - return tx.Error + if worker.ID == 0 { + panic("Cannot add worker to job blocklist with zero worker ID") + } + if taskType == "" { + panic("Cannot add worker to job blocklist with empty task type") + } + + queries, err := db.queries() + if err != nil { + return err + } + + return queries.AddWorkerToJobBlocklist(ctx, sqlc.AddWorkerToJobBlocklistParams{ + CreatedAt: db.now().Time, + JobID: int64(job.ID), + WorkerID: int64(worker.ID), + TaskType: taskType, + }) } +// FetchJobBlocklist fetches the blocklist for the given job. +// Workers are fetched too, and embedded in the returned list. func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) { - entries := []JobBlock{} + queries, err := db.queries() + if err != nil { + return nil, err + } - tx := db.gormDB.WithContext(ctx). - Model(JobBlock{}). - Joins("inner join jobs on jobs.id = job_blocks.job_id"). - Joins("Worker"). - Where("jobs.uuid = ?", jobUUID). - Order("Worker.name"). - Scan(&entries) - return entries, tx.Error + rows, err := queries.FetchJobBlocklist(ctx, jobUUID) + if err != nil { + return nil, err + } + + entries := make([]JobBlock, len(rows)) + for idx, row := range rows { + entries[idx].ID = uint(row.JobBlock.ID) + entries[idx].CreatedAt = row.JobBlock.CreatedAt + entries[idx].TaskType = row.JobBlock.TaskType + entries[idx].JobID = uint(row.JobBlock.JobID) + entries[idx].WorkerID = uint(row.JobBlock.WorkerID) + + worker := convertSqlcWorker(row.Worker) + entries[idx].Worker = &worker + } + + return entries, nil } // ClearJobBlocklist removes the entire blocklist of this job. func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error { - tx := db.gormDB.WithContext(ctx). - Where("job_id = ?", job.ID). - Delete(JobBlock{}) - return tx.Error + queries, err := db.queries() + if err != nil { + return err + } + return queries.ClearJobBlocklist(ctx, job.UUID) } func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error { - // Find the job ID. - job := Job{} - tx := db.gormDB.WithContext(ctx). - Select("id"). - Where("uuid = ?", jobUUID). - Find(&job) - if tx.Error != nil { - return jobError(tx.Error, "fetching job with uuid=%q", jobUUID) + queries, err := db.queries() + if err != nil { + return err } - - // Find the worker ID. - worker := Worker{} - tx = db.gormDB.WithContext(ctx). - Select("id"). - Where("uuid = ?", workerUUID). - Find(&worker) - if tx.Error != nil { - return workerError(tx.Error, "fetching worker with uuid=%q", workerUUID) - } - - // Remove the blocklist entry. - tx = db.gormDB.WithContext(ctx). - Where("job_id = ?", job.ID). - Where("worker_id = ?", worker.ID). - Where("task_type = ?", taskType). - Delete(JobBlock{}) - return tx.Error + return queries.RemoveFromJobBlocklist(ctx, sqlc.RemoveFromJobBlocklistParams{ + JobUUID: jobUUID, + WorkerUUID: workerUUID, + TaskType: taskType, + }) } // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index 9d5e61d4..2e5f35e1 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -244,3 +244,28 @@ ON CONFLICT DO UPDATE -- name: GetLastRenderedJobUUID :one SELECT uuid FROM jobs INNER JOIN last_rendereds LR ON jobs.id = LR.job_id; + +-- name: AddWorkerToJobBlocklist :exec +-- Add a worker to a job's blocklist. +INSERT INTO job_blocks (created_at, job_id, worker_id, task_type) +VALUES (@created_at, @job_id, @worker_id, @task_type) +ON CONFLICT DO NOTHING; + +-- name: FetchJobBlocklist :many +SELECT sqlc.embed(job_blocks), sqlc.embed(workers) +FROM job_blocks +INNER JOIN jobs ON jobs.id = job_blocks.job_id +INNER JOIN workers on workers.id = job_blocks.worker_id +WHERE jobs.uuid = @jobuuid +ORDER BY workers.name; + +-- name: ClearJobBlocklist :exec +DELETE FROM job_blocks +WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid); + +-- name: RemoveFromJobBlocklist :exec +DELETE FROM job_blocks +WHERE + job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid) +AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=@workeruuid) +AND job_blocks.task_type = @task_type; diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 27470b1c..5350c085 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -13,6 +13,30 @@ import ( "time" ) +const addWorkerToJobBlocklist = `-- name: AddWorkerToJobBlocklist :exec +INSERT INTO job_blocks (created_at, job_id, worker_id, task_type) +VALUES (?1, ?2, ?3, ?4) +ON CONFLICT DO NOTHING +` + +type AddWorkerToJobBlocklistParams struct { + CreatedAt time.Time + JobID int64 + WorkerID int64 + TaskType string +} + +// Add a worker to a job's blocklist. +func (q *Queries) AddWorkerToJobBlocklist(ctx context.Context, arg AddWorkerToJobBlocklistParams) error { + _, err := q.db.ExecContext(ctx, addWorkerToJobBlocklist, + arg.CreatedAt, + arg.JobID, + arg.WorkerID, + arg.TaskType, + ) + return err +} + const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec INSERT INTO task_failures (created_at, task_id, worker_id) VALUES (?1, ?2, ?3) @@ -50,6 +74,16 @@ func (q *Queries) ClearFailureListOfTask(ctx context.Context, taskID int64) erro return err } +const clearJobBlocklist = `-- name: ClearJobBlocklist :exec +DELETE FROM job_blocks +WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1) +` + +func (q *Queries) ClearJobBlocklist(ctx context.Context, jobuuid string) error { + _, err := q.db.ExecContext(ctx, clearJobBlocklist, jobuuid) + return err +} + const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one SELECT count(*) as num_failed FROM task_failures WHERE task_id=?1 @@ -217,6 +251,65 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { return i, err } +const fetchJobBlocklist = `-- name: FetchJobBlocklist :many +SELECT job_blocks.id, job_blocks.created_at, job_blocks.job_id, job_blocks.worker_id, job_blocks.task_type, workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart +FROM job_blocks +INNER JOIN jobs ON jobs.id = job_blocks.job_id +INNER JOIN workers on workers.id = job_blocks.worker_id +WHERE jobs.uuid = ?1 +ORDER BY workers.name +` + +type FetchJobBlocklistRow struct { + JobBlock JobBlock + Worker Worker +} + +func (q *Queries) FetchJobBlocklist(ctx context.Context, jobuuid string) ([]FetchJobBlocklistRow, error) { + rows, err := q.db.QueryContext(ctx, fetchJobBlocklist, jobuuid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchJobBlocklistRow + for rows.Next() { + var i FetchJobBlocklistRow + if err := rows.Scan( + &i.JobBlock.ID, + &i.JobBlock.CreatedAt, + &i.JobBlock.JobID, + &i.JobBlock.WorkerID, + &i.JobBlock.TaskType, + &i.Worker.ID, + &i.Worker.CreatedAt, + &i.Worker.UpdatedAt, + &i.Worker.UUID, + &i.Worker.Secret, + &i.Worker.Name, + &i.Worker.Address, + &i.Worker.Platform, + &i.Worker.Software, + &i.Worker.Status, + &i.Worker.LastSeenAt, + &i.Worker.StatusRequested, + &i.Worker.LazyStatusRequest, + &i.Worker.SupportedTaskTypes, + &i.Worker.DeletedAt, + &i.Worker.CanRestart, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchJobByID = `-- name: FetchJobByID :one SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs WHERE id = ? LIMIT 1 @@ -758,6 +851,25 @@ func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksIn return num_tasks, err } +const removeFromJobBlocklist = `-- name: RemoveFromJobBlocklist :exec +DELETE FROM job_blocks +WHERE + job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1) +AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=?2) +AND job_blocks.task_type = ?3 +` + +type RemoveFromJobBlocklistParams struct { + JobUUID string + WorkerUUID string + TaskType string +} + +func (q *Queries) RemoveFromJobBlocklist(ctx context.Context, arg RemoveFromJobBlocklistParams) error { + _, err := q.db.ExecContext(ctx, removeFromJobBlocklist, arg.JobUUID, arg.WorkerUUID, arg.TaskType) + return err +} + const requestJobDeletion = `-- name: RequestJobDeletion :exec UPDATE jobs SET updated_at = ?1,