From 7a9f809c43e21ffbfc436b358ef86ce791ae7c44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Sun, 26 May 2024 12:38:09 +0200 Subject: [PATCH] Manager: convert more worker functions to sqlc No functional changes. --- .../persistence/sqlc/query_workers.sql | 44 ++++ .../persistence/sqlc/query_workers.sql.go | 202 ++++++++++++++++++ internal/manager/persistence/workers.go | 129 +++++++---- 3 files changed, 335 insertions(+), 40 deletions(-) diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql index f34179a0..a96d84cf 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -40,6 +40,10 @@ RETURNING id; INSERT INTO worker_tag_membership (worker_tag_id, worker_id) VALUES (@worker_tag_id, @worker_id); +-- name: FetchWorkers :many +SELECT sqlc.embed(workers) FROM workers +WHERE deleted_at IS NULL; + -- name: FetchWorker :one -- FetchWorker only returns the worker if it wasn't soft-deleted. SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL; @@ -54,3 +58,43 @@ FROM worker_tags LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id) LEFT JOIN workers on (m.worker_id = workers.id) WHERE workers.uuid = @uuid; + +-- name: SoftDeleteWorker :execrows +UPDATE workers SET deleted_at=@deleted_at +WHERE uuid=@uuid; + +-- name: SaveWorkerStatus :exec +UPDATE workers SET + updated_at=@updated_at, + status=@status, + status_requested=@status_requested, + lazy_status_request=@lazy_status_request +WHERE id=@id; + +-- name: SaveWorker :exec +UPDATE workers SET + updated_at=@updated_at, + uuid=@uuid, + secret=@secret, + name=@name, + address=@address, + platform=@platform, + software=@software, + status=@status, + last_seen_at=@last_seen_at, + status_requested=@status_requested, + lazy_status_request=@lazy_status_request, + supported_task_types=@supported_task_types, + can_restart=@can_restart +WHERE id=@id; + +-- name: WorkerSeen :exec +UPDATE workers SET + updated_at=@updated_at, + last_seen_at=@last_seen_at +WHERE id=@id; + +-- name: SummarizeWorkerStatuses :many +SELECT status, count(id) as status_count FROM workers +WHERE deleted_at is NULL +GROUP BY status; diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go index 0f652889..f238a280 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql.go +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -198,3 +198,205 @@ func (q *Queries) FetchWorkerUnconditional(ctx context.Context, uuid string) (Wo ) return i, err } + +const fetchWorkers = `-- name: FetchWorkers :many +SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers +WHERE deleted_at IS NULL +` + +type FetchWorkersRow struct { + Worker Worker +} + +func (q *Queries) FetchWorkers(ctx context.Context) ([]FetchWorkersRow, error) { + rows, err := q.db.QueryContext(ctx, fetchWorkers) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchWorkersRow + for rows.Next() { + var i FetchWorkersRow + if err := rows.Scan( + &i.Worker.ID, + &i.Worker.CreatedAt, + &i.Worker.UpdatedAt, + &i.Worker.UUID, + &i.Worker.Secret, + &i.Worker.Name, + &i.Worker.Address, + &i.Worker.Platform, + &i.Worker.Software, + &i.Worker.Status, + &i.Worker.LastSeenAt, + &i.Worker.StatusRequested, + &i.Worker.LazyStatusRequest, + &i.Worker.SupportedTaskTypes, + &i.Worker.DeletedAt, + &i.Worker.CanRestart, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const saveWorker = `-- name: SaveWorker :exec +UPDATE workers SET + updated_at=?1, + uuid=?2, + secret=?3, + name=?4, + address=?5, + platform=?6, + software=?7, + status=?8, + last_seen_at=?9, + status_requested=?10, + lazy_status_request=?11, + supported_task_types=?12, + can_restart=?13 +WHERE id=?14 +` + +type SaveWorkerParams struct { + UpdatedAt sql.NullTime + UUID string + Secret string + Name string + Address string + Platform string + Software string + Status string + LastSeenAt sql.NullTime + StatusRequested string + LazyStatusRequest bool + SupportedTaskTypes string + CanRestart bool + ID int64 +} + +func (q *Queries) SaveWorker(ctx context.Context, arg SaveWorkerParams) error { + _, err := q.db.ExecContext(ctx, saveWorker, + arg.UpdatedAt, + arg.UUID, + arg.Secret, + arg.Name, + arg.Address, + arg.Platform, + arg.Software, + arg.Status, + arg.LastSeenAt, + arg.StatusRequested, + arg.LazyStatusRequest, + arg.SupportedTaskTypes, + arg.CanRestart, + arg.ID, + ) + return err +} + +const saveWorkerStatus = `-- name: SaveWorkerStatus :exec +UPDATE workers SET + updated_at=?1, + status=?2, + status_requested=?3, + lazy_status_request=?4 +WHERE id=?5 +` + +type SaveWorkerStatusParams struct { + UpdatedAt sql.NullTime + Status string + StatusRequested string + LazyStatusRequest bool + ID int64 +} + +func (q *Queries) SaveWorkerStatus(ctx context.Context, arg SaveWorkerStatusParams) error { + _, err := q.db.ExecContext(ctx, saveWorkerStatus, + arg.UpdatedAt, + arg.Status, + arg.StatusRequested, + arg.LazyStatusRequest, + arg.ID, + ) + return err +} + +const softDeleteWorker = `-- name: SoftDeleteWorker :execrows +UPDATE workers SET deleted_at=?1 +WHERE uuid=?2 +` + +type SoftDeleteWorkerParams struct { + DeletedAt sql.NullTime + UUID string +} + +func (q *Queries) SoftDeleteWorker(ctx context.Context, arg SoftDeleteWorkerParams) (int64, error) { + result, err := q.db.ExecContext(ctx, softDeleteWorker, arg.DeletedAt, arg.UUID) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const summarizeWorkerStatuses = `-- name: SummarizeWorkerStatuses :many +SELECT status, count(id) as status_count FROM workers +WHERE deleted_at is NULL +GROUP BY status +` + +type SummarizeWorkerStatusesRow struct { + Status string + StatusCount int64 +} + +func (q *Queries) SummarizeWorkerStatuses(ctx context.Context) ([]SummarizeWorkerStatusesRow, error) { + rows, err := q.db.QueryContext(ctx, summarizeWorkerStatuses) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SummarizeWorkerStatusesRow + for rows.Next() { + var i SummarizeWorkerStatusesRow + if err := rows.Scan(&i.Status, &i.StatusCount); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const workerSeen = `-- name: WorkerSeen :exec +UPDATE workers SET + updated_at=?1, + last_seen_at=?2 +WHERE id=?3 +` + +type WorkerSeenParams struct { + UpdatedAt sql.NullTime + LastSeenAt sql.NullTime + ID int64 +} + +func (q *Queries) WorkerSeen(ctx context.Context, arg WorkerSeenParams) error { + _, err := q.db.ExecContext(ctx, workerSeen, arg.UpdatedAt, arg.LastSeenAt, arg.ID) + return err +} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 0907556e..62e32c00 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -152,25 +152,41 @@ func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { return ErrDeletingWithoutFK } - tx := db.gormDB.WithContext(ctx). - Where("uuid = ?", uuid). - Delete(&Worker{}) - if tx.Error != nil { - return workerError(tx.Error, "deleting worker") + queries, err := db.queries() + if err != nil { + return err } - if tx.RowsAffected == 0 { + + rowsAffected, err := queries.SoftDeleteWorker(ctx, sqlc.SoftDeleteWorkerParams{ + DeletedAt: db.now(), + UUID: uuid, + }) + if err != nil { + return workerError(err, "deleting worker") + } + if rowsAffected == 0 { return ErrWorkerNotFound } return nil } func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { - workers := make([]*Worker, 0) - tx := db.gormDB.WithContext(ctx).Model(&Worker{}).Scan(&workers) - if tx.Error != nil { - return nil, workerError(tx.Error, "fetching all workers") + queries, err := db.queries() + if err != nil { + return nil, err } - return workers, nil + + workers, err := queries.FetchWorkers(ctx) + if err != nil { + return nil, workerError(err, "fetching all workers") + } + + gormWorkers := make([]*Worker, len(workers)) + for idx := range workers { + worker := convertSqlcWorker(workers[idx].Worker) + gormWorkers[idx] = &worker + } + return gormWorkers, nil } // FetchWorkerTask returns the most recent task assigned to the given Worker. @@ -210,22 +226,52 @@ func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error } func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { - err := db.gormDB.WithContext(ctx). - Model(w). - Select("status", "status_requested", "lazy_status_request"). - Updates(Worker{ - Status: w.Status, - StatusRequested: w.StatusRequested, - LazyStatusRequest: w.LazyStatusRequest, - }).Error + queries, err := db.queries() if err != nil { - return fmt.Errorf("saving worker: %w", err) + return err + } + + err = queries.SaveWorkerStatus(ctx, sqlc.SaveWorkerStatusParams{ + UpdatedAt: db.now(), + Status: string(w.Status), + StatusRequested: string(w.StatusRequested), + LazyStatusRequest: w.LazyStatusRequest, + ID: int64(w.ID), + }) + if err != nil { + return fmt.Errorf("saving worker status: %w", err) } return nil } func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { - if err := db.gormDB.WithContext(ctx).Save(w).Error; err != nil { + // TODO: remove this code, and just let the caller call CreateWorker() directly. + if w.ID == 0 { + return db.CreateWorker(ctx, w) + } + + queries, err := db.queries() + if err != nil { + return err + } + + err = queries.SaveWorker(ctx, sqlc.SaveWorkerParams{ + UpdatedAt: db.now(), + UUID: w.UUID, + Secret: w.Secret, + Name: w.Name, + Address: w.Address, + Platform: w.Platform, + Software: w.Software, + Status: string(w.Status), + LastSeenAt: sql.NullTime{Time: w.LastSeenAt, Valid: !w.LastSeenAt.IsZero()}, + StatusRequested: string(w.StatusRequested), + LazyStatusRequest: w.LazyStatusRequest, + SupportedTaskTypes: w.SupportedTaskTypes, + CanRestart: w.CanRestart, + ID: int64(w.ID), + }) + if err != nil { return fmt.Errorf("saving worker: %w", err) } return nil @@ -233,10 +279,18 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { // WorkerSeen marks the worker as 'seen' by this Manager. This is used for timeout detection. func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error { - tx := db.gormDB.WithContext(ctx). - Model(w). - Updates(Worker{LastSeenAt: db.gormDB.NowFunc()}) - if err := tx.Error; err != nil { + queries, err := db.queries() + if err != nil { + return err + } + + now := db.now() + err = queries.WorkerSeen(ctx, sqlc.WorkerSeenParams{ + UpdatedAt: now, + LastSeenAt: now, + ID: int64(w.ID), + }) + if err != nil { return workerError(err, "saving worker 'last seen at'") } return nil @@ -249,24 +303,19 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e logger := log.Ctx(ctx) logger.Debug().Msg("database: summarizing worker statuses") - // Query the database using a data structure that's easy to handle in GORM. - type queryResult struct { - Status api.WorkerStatus - StatusCount int - } - result := []*queryResult{} - tx := db.gormDB.WithContext(ctx).Model(&Worker{}). - Select("status as Status", "count(id) as StatusCount"). - Group("status"). - Scan(&result) - if tx.Error != nil { - return nil, workerError(tx.Error, "summarizing worker statuses") + queries, err := db.queries() + if err != nil { + return nil, err + } + + rows, err := queries.SummarizeWorkerStatuses(ctx) + if err != nil { + return nil, workerError(err, "summarizing worker statuses") } - // Convert the array-of-structs to a map that's easier to handle by the caller. statusCounts := make(WorkerStatusCount) - for _, singleStatusCount := range result { - statusCounts[singleStatusCount.Status] = singleStatusCount.StatusCount + for _, row := range rows { + statusCounts[api.WorkerStatus(row.Status)] = int(row.StatusCount) } return statusCounts, nil