From 3b2b5c47f3fe35c93f180f95c2fb3a83c14e71a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 2 Jul 2024 10:16:41 +0200 Subject: [PATCH] Manager: convert `FetchWorkerTask()` from gorm to sqlc Ref: #104305 No functional changes --- .../persistence/sqlc/query_task_scheduler.sql | 15 +++++ .../sqlc/query_task_scheduler.sql.go | 64 ++++++++++++++++++ .../manager/persistence/task_scheduler.go | 12 ---- internal/manager/persistence/workers.go | 66 +++++++++++-------- 4 files changed, 118 insertions(+), 39 deletions(-) diff --git a/internal/manager/persistence/sqlc/query_task_scheduler.sql b/internal/manager/persistence/sqlc/query_task_scheduler.sql index 11f302a0..f88d7f3c 100644 --- a/internal/manager/persistence/sqlc/query_task_scheduler.sql +++ b/internal/manager/persistence/sqlc/query_task_scheduler.sql @@ -47,6 +47,21 @@ WHERE TF.worker_id IS NULL -- Not failed by this worker before. AND tasks.type IN (sqlc.slice('supported_task_types')) ORDER BY jobs.priority DESC, tasks.priority DESC; +-- name: FetchWorkerTask :one +-- Find the currently-active task assigned to a Worker. If not found, find the last task this Worker worked on. +SELECT + sqlc.embed(tasks), + sqlc.embed(jobs), + (tasks.status = @task_status_active AND jobs.status = @job_status_active) as is_active +FROM tasks + INNER JOIN jobs ON tasks.job_id = jobs.id +WHERE + tasks.worker_id = @worker_id +ORDER BY + is_active DESC, + tasks.updated_at DESC +LIMIT 1; + -- name: AssignTaskToWorker :exec UPDATE tasks SET worker_id=@worker_id, last_touched_at=@now, updated_at=@now diff --git a/internal/manager/persistence/sqlc/query_task_scheduler.sql.go b/internal/manager/persistence/sqlc/query_task_scheduler.sql.go index 3b2cb45c..9aea531c 100644 --- a/internal/manager/persistence/sqlc/query_task_scheduler.sql.go +++ b/internal/manager/persistence/sqlc/query_task_scheduler.sql.go @@ -82,6 +82,70 @@ func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg return i, err } +const fetchWorkerTask = `-- name: FetchWorkerTask :one +SELECT + tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, + jobs.id, jobs.created_at, jobs.updated_at, jobs.uuid, jobs.name, jobs.job_type, jobs.priority, jobs.status, jobs.activity, jobs.settings, jobs.metadata, jobs.delete_requested_at, jobs.storage_shaman_checkout_id, jobs.worker_tag_id, + (tasks.status = ?1 AND jobs.status = ?2) as is_active +FROM tasks + INNER JOIN jobs ON tasks.job_id = jobs.id +WHERE + tasks.worker_id = ?3 +ORDER BY + is_active DESC, + tasks.updated_at DESC +LIMIT 1 +` + +type FetchWorkerTaskParams struct { + TaskStatusActive string + JobStatusActive string + WorkerID sql.NullInt64 +} + +type FetchWorkerTaskRow struct { + Task Task + Job Job + IsActive interface{} +} + +// Find the currently-active task assigned to a Worker. If not found, find the last task this Worker worked on. +func (q *Queries) FetchWorkerTask(ctx context.Context, arg FetchWorkerTaskParams) (FetchWorkerTaskRow, error) { + row := q.db.QueryRowContext(ctx, fetchWorkerTask, arg.TaskStatusActive, arg.JobStatusActive, arg.WorkerID) + var i FetchWorkerTaskRow + err := row.Scan( + &i.Task.ID, + &i.Task.CreatedAt, + &i.Task.UpdatedAt, + &i.Task.UUID, + &i.Task.Name, + &i.Task.Type, + &i.Task.JobID, + &i.Task.Priority, + &i.Task.Status, + &i.Task.WorkerID, + &i.Task.LastTouchedAt, + &i.Task.Commands, + &i.Task.Activity, + &i.Job.ID, + &i.Job.CreatedAt, + &i.Job.UpdatedAt, + &i.Job.UUID, + &i.Job.Name, + &i.Job.JobType, + &i.Job.Priority, + &i.Job.Status, + &i.Job.Activity, + &i.Job.Settings, + &i.Job.Metadata, + &i.Job.DeleteRequestedAt, + &i.Job.StorageShamanCheckoutID, + &i.Job.WorkerTagID, + &i.IsActive, + ) + return i, err +} + const findRunnableTask = `-- name: FindRunnableTask :one SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity FROM tasks diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 41ebb843..0c16d350 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -10,7 +10,6 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "gorm.io/gorm" "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/pkg/api" @@ -164,14 +163,3 @@ func findTaskForWorker( } return row.Task, nil } - -// taskAssignedAndRunnableQuery appends some GORM clauses to query for a task -// that's already assigned to this worker, and is in a runnable state. -func taskAssignedAndRunnableQuery(tx *gorm.DB, w *Worker) *gorm.DB { - return tx. - Joins("left join jobs on tasks.job_id = jobs.id"). - Where("tasks.status = ?", api.TaskStatusActive). - Where("jobs.status in ?", schedulableJobStatuses). - Where("tasks.worker_id = ?", w.ID). // assigned to this worker - Limit(1) -} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 62e32c00..3fd1f7e0 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -5,6 +5,7 @@ package persistence import ( "context" "database/sql" + "errors" "fmt" "strings" "time" @@ -191,38 +192,49 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { // FetchWorkerTask returns the most recent task assigned to the given Worker. func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error) { - task := Task{} - - // See if there is a task assigned to this worker in the same way that the - // task scheduler does. - query := db.gormDB.WithContext(ctx) - query = taskAssignedAndRunnableQuery(query, worker) - tx := query. - Order("tasks.updated_at"). - Preload("Job"). - Find(&task) - if tx.Error != nil { - return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID) - } - if task.ID != 0 { - // Found a task! - return &task, nil + queries, err := db.queries() + if err != nil { + return nil, err } - // If not found, just find the last-modified task associated with this Worker. - tx = db.gormDB.WithContext(ctx). - Where("worker_id = ?", worker.ID). - Order("tasks.updated_at DESC"). - Preload("Job"). - Find(&task) - if tx.Error != nil { - return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID) - } - if task.ID == 0 { + // Convert the WorkerID to a NullInt64. As task.worker_id can be NULL, this is + // what sqlc expects us to pass in. + workerID := sql.NullInt64{Int64: int64(worker.ID), Valid: true} + + row, err := queries.FetchWorkerTask(ctx, sqlc.FetchWorkerTaskParams{ + TaskStatusActive: string(api.TaskStatusActive), + JobStatusActive: string(api.JobStatusActive), + WorkerID: workerID, + }) + + switch { + case errors.Is(err, sql.ErrNoRows): return nil, nil + case err != nil: + return nil, taskError(err, "fetching task assigned to Worker %s", worker.UUID) } - return &task, nil + // Found a task! + if row.Job.ID == 0 { + panic(fmt.Sprintf("task found but with no job: %#v", row)) + } + if row.Task.ID == 0 { + panic(fmt.Sprintf("task found but with zero ID: %#v", row)) + } + + // Convert the task & job to gorm data types. + gormTask, err := convertSqlcTask(row.Task, row.Job.UUID, worker.UUID) + if err != nil { + return nil, err + } + gormJob, err := convertSqlcJob(row.Job) + if err != nil { + return nil, err + } + gormTask.Job = &gormJob + gormTask.Worker = worker + + return gormTask, nil } func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error {