diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index da041ecf..1121c523 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -7,7 +7,7 @@ import ( "math" "time" - "gorm.io/gorm/clause" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) // JobBlock keeps track of which Worker is not allowed to run which task type on which job. @@ -28,66 +28,76 @@ type JobBlock struct { // AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler. func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error { - entry := JobBlock{ - Job: job, - Worker: worker, - TaskType: taskType, + if job.ID == 0 { + panic("Cannot add worker to job blocklist with zero job ID") } - tx := db.gormDB.WithContext(ctx). - Clauses(clause.OnConflict{DoNothing: true}). - Create(&entry) - return tx.Error + if worker.ID == 0 { + panic("Cannot add worker to job blocklist with zero worker ID") + } + if taskType == "" { + panic("Cannot add worker to job blocklist with empty task type") + } + + queries, err := db.queries() + if err != nil { + return err + } + + return queries.AddWorkerToJobBlocklist(ctx, sqlc.AddWorkerToJobBlocklistParams{ + CreatedAt: db.now().Time, + JobID: int64(job.ID), + WorkerID: int64(worker.ID), + TaskType: taskType, + }) } +// FetchJobBlocklist fetches the blocklist for the given job. +// Workers are fetched too, and embedded in the returned list. func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) { - entries := []JobBlock{} + queries, err := db.queries() + if err != nil { + return nil, err + } - tx := db.gormDB.WithContext(ctx). - Model(JobBlock{}). - Joins("inner join jobs on jobs.id = job_blocks.job_id"). - Joins("Worker"). - Where("jobs.uuid = ?", jobUUID). - Order("Worker.name"). - Scan(&entries) - return entries, tx.Error + rows, err := queries.FetchJobBlocklist(ctx, jobUUID) + if err != nil { + return nil, err + } + + entries := make([]JobBlock, len(rows)) + for idx, row := range rows { + entries[idx].ID = uint(row.JobBlock.ID) + entries[idx].CreatedAt = row.JobBlock.CreatedAt + entries[idx].TaskType = row.JobBlock.TaskType + entries[idx].JobID = uint(row.JobBlock.JobID) + entries[idx].WorkerID = uint(row.JobBlock.WorkerID) + + worker := convertSqlcWorker(row.Worker) + entries[idx].Worker = &worker + } + + return entries, nil } // ClearJobBlocklist removes the entire blocklist of this job. func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error { - tx := db.gormDB.WithContext(ctx). - Where("job_id = ?", job.ID). - Delete(JobBlock{}) - return tx.Error + queries, err := db.queries() + if err != nil { + return err + } + return queries.ClearJobBlocklist(ctx, job.UUID) } func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error { - // Find the job ID. - job := Job{} - tx := db.gormDB.WithContext(ctx). - Select("id"). - Where("uuid = ?", jobUUID). - Find(&job) - if tx.Error != nil { - return jobError(tx.Error, "fetching job with uuid=%q", jobUUID) + queries, err := db.queries() + if err != nil { + return err } - - // Find the worker ID. - worker := Worker{} - tx = db.gormDB.WithContext(ctx). - Select("id"). - Where("uuid = ?", workerUUID). - Find(&worker) - if tx.Error != nil { - return workerError(tx.Error, "fetching worker with uuid=%q", workerUUID) - } - - // Remove the blocklist entry. - tx = db.gormDB.WithContext(ctx). - Where("job_id = ?", job.ID). - Where("worker_id = ?", worker.ID). - Where("task_type = ?", taskType). - Delete(JobBlock{}) - return tx.Error + return queries.RemoveFromJobBlocklist(ctx, sqlc.RemoveFromJobBlocklistParams{ + JobUUID: jobUUID, + WorkerUUID: workerUUID, + TaskType: taskType, + }) } // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index 9d5e61d4..2e5f35e1 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -244,3 +244,28 @@ ON CONFLICT DO UPDATE -- name: GetLastRenderedJobUUID :one SELECT uuid FROM jobs INNER JOIN last_rendereds LR ON jobs.id = LR.job_id; + +-- name: AddWorkerToJobBlocklist :exec +-- Add a worker to a job's blocklist. +INSERT INTO job_blocks (created_at, job_id, worker_id, task_type) +VALUES (@created_at, @job_id, @worker_id, @task_type) +ON CONFLICT DO NOTHING; + +-- name: FetchJobBlocklist :many +SELECT sqlc.embed(job_blocks), sqlc.embed(workers) +FROM job_blocks +INNER JOIN jobs ON jobs.id = job_blocks.job_id +INNER JOIN workers on workers.id = job_blocks.worker_id +WHERE jobs.uuid = @jobuuid +ORDER BY workers.name; + +-- name: ClearJobBlocklist :exec +DELETE FROM job_blocks +WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid); + +-- name: RemoveFromJobBlocklist :exec +DELETE FROM job_blocks +WHERE + job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid) +AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=@workeruuid) +AND job_blocks.task_type = @task_type; diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 27470b1c..5350c085 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -13,6 +13,30 @@ import ( "time" ) +const addWorkerToJobBlocklist = `-- name: AddWorkerToJobBlocklist :exec +INSERT INTO job_blocks (created_at, job_id, worker_id, task_type) +VALUES (?1, ?2, ?3, ?4) +ON CONFLICT DO NOTHING +` + +type AddWorkerToJobBlocklistParams struct { + CreatedAt time.Time + JobID int64 + WorkerID int64 + TaskType string +} + +// Add a worker to a job's blocklist. +func (q *Queries) AddWorkerToJobBlocklist(ctx context.Context, arg AddWorkerToJobBlocklistParams) error { + _, err := q.db.ExecContext(ctx, addWorkerToJobBlocklist, + arg.CreatedAt, + arg.JobID, + arg.WorkerID, + arg.TaskType, + ) + return err +} + const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec INSERT INTO task_failures (created_at, task_id, worker_id) VALUES (?1, ?2, ?3) @@ -50,6 +74,16 @@ func (q *Queries) ClearFailureListOfTask(ctx context.Context, taskID int64) erro return err } +const clearJobBlocklist = `-- name: ClearJobBlocklist :exec +DELETE FROM job_blocks +WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1) +` + +func (q *Queries) ClearJobBlocklist(ctx context.Context, jobuuid string) error { + _, err := q.db.ExecContext(ctx, clearJobBlocklist, jobuuid) + return err +} + const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one SELECT count(*) as num_failed FROM task_failures WHERE task_id=?1 @@ -217,6 +251,65 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { return i, err } +const fetchJobBlocklist = `-- name: FetchJobBlocklist :many +SELECT job_blocks.id, job_blocks.created_at, job_blocks.job_id, job_blocks.worker_id, job_blocks.task_type, workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart +FROM job_blocks +INNER JOIN jobs ON jobs.id = job_blocks.job_id +INNER JOIN workers on workers.id = job_blocks.worker_id +WHERE jobs.uuid = ?1 +ORDER BY workers.name +` + +type FetchJobBlocklistRow struct { + JobBlock JobBlock + Worker Worker +} + +func (q *Queries) FetchJobBlocklist(ctx context.Context, jobuuid string) ([]FetchJobBlocklistRow, error) { + rows, err := q.db.QueryContext(ctx, fetchJobBlocklist, jobuuid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchJobBlocklistRow + for rows.Next() { + var i FetchJobBlocklistRow + if err := rows.Scan( + &i.JobBlock.ID, + &i.JobBlock.CreatedAt, + &i.JobBlock.JobID, + &i.JobBlock.WorkerID, + &i.JobBlock.TaskType, + &i.Worker.ID, + &i.Worker.CreatedAt, + &i.Worker.UpdatedAt, + &i.Worker.UUID, + &i.Worker.Secret, + &i.Worker.Name, + &i.Worker.Address, + &i.Worker.Platform, + &i.Worker.Software, + &i.Worker.Status, + &i.Worker.LastSeenAt, + &i.Worker.StatusRequested, + &i.Worker.LazyStatusRequest, + &i.Worker.SupportedTaskTypes, + &i.Worker.DeletedAt, + &i.Worker.CanRestart, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchJobByID = `-- name: FetchJobByID :one SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs WHERE id = ? LIMIT 1 @@ -758,6 +851,25 @@ func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksIn return num_tasks, err } +const removeFromJobBlocklist = `-- name: RemoveFromJobBlocklist :exec +DELETE FROM job_blocks +WHERE + job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1) +AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=?2) +AND job_blocks.task_type = ?3 +` + +type RemoveFromJobBlocklistParams struct { + JobUUID string + WorkerUUID string + TaskType string +} + +func (q *Queries) RemoveFromJobBlocklist(ctx context.Context, arg RemoveFromJobBlocklistParams) error { + _, err := q.db.ExecContext(ctx, removeFromJobBlocklist, arg.JobUUID, arg.WorkerUUID, arg.TaskType) + return err +} + const requestJobDeletion = `-- name: RequestJobDeletion :exec UPDATE jobs SET updated_at = ?1,