From 29419cb30efe97cd6527a255f602da586d0922a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 26 Sep 2024 22:24:58 +0200 Subject: [PATCH] Manager: convert final sleep schedule queries to sqlc Ref: #104305 --- .../persistence/sqlc/query_workers.sql | 61 ++++++ .../persistence/sqlc/query_workers.sql.go | 189 ++++++++++++++++++ .../persistence/worker_sleep_schedule.go | 89 ++++++--- .../persistence/worker_sleep_schedule_test.go | 34 ++-- 4 files changed, 322 insertions(+), 51 deletions(-) diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql index 64cc7974..60ddf639 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -50,6 +50,10 @@ WHERE deleted_at IS NULL; -- FetchWorker only returns the worker if it wasn't soft-deleted. SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL; +-- name: FetchWorkerByID :one +-- FetchWorkerByID only returns the worker if it wasn't soft-deleted. +SELECT * FROM workers WHERE workers.id = @worker_id and deleted_at is NULL; + -- name: FetchWorkerUnconditional :one -- FetchWorkerUnconditional ignores soft-deletion status and just returns the worker. SELECT * FROM workers WHERE workers.uuid = @uuid; @@ -166,3 +170,60 @@ SELECT sleep_schedules.* FROM sleep_schedules INNER JOIN workers on workers.id = sleep_schedules.worker_id WHERE workers.uuid = @workerUUID; + +-- name: SetWorkerSleepSchedule :execlastid +-- Note that the use of ?2 and ?3 in the SQL is not desirable, and should be +-- replaced with @updated_at and @job_id as soon as sqlc issue #3334 is fixed. +-- See https://github.com/sqlc-dev/sqlc/issues/3334 for more info. +INSERT INTO sleep_schedules ( + created_at, + updated_at, + worker_id, + is_active, + days_of_week, + start_time, + end_time, + next_check +) VALUES ( + @created_at, + @updated_at, + @worker_id, + @is_active, + @days_of_week, + @start_time, + @end_time, + @next_check +) +ON CONFLICT DO UPDATE + SET updated_at=?2, is_active=?4, days_of_week=?5, start_time=?6, end_time=?7, next_check=?8 + WHERE worker_id=?3; + +-- name: SetWorkerSleepScheduleNextCheck :execrows +UPDATE sleep_schedules +SET next_check=@next_check +WHERE ID=@schedule_id; + + +-- name: FetchSleepSchedulesToCheck :many +SELECT * FROM sleep_schedules +WHERE is_active +AND (next_check <= @next_check OR next_check IS NULL OR next_check = ''); + +-- name: Test_CreateWorkerSleepSchedule :execlastid +INSERT INTO sleep_schedules ( + created_at, + worker_id, + is_active, + days_of_week, + start_time, + end_time, + next_check +) VALUES ( + @created_at, + @worker_id, + @is_active, + @days_of_week, + @start_time, + @end_time, + @next_check +); diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go index c2385c51..d6cbc97b 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql.go +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -144,6 +144,45 @@ func (q *Queries) DeleteWorkerTag(ctx context.Context, uuid string) (int64, erro return result.RowsAffected() } +const fetchSleepSchedulesToCheck = `-- name: FetchSleepSchedulesToCheck :many +SELECT id, created_at, updated_at, worker_id, is_active, days_of_week, start_time, end_time, next_check FROM sleep_schedules +WHERE is_active +AND (next_check <= ?1 OR next_check IS NULL OR next_check = '') +` + +func (q *Queries) FetchSleepSchedulesToCheck(ctx context.Context, nextCheck sql.NullTime) ([]SleepSchedule, error) { + rows, err := q.db.QueryContext(ctx, fetchSleepSchedulesToCheck, nextCheck) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SleepSchedule + for rows.Next() { + var i SleepSchedule + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.WorkerID, + &i.IsActive, + &i.DaysOfWeek, + &i.StartTime, + &i.EndTime, + &i.NextCheck, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchTagsOfWorker = `-- name: FetchTagsOfWorker :many SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description FROM worker_tags @@ -276,6 +315,35 @@ func (q *Queries) FetchWorker(ctx context.Context, uuid string) (Worker, error) return i, err } +const fetchWorkerByID = `-- name: FetchWorkerByID :one +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.id = ?1 and deleted_at is NULL +` + +// FetchWorkerByID only returns the worker if it wasn't soft-deleted. +func (q *Queries) FetchWorkerByID(ctx context.Context, workerID int64) (Worker, error) { + row := q.db.QueryRowContext(ctx, fetchWorkerByID, workerID) + var i Worker + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, + ) + return i, err +} + const fetchWorkerSleepSchedule = `-- name: FetchWorkerSleepSchedule :one SELECT sleep_schedules.id, sleep_schedules.created_at, sleep_schedules.updated_at, sleep_schedules.worker_id, sleep_schedules.is_active, sleep_schedules.days_of_week, sleep_schedules.start_time, sleep_schedules.end_time, sleep_schedules.next_check FROM sleep_schedules @@ -640,6 +708,81 @@ func (q *Queries) SaveWorkerTag(ctx context.Context, arg SaveWorkerTagParams) er return err } +const setWorkerSleepSchedule = `-- name: SetWorkerSleepSchedule :execlastid +INSERT INTO sleep_schedules ( + created_at, + updated_at, + worker_id, + is_active, + days_of_week, + start_time, + end_time, + next_check +) VALUES ( + ?1, + ?2, + ?3, + ?4, + ?5, + ?6, + ?7, + ?8 +) +ON CONFLICT DO UPDATE + SET updated_at=?2, is_active=?4, days_of_week=?5, start_time=?6, end_time=?7, next_check=?8 + WHERE worker_id=?3 +` + +type SetWorkerSleepScheduleParams struct { + CreatedAt time.Time + UpdatedAt sql.NullTime + WorkerID int64 + IsActive bool + DaysOfWeek string + StartTime string + EndTime string + NextCheck sql.NullTime +} + +// Note that the use of ?2 and ?3 in the SQL is not desirable, and should be +// replaced with @updated_at and @job_id as soon as sqlc issue #3334 is fixed. +// See https://github.com/sqlc-dev/sqlc/issues/3334 for more info. +func (q *Queries) SetWorkerSleepSchedule(ctx context.Context, arg SetWorkerSleepScheduleParams) (int64, error) { + result, err := q.db.ExecContext(ctx, setWorkerSleepSchedule, + arg.CreatedAt, + arg.UpdatedAt, + arg.WorkerID, + arg.IsActive, + arg.DaysOfWeek, + arg.StartTime, + arg.EndTime, + arg.NextCheck, + ) + if err != nil { + return 0, err + } + return result.LastInsertId() +} + +const setWorkerSleepScheduleNextCheck = `-- name: SetWorkerSleepScheduleNextCheck :execrows +UPDATE sleep_schedules +SET next_check=?1 +WHERE ID=?2 +` + +type SetWorkerSleepScheduleNextCheckParams struct { + NextCheck sql.NullTime + ScheduleID int64 +} + +func (q *Queries) SetWorkerSleepScheduleNextCheck(ctx context.Context, arg SetWorkerSleepScheduleNextCheckParams) (int64, error) { + result, err := q.db.ExecContext(ctx, setWorkerSleepScheduleNextCheck, arg.NextCheck, arg.ScheduleID) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + const softDeleteWorker = `-- name: SoftDeleteWorker :execrows UPDATE workers SET deleted_at=?1 WHERE uuid=?2 @@ -692,6 +835,52 @@ func (q *Queries) SummarizeWorkerStatuses(ctx context.Context) ([]SummarizeWorke return items, nil } +const test_CreateWorkerSleepSchedule = `-- name: Test_CreateWorkerSleepSchedule :execlastid +INSERT INTO sleep_schedules ( + created_at, + worker_id, + is_active, + days_of_week, + start_time, + end_time, + next_check +) VALUES ( + ?1, + ?2, + ?3, + ?4, + ?5, + ?6, + ?7 +) +` + +type Test_CreateWorkerSleepScheduleParams struct { + CreatedAt time.Time + WorkerID int64 + IsActive bool + DaysOfWeek string + StartTime string + EndTime string + NextCheck sql.NullTime +} + +func (q *Queries) Test_CreateWorkerSleepSchedule(ctx context.Context, arg Test_CreateWorkerSleepScheduleParams) (int64, error) { + result, err := q.db.ExecContext(ctx, test_CreateWorkerSleepSchedule, + arg.CreatedAt, + arg.WorkerID, + arg.IsActive, + arg.DaysOfWeek, + arg.StartTime, + arg.EndTime, + arg.NextCheck, + ) + if err != nil { + return 0, err + } + return result.LastInsertId() +} + const workerAddTagMembership = `-- name: WorkerAddTagMembership :exec INSERT INTO worker_tag_membership (worker_tag_id, worker_id) VALUES (?1, ?2) diff --git a/internal/manager/persistence/worker_sleep_schedule.go b/internal/manager/persistence/worker_sleep_schedule.go index 8ac9bf83..916c19c0 100644 --- a/internal/manager/persistence/worker_sleep_schedule.go +++ b/internal/manager/persistence/worker_sleep_schedule.go @@ -10,7 +10,6 @@ import ( "time" "github.com/rs/zerolog/log" - "gorm.io/gorm/clause" "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) @@ -69,13 +68,24 @@ func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, sch schedule.NextCheck = schedule.NextCheck.UTC() } - tx := db.gormDB.WithContext(ctx). - Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "worker_id"}}, - UpdateAll: true, - }). - Create(&schedule) - return tx.Error + queries := db.queries() + params := sqlc.SetWorkerSleepScheduleParams{ + CreatedAt: db.gormDB.NowFunc(), + UpdatedAt: db.now(), + WorkerID: int64(schedule.WorkerID), + IsActive: schedule.IsActive, + DaysOfWeek: schedule.DaysOfWeek, + StartTime: schedule.StartTime.String(), + EndTime: schedule.EndTime.String(), + NextCheck: sql.NullTime{Time: schedule.NextCheck, Valid: !schedule.NextCheck.IsZero()}, + } + + id, err := queries.SetWorkerSleepSchedule(ctx, params) + if err != nil { + return fmt.Errorf("storing worker %q sleep schedule: %w", workerUUID, err) + } + schedule.ID = uint(id) + return nil } func (db *DB) SetWorkerSleepScheduleNextCheck(ctx context.Context, schedule *SleepSchedule) error { @@ -84,47 +94,60 @@ func (db *DB) SetWorkerSleepScheduleNextCheck(ctx context.Context, schedule *Sle schedule.NextCheck = schedule.NextCheck.UTC() } - tx := db.gormDB.WithContext(ctx). - Select("next_check"). - Updates(schedule) - return tx.Error + queries := db.queries() + numAffected, err := queries.SetWorkerSleepScheduleNextCheck( + ctx, + sqlc.SetWorkerSleepScheduleNextCheckParams{ + ScheduleID: int64(schedule.ID), + NextCheck: sql.NullTime{Time: schedule.NextCheck, Valid: !schedule.NextCheck.IsZero()}, + }) + if err != nil { + return fmt.Errorf("updating worker sleep schedule: %w", err) + } + if numAffected < 1 { + return fmt.Errorf("could not find worker sleep schedule ID %d", schedule.ID) + } + return nil } // FetchSleepScheduleWorker sets the given schedule's `Worker` pointer. func (db *DB) FetchSleepScheduleWorker(ctx context.Context, schedule *SleepSchedule) error { - var worker Worker - tx := db.gormDB.WithContext(ctx).Limit(1).Find(&worker, schedule.WorkerID) - if tx.Error != nil { - return workerError(tx.Error, "finding worker by their sleep schedule") - } - if worker.ID == 0 { - // Worker was not found. It could be that the worker was soft-deleted, which - // keeps the schedule around in the database. + queries := db.queries() + + worker, err := queries.FetchWorkerByID(ctx, int64(schedule.WorkerID)) + if err != nil { schedule.Worker = nil - return ErrWorkerNotFound + return workerError(err, "finding worker by their sleep schedule") } - schedule.Worker = &worker + + schedule.Worker = convertSqlcWorker(worker) return nil } // FetchSleepSchedulesToCheck returns the sleep schedules that are due for a check. func (db *DB) FetchSleepSchedulesToCheck(ctx context.Context) ([]*SleepSchedule, error) { - now := db.gormDB.NowFunc() + now := db.now() log.Debug(). - Str("timeout", now.String()). + Str("timeout", now.Time.String()). Msg("fetching sleep schedules that need checking") - schedules := []*SleepSchedule{} - tx := db.gormDB.WithContext(ctx). - Model(&SleepSchedule{}). - Where("is_active = ?", true). - Where("next_check <= ? or next_check is NULL or next_check = ''", now). - Scan(&schedules) - if tx.Error != nil { - return nil, tx.Error + queries := db.queries() + schedules, err := queries.FetchSleepSchedulesToCheck(ctx, now) + if err != nil { + return nil, err } - return schedules, nil + + gormSchedules := make([]*SleepSchedule, len(schedules)) + for index := range schedules { + gormSched, err := convertSqlcSleepSchedule(schedules[index]) + if err != nil { + return nil, err + } + gormSchedules[index] = gormSched + } + + return gormSchedules, nil } func convertSqlcSleepSchedule(sqlcSchedule sqlc.SleepSchedule) (*SleepSchedule, error) { diff --git a/internal/manager/persistence/worker_sleep_schedule_test.go b/internal/manager/persistence/worker_sleep_schedule_test.go index 0df40b35..b35f1232 100644 --- a/internal/manager/persistence/worker_sleep_schedule_test.go +++ b/internal/manager/persistence/worker_sleep_schedule_test.go @@ -48,8 +48,8 @@ func TestFetchWorkerSleepSchedule(t *testing.T) { StartTime: TimeOfDay{18, 0}, EndTime: TimeOfDay{9, 0}, } - tx := db.gormDB.Create(&created) - require.NoError(t, tx.Error) + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &created) + require.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) @@ -82,8 +82,8 @@ func TestFetchSleepScheduleWorker(t *testing.T) { StartTime: TimeOfDay{18, 0}, EndTime: TimeOfDay{9, 0}, } - tx := db.gormDB.Create(&created) - require.NoError(t, tx.Error) + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &created) + require.NoError(t, err) dbSchedule, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) @@ -190,26 +190,24 @@ func TestSetWorkerSleepScheduleNextCheck(t *testing.T) { ctx, finish, db := persistenceTestFixtures(1 * time.Second) defer finish() + w := linuxWorker(t, db, func(worker *Worker) { + worker.Status = api.WorkerStatusAwake + }) + schedule := SleepSchedule{ - Worker: &Worker{ - UUID: "2b1f857a-fd64-484b-9c17-cf89bbe47be7", - Name: "дрон 1", - Status: api.WorkerStatusAwake, - }, + Worker: &w, IsActive: true, DaysOfWeek: "mo,tu,th,fr", StartTime: TimeOfDay{18, 0}, EndTime: TimeOfDay{9, 0}, } - // Use GORM to create the worker and sleep schedule in one go. - if tx := db.gormDB.Create(&schedule); tx.Error != nil { - panic(tx.Error) - } + err := db.SetWorkerSleepSchedule(ctx, w.UUID, &schedule) + require.NoError(t, err) future := db.gormDB.NowFunc().Add(5 * time.Hour) schedule.NextCheck = future - err := db.SetWorkerSleepScheduleNextCheck(ctx, &schedule) + err = db.SetWorkerSleepScheduleNextCheck(ctx, &schedule) require.NoError(t, err) fetched, err := db.FetchWorkerSleepSchedule(ctx, schedule.Worker.UUID) @@ -283,12 +281,12 @@ func TestFetchSleepSchedulesToCheck(t *testing.T) { NextCheck: mockedPast, // next check in the past, so if active it would be checked. } - // Use GORM to create the workers and sleep schedules in one go. + // Create the workers and sleep schedules. scheds := []*SleepSchedule{&schedule0, &schedule1, &schedule2, &schedule3} for idx := range scheds { - if tx := db.gormDB.Create(scheds[idx]); tx.Error != nil { - panic(tx.Error) - } + saveTestWorker(t, db, scheds[idx].Worker) + err := db.SetWorkerSleepSchedule(ctx, scheds[idx].Worker.UUID, scheds[idx]) + require.NoError(t, err) } toCheck, err := db.FetchSleepSchedulesToCheck(ctx)