From b102b73a1f65367ac8c0bb4957646a7960fa675b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Sun, 3 Mar 2024 22:42:19 +0100 Subject: [PATCH] Refactor: convert more job functions to sqlc No functional changes. --- internal/manager/persistence/db.go | 9 + internal/manager/persistence/jobs.go | 158 ++++++++------- internal/manager/persistence/sqlc/query.sql | 25 +++ .../manager/persistence/sqlc/query.sql.go | 187 ++++++++++++++++++ sqlc.yaml | 1 + 5 files changed, 312 insertions(+), 68 deletions(-) diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 77574f7a..6eb299f3 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -5,6 +5,7 @@ package persistence import ( "context" + "database/sql" "fmt" "time" @@ -182,6 +183,14 @@ func (db *DB) queries() (*sqlc.Queries, error) { return sqlc.New(sqldb), nil } +// now returns the result of `nowFunc()` wrapped in a sql.NullTime. +func (db *DB) now() sql.NullTime { + return sql.NullTime{ + Time: db.gormDB.NowFunc(), + Valid: true, + } +} + func (db *DB) pragmaForeignKeys(enabled bool) error { var ( value int diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 8075f77b..8a2f94d3 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -300,8 +300,7 @@ func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error { } // Update the given job itself, so we don't have to re-fetch it from the database. - j.DeleteRequestedAt.Time = db.gormDB.NowFunc() - j.DeleteRequestedAt.Valid = true + j.DeleteRequestedAt = db.now() params := sqlc.RequestJobDeletionParams{ Now: j.DeleteRequestedAt, @@ -321,98 +320,114 @@ func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error { // RequestJobMassDeletion sets multiple job's "DeletionRequestedAt" field to "now". // The list of affected job UUIDs is returned. func (db *DB) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) { - // In order to be able to report which jobs were affected, first fetch the - // list of jobs, then update them. - var jobs []*Job - selectResult := db.gormDB.WithContext(ctx). - Model(&Job{}). - Select("uuid"). - Where("updated_at <= ?", lastUpdatedMax). - Scan(&jobs) - if selectResult.Error != nil { - return nil, jobError(selectResult.Error, "fetching jobs by last-modified timestamp") + queries, err := db.queries() + if err != nil { + return nil, err } - if len(jobs) == 0 { + // In order to be able to report which jobs were affected, first fetch the + // list of jobs, then update them. + uuids, err := queries.FetchJobUUIDsUpdatedBefore(ctx, sql.NullTime{ + Time: lastUpdatedMax, + Valid: true, + }) + switch { + case err != nil: + return nil, jobError(err, "fetching jobs by last-modified timestamp") + case len(uuids) == 0: return nil, ErrJobNotFound } - // Convert array of jobs to array of UUIDs. - uuids := make([]string, len(jobs)) - for index := range jobs { - uuids[index] = jobs[index].UUID - } - // Update the selected jobs. - deleteRequestedAt := sql.NullTime{ - Time: db.gormDB.NowFunc(), - Valid: true, + params := sqlc.RequestMassJobDeletionParams{ + Now: db.now(), + UUIDs: uuids, } - tx := db.gormDB.WithContext(ctx). - Model(Job{}). - Where("uuid in ?", uuids). - Updates(Job{DeleteRequestedAt: deleteRequestedAt}) - if tx.Error != nil { - return nil, jobError(tx.Error, "queueing jobs for deletion") + if err := queries.RequestMassJobDeletion(ctx, params); err != nil { + return nil, jobError(err, "marking jobs as deletion-requested") } return uuids, nil } func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) { - var jobs []*Job - - tx := db.gormDB.WithContext(ctx). - Model(&Job{}). - Select("UUID"). - Where("delete_requested_at is not NULL"). - Order("delete_requested_at"). - Scan(&jobs) - - if tx.Error != nil { - return nil, jobError(tx.Error, "fetching jobs marked for deletion") + queries, err := db.queries() + if err != nil { + return nil, err } - uuids := make([]string, len(jobs)) - for i := range jobs { - uuids[i] = jobs[i].UUID + uuids, err := queries.FetchJobsDeletionRequested(ctx) + if err != nil { + return nil, jobError(err, "fetching jobs marked for deletion") } - return uuids, nil } func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) { - var jobs []*Job - - tx := db.gormDB.WithContext(ctx). - Model(&Job{}). - Where("status in ?", jobStatuses). - Scan(&jobs) - - if tx.Error != nil { - return nil, jobError(tx.Error, "fetching jobs in status %q", jobStatuses) + queries, err := db.queries() + if err != nil { + return nil, err } + + statuses := []string{} + for _, status := range jobStatuses { + statuses = append(statuses, string(status)) + } + + sqlcJobs, err := queries.FetchJobsInStatus(ctx, statuses) + if err != nil { + return nil, jobError(err, "fetching jobs in status %q", jobStatuses) + } + + var jobs []*Job + for index := range sqlcJobs { + job, err := convertSqlcJob(sqlcJobs[index]) + if err != nil { + return nil, jobError(err, "converting fetched jobs in status %q", jobStatuses) + } + jobs = append(jobs, job) + } + return jobs, nil } // SaveJobStatus saves the job's Status and Activity fields. func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { - tx := db.gormDB.WithContext(ctx). - Model(j). - Updates(Job{Status: j.Status, Activity: j.Activity}) - if tx.Error != nil { - return jobError(tx.Error, "saving job status") + queries, err := db.queries() + if err != nil { + return err + } + + params := sqlc.SaveJobStatusParams{ + Now: db.now(), + ID: int64(j.ID), + Status: string(j.Status), + Activity: j.Activity, + } + + err = queries.SaveJobStatus(ctx, params) + if err != nil { + return jobError(err, "saving job status") } return nil } // SaveJobPriority saves the job's Priority field. func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error { - tx := db.gormDB.WithContext(ctx). - Model(j). - Updates(Job{Priority: j.Priority}) - if tx.Error != nil { - return jobError(tx.Error, "saving job priority") + queries, err := db.queries() + if err != nil { + return err + } + + params := sqlc.SaveJobPriorityParams{ + Now: db.now(), + ID: int64(j.ID), + Priority: int64(j.Priority), + } + + err = queries.SaveJobPriority(ctx, params) + if err != nil { + return jobError(err, "saving job priority") } return nil } @@ -421,12 +436,19 @@ func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error { // NOTE: this function does NOT update the job's `UpdatedAt` field. This is // necessary for `cmd/shaman-checkout-id-setter` to do its work quietly. func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error { - tx := db.gormDB.WithContext(ctx). - Model(j). - Omit("UpdatedAt"). - Updates(Job{Storage: j.Storage}) - if tx.Error != nil { - return jobError(tx.Error, "saving job storage") + queries, err := db.queries() + if err != nil { + return err + } + + params := sqlc.SaveJobStorageInfoParams{ + ID: int64(j.ID), + StorageShamanCheckoutID: j.Storage.ShamanCheckoutID, + } + + err = queries.SaveJobStorageInfo(ctx, params) + if err != nil { + return jobError(err, "saving job storage") } return nil } diff --git a/internal/manager/persistence/sqlc/query.sql b/internal/manager/persistence/sqlc/query.sql index 61583958..0f606454 100644 --- a/internal/manager/persistence/sqlc/query.sql +++ b/internal/manager/persistence/sqlc/query.sql @@ -30,3 +30,28 @@ UPDATE jobs SET delete_requested_at = @now WHERE id = sqlc.arg('job_id'); +-- name: FetchJobUUIDsUpdatedBefore :many +SELECT uuid FROM jobs WHERE updated_at <= @updated_at_max; + +-- name: RequestMassJobDeletion :exec +UPDATE jobs SET + updated_at = @now, + delete_requested_at = @now +WHERE uuid in (sqlc.slice('uuids')); + +-- name: FetchJobsDeletionRequested :many +SELECT uuid FROM jobs + WHERE delete_requested_at is not NULL + ORDER BY delete_requested_at; + +-- name: FetchJobsInStatus :many +SELECT * FROM jobs WHERE status IN (sqlc.slice('statuses')); + +-- name: SaveJobStatus :exec +UPDATE jobs SET updated_at=@now, status=@status, activity=@activity WHERE id=@id; + +-- name: SaveJobPriority :exec +UPDATE jobs SET updated_at=@now, priority=@priority WHERE id=@id; + +-- name: SaveJobStorageInfo :exec +UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id; diff --git a/internal/manager/persistence/sqlc/query.sql.go b/internal/manager/persistence/sqlc/query.sql.go index 1597aef2..78f72a9a 100644 --- a/internal/manager/persistence/sqlc/query.sql.go +++ b/internal/manager/persistence/sqlc/query.sql.go @@ -9,6 +9,7 @@ import ( "context" "database/sql" "encoding/json" + "strings" "time" ) @@ -95,6 +96,114 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { return i, err } +const fetchJobUUIDsUpdatedBefore = `-- name: FetchJobUUIDsUpdatedBefore :many +SELECT uuid FROM jobs WHERE updated_at <= ?1 +` + +func (q *Queries) FetchJobUUIDsUpdatedBefore(ctx context.Context, updatedAtMax sql.NullTime) ([]string, error) { + rows, err := q.db.QueryContext(ctx, fetchJobUUIDsUpdatedBefore, updatedAtMax) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var uuid string + if err := rows.Scan(&uuid); err != nil { + return nil, err + } + items = append(items, uuid) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchJobsDeletionRequested = `-- name: FetchJobsDeletionRequested :many +SELECT uuid FROM jobs + WHERE delete_requested_at is not NULL + ORDER BY delete_requested_at +` + +func (q *Queries) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) { + rows, err := q.db.QueryContext(ctx, fetchJobsDeletionRequested) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var uuid string + if err := rows.Scan(&uuid); err != nil { + return nil, err + } + items = append(items, uuid) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchJobsInStatus = `-- name: FetchJobsInStatus :many +SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs WHERE status IN (/*SLICE:statuses*/?) +` + +func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]Job, error) { + query := fetchJobsInStatus + var queryParams []interface{} + if len(statuses) > 0 { + for _, v := range statuses { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:statuses*/?", strings.Repeat(",?", len(statuses))[1:], 1) + } else { + query = strings.Replace(query, "/*SLICE:statuses*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Name, + &i.JobType, + &i.Priority, + &i.Status, + &i.Activity, + &i.Settings, + &i.Metadata, + &i.DeleteRequestedAt, + &i.StorageShamanCheckoutID, + &i.WorkerTagID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const requestJobDeletion = `-- name: RequestJobDeletion :exec UPDATE jobs SET updated_at = ?1, @@ -111,3 +220,81 @@ func (q *Queries) RequestJobDeletion(ctx context.Context, arg RequestJobDeletion _, err := q.db.ExecContext(ctx, requestJobDeletion, arg.Now, arg.JobID) return err } + +const requestMassJobDeletion = `-- name: RequestMassJobDeletion :exec +UPDATE jobs SET + updated_at = ?1, + delete_requested_at = ?1 +WHERE uuid in (/*SLICE:uuids*/?) +` + +type RequestMassJobDeletionParams struct { + Now sql.NullTime + UUIDs []string +} + +func (q *Queries) RequestMassJobDeletion(ctx context.Context, arg RequestMassJobDeletionParams) error { + query := requestMassJobDeletion + var queryParams []interface{} + queryParams = append(queryParams, arg.Now) + if len(arg.UUIDs) > 0 { + for _, v := range arg.UUIDs { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:uuids*/?", strings.Repeat(",?", len(arg.UUIDs))[1:], 1) + } else { + query = strings.Replace(query, "/*SLICE:uuids*/?", "NULL", 1) + } + _, err := q.db.ExecContext(ctx, query, queryParams...) + return err +} + +const saveJobPriority = `-- name: SaveJobPriority :exec +UPDATE jobs SET updated_at=?1, priority=?2 WHERE id=?3 +` + +type SaveJobPriorityParams struct { + Now sql.NullTime + Priority int64 + ID int64 +} + +func (q *Queries) SaveJobPriority(ctx context.Context, arg SaveJobPriorityParams) error { + _, err := q.db.ExecContext(ctx, saveJobPriority, arg.Now, arg.Priority, arg.ID) + return err +} + +const saveJobStatus = `-- name: SaveJobStatus :exec +UPDATE jobs SET updated_at=?1, status=?2, activity=?3 WHERE id=?4 +` + +type SaveJobStatusParams struct { + Now sql.NullTime + Status string + Activity string + ID int64 +} + +func (q *Queries) SaveJobStatus(ctx context.Context, arg SaveJobStatusParams) error { + _, err := q.db.ExecContext(ctx, saveJobStatus, + arg.Now, + arg.Status, + arg.Activity, + arg.ID, + ) + return err +} + +const saveJobStorageInfo = `-- name: SaveJobStorageInfo :exec +UPDATE jobs SET storage_shaman_checkout_id=?1 WHERE id=?2 +` + +type SaveJobStorageInfoParams struct { + StorageShamanCheckoutID string + ID int64 +} + +func (q *Queries) SaveJobStorageInfo(ctx context.Context, arg SaveJobStorageInfoParams) error { + _, err := q.db.ExecContext(ctx, saveJobStorageInfo, arg.StorageShamanCheckoutID, arg.ID) + return err +} diff --git a/sqlc.yaml b/sqlc.yaml index a01807b0..560e430d 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -13,3 +13,4 @@ sql: type: "RawMessage" rename: uuid: "UUID" + uuids: "UUIDs"