From 4435633756f7428015ceff100114fa10745f2e3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 20 May 2024 21:36:20 +0200 Subject: [PATCH] Manager: Convert FetchTasksOfJob() and FetchTasksOfJobInStatus() to sqlc No functional changes. --- internal/manager/persistence/jobs.go | 62 ++++++---- .../manager/persistence/sqlc/query_jobs.sql | 13 ++ .../persistence/sqlc/query_jobs.sql.go | 117 ++++++++++++++++++ 3 files changed, 171 insertions(+), 21 deletions(-) diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index bcff8dbb..17f0ea9c 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -774,39 +774,59 @@ func (db *DB) CountTasksOfJobInStatus( // FetchTaskIDsOfJob returns all tasks of the given job. func (db *DB) FetchTasksOfJob(ctx context.Context, job *Job) ([]*Task, error) { - var tasks []*Task - tx := db.gormDB.WithContext(ctx). - Model(&Task{}). - Where("job_id", job.ID). - Scan(&tasks) - if tx.Error != nil { - return nil, taskError(tx.Error, "fetching tasks of job %s", job.UUID) + queries, err := db.queries() + if err != nil { + return nil, err } - for i := range tasks { - tasks[i].Job = job + rows, err := queries.FetchTasksOfJob(ctx, int64(job.ID)) + if err != nil { + return nil, taskError(err, "fetching tasks of job %s", job.UUID) } - return tasks, nil + result := make([]*Task, len(rows)) + for i := range rows { + gormTask, err := convertSqlcTask(rows[i].Task, job.UUID, rows[i].WorkerUUID.String) + if err != nil { + return nil, err + } + gormTask.Job = job + result[i] = gormTask + } + return result, nil } // FetchTasksOfJobInStatus returns those tasks of the given job that have any of the given statuses. func (db *DB) FetchTasksOfJobInStatus(ctx context.Context, job *Job, taskStatuses ...api.TaskStatus) ([]*Task, error) { - var tasks []*Task - tx := db.gormDB.WithContext(ctx). - Model(&Task{}). - Where("job_id", job.ID). - Where("status in ?", taskStatuses). - Scan(&tasks) - if tx.Error != nil { - return nil, taskError(tx.Error, "fetching tasks of job %s in status %q", job.UUID, taskStatuses) + queries, err := db.queries() + if err != nil { + return nil, err } - for i := range tasks { - tasks[i].Job = job + // Convert from []api.TaskStatus to []string for feeding to sqlc. + statusesAsStrings := make([]string, len(taskStatuses)) + for index := range taskStatuses { + statusesAsStrings[index] = string(taskStatuses[index]) } - return tasks, nil + rows, err := queries.FetchTasksOfJobInStatus(ctx, sqlc.FetchTasksOfJobInStatusParams{ + JobID: int64(job.ID), + TaskStatus: statusesAsStrings, + }) + if err != nil { + return nil, taskError(err, "fetching tasks of job %s in status %q", job.UUID, taskStatuses) + } + + result := make([]*Task, len(rows)) + for i := range rows { + gormTask, err := convertSqlcTask(rows[i].Task, job.UUID, rows[i].WorkerUUID.String) + if err != nil { + return nil, err + } + gormTask.Job = job + result[i] = gormTask + } + return result, nil } // UpdateJobsTaskStatuses updates the status & activity of all tasks of `job`. diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index aa02ebb4..231a2d5d 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -83,6 +83,19 @@ WHERE tasks.worker_id = @worker_id AND tasks.job_id = @job_id AND tasks.status = @task_status; +-- name: FetchTasksOfJob :many +SELECT sqlc.embed(tasks), workers.UUID as workerUUID +FROM tasks +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.job_id = @job_id; + +-- name: FetchTasksOfJobInStatus :many +SELECT sqlc.embed(tasks), workers.UUID as workerUUID +FROM tasks +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.job_id = @job_id + AND tasks.status in (sqlc.slice('task_status')); + -- name: FetchTaskJobUUID :one SELECT jobs.UUID as jobUUID FROM tasks diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 5adb0878..38f5f84f 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -285,6 +285,123 @@ func (q *Queries) FetchTaskJobUUID(ctx context.Context, uuid string) (sql.NullSt return jobuuid, err } +const fetchTasksOfJob = `-- name: FetchTasksOfJob :many +SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID +FROM tasks +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.job_id = ?1 +` + +type FetchTasksOfJobRow struct { + Task Task + WorkerUUID sql.NullString +} + +func (q *Queries) FetchTasksOfJob(ctx context.Context, jobID int64) ([]FetchTasksOfJobRow, error) { + rows, err := q.db.QueryContext(ctx, fetchTasksOfJob, jobID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchTasksOfJobRow + for rows.Next() { + var i FetchTasksOfJobRow + if err := rows.Scan( + &i.Task.ID, + &i.Task.CreatedAt, + &i.Task.UpdatedAt, + &i.Task.UUID, + &i.Task.Name, + &i.Task.Type, + &i.Task.JobID, + &i.Task.Priority, + &i.Task.Status, + &i.Task.WorkerID, + &i.Task.LastTouchedAt, + &i.Task.Commands, + &i.Task.Activity, + &i.WorkerUUID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchTasksOfJobInStatus = `-- name: FetchTasksOfJobInStatus :many +SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID +FROM tasks +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.job_id = ?1 + AND tasks.status in (/*SLICE:task_status*/?) +` + +type FetchTasksOfJobInStatusParams struct { + JobID int64 + TaskStatus []string +} + +type FetchTasksOfJobInStatusRow struct { + Task Task + WorkerUUID sql.NullString +} + +func (q *Queries) FetchTasksOfJobInStatus(ctx context.Context, arg FetchTasksOfJobInStatusParams) ([]FetchTasksOfJobInStatusRow, error) { + query := fetchTasksOfJobInStatus + var queryParams []interface{} + queryParams = append(queryParams, arg.JobID) + if len(arg.TaskStatus) > 0 { + for _, v := range arg.TaskStatus { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:task_status*/?", strings.Repeat(",?", len(arg.TaskStatus))[1:], 1) + } else { + query = strings.Replace(query, "/*SLICE:task_status*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchTasksOfJobInStatusRow + for rows.Next() { + var i FetchTasksOfJobInStatusRow + if err := rows.Scan( + &i.Task.ID, + &i.Task.CreatedAt, + &i.Task.UpdatedAt, + &i.Task.UUID, + &i.Task.Name, + &i.Task.Type, + &i.Task.JobID, + &i.Task.Priority, + &i.Task.Status, + &i.Task.WorkerID, + &i.Task.LastTouchedAt, + &i.Task.Commands, + &i.Task.Activity, + &i.WorkerUUID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchTasksOfWorkerInStatus = `-- name: FetchTasksOfWorkerInStatus :many SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID FROM tasks