From 8513e2fdc82bfe37191c81ec4cefe47ddbfe90d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 26 Sep 2024 22:38:17 +0200 Subject: [PATCH] Manager: replace GORM 'now' function with our own implementation GORM implicitly sets 'created at', 'updated at' and 'deleted at' timestamps to 'now' by calling a 'now function'. This is now implemented by Flamenco directly, instead of relying on GORM. Ref: #104305 --- internal/manager/persistence/db.go | 42 ++++++++++--------- internal/manager/persistence/jobs.go | 28 ++++++------- .../manager/persistence/jobs_blocklist.go | 2 +- internal/manager/persistence/jobs_test.go | 26 ++++++------ internal/manager/persistence/last_rendered.go | 2 +- .../manager/persistence/task_scheduler.go | 2 +- .../persistence/task_scheduler_test.go | 4 +- internal/manager/persistence/test_support.go | 1 - internal/manager/persistence/timeout_test.go | 2 +- .../persistence/worker_sleep_schedule.go | 6 +-- .../persistence/worker_sleep_schedule_test.go | 4 +- internal/manager/persistence/worker_tag.go | 4 +- internal/manager/persistence/workers.go | 10 ++--- internal/manager/persistence/workers_test.go | 2 +- 14 files changed, 69 insertions(+), 66 deletions(-) diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 2cd34262..6ba0048f 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -18,7 +18,8 @@ import ( // DB provides the database interface. type DB struct { - gormDB *gorm.DB + gormDB *gorm.DB + nowfunc func() time.Time // See PeriodicIntegrityCheck(). consistencyCheckRequests chan struct{} @@ -88,22 +89,15 @@ func openDB(ctx context.Context, dsn string) (*DB, error) { dblogger := NewDBLogger(log.Level(globalLogLevel)) config := gorm.Config{ - Logger: dblogger, - NowFunc: nowFunc, + Logger: dblogger, } return openDBWithConfig(dsn, &config) } func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) { - dialector := sqlite.Open(dsn) - gormDB, err := gorm.Open(dialector, config) - if err != nil { - return nil, err - } - db := DB{ - gormDB: gormDB, + nowfunc: time.Now, // Buffer one request, so that even when a consistency check is already // running, another can be queued without blocking. Queueing more than one @@ -111,6 +105,15 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) { consistencyCheckRequests: make(chan struct{}, 1), } + config.NowFunc = db.now + + dialector := sqlite.Open(dsn) + gormDB, err := gorm.Open(dialector, config) + if err != nil { + return nil, err + } + db.gormDB = gormDB + // Close the database connection if there was some error. This prevents // leaking database connections & should remove any write-ahead-log files. closeConnOnReturn := true @@ -155,12 +158,6 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) { return &db, nil } -// nowFunc returns 'now' in UTC, so that GORM-managed times (createdAt, -// deletedAt, updatedAt) are stored in UTC. -func nowFunc() time.Time { - return time.Now().UTC() -} - // vacuum executes the SQL "VACUUM" command, and logs any errors. func (db *DB) vacuum() { tx := db.gormDB.Exec("vacuum") @@ -231,10 +228,17 @@ func (db *DB) queriesWithTX() (*queriesTX, error) { return &qtx, nil } -// now returns the result of `nowFunc()` wrapped in a sql.NullTime. -func (db *DB) now() sql.NullTime { +// now returns 'now' as reported by db.nowfunc. +// It always converts the timestamp to UTC. +func (db *DB) now() time.Time { + return db.nowfunc().UTC() +} + +// nowNullable returns the result of `now()` wrapped in a sql.NullTime. +// It is nullable just for ease of use, it will never actually be null. +func (db *DB) nowNullable() sql.NullTime { return sql.NullTime{ - Time: db.gormDB.NowFunc(), + Time: db.now(), Valid: true, } } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 7c74adc7..a6851c41 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -167,7 +167,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au // Create the job itself. params := sqlc.CreateJobParams{ - CreatedAt: db.gormDB.NowFunc(), + CreatedAt: db.now(), UUID: authoredJob.JobID, Name: authoredJob.Name, JobType: authoredJob.JobType, @@ -246,7 +246,7 @@ func (db *DB) storeAuthoredJobTaks( } // Give every task the same creation timestamp. - now := db.gormDB.NowFunc() + now := db.now() uuidToTask := make(map[string]TaskInfo) for _, authoredTask := range authoredJob.Tasks { @@ -442,7 +442,7 @@ func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error { queries := db.queries() // Update the given job itself, so we don't have to re-fetch it from the database. - j.DeleteRequestedAt = db.now() + j.DeleteRequestedAt = db.nowNullable() params := sqlc.RequestJobDeletionParams{ Now: j.DeleteRequestedAt, @@ -479,7 +479,7 @@ func (db *DB) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Ti // Update the selected jobs. params := sqlc.RequestMassJobDeletionParams{ - Now: db.now(), + Now: db.nowNullable(), UUIDs: uuids, } if err := queries.RequestMassJobDeletion(ctx, params); err != nil { @@ -529,7 +529,7 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { queries := db.queries() params := sqlc.SaveJobStatusParams{ - Now: db.now(), + Now: db.nowNullable(), ID: int64(j.ID), Status: string(j.Status), Activity: j.Activity, @@ -547,7 +547,7 @@ func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error { queries := db.queries() params := sqlc.SaveJobPriorityParams{ - Now: db.now(), + Now: db.nowNullable(), ID: int64(j.ID), Priority: int64(j.Priority), } @@ -670,7 +670,7 @@ func (db *DB) SaveTask(ctx context.Context, t *Task) error { } param := sqlc.UpdateTaskParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), Name: t.Name, Type: t.Type, Priority: int64(t.Priority), @@ -709,7 +709,7 @@ func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error { queries := db.queries() err := queries.UpdateTaskStatus(ctx, sqlc.UpdateTaskStatusParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), Status: string(t.Status), ID: int64(t.ID), }) @@ -723,7 +723,7 @@ func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error { queries := db.queries() err := queries.UpdateTaskActivity(ctx, sqlc.UpdateTaskActivityParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), Activity: t.Activity, ID: int64(t.ID), }) @@ -740,7 +740,7 @@ func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error queries := db.queries() err := queries.TaskAssignToWorker(ctx, sqlc.TaskAssignToWorkerParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), WorkerID: sql.NullInt64{ Int64: int64(w.ID), Valid: true, @@ -935,7 +935,7 @@ func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job, queries := db.queries() err := queries.UpdateJobsTaskStatuses(ctx, sqlc.UpdateJobsTaskStatusesParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), Status: string(taskStatus), Activity: activity, JobID: int64(job.ID), @@ -959,7 +959,7 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job, queries := db.queries() err := queries.UpdateJobsTaskStatusesConditional(ctx, sqlc.UpdateJobsTaskStatusesConditionalParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), Status: string(taskStatus), Activity: activity, JobID: int64(job.ID), @@ -976,7 +976,7 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job, func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error { queries := db.queries() - now := db.now() + now := db.nowNullable() err := queries.TaskTouchedByWorker(ctx, sqlc.TaskTouchedByWorkerParams{ UpdatedAt: now, LastTouchedAt: now, @@ -1004,7 +1004,7 @@ func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) queries := db.queries() err = queries.AddWorkerToTaskFailedList(ctx, sqlc.AddWorkerToTaskFailedListParams{ - CreatedAt: db.now().Time, + CreatedAt: db.nowNullable().Time, TaskID: int64(t.ID), WorkerID: int64(w.ID), }) diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index 444022a0..925e6c02 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -41,7 +41,7 @@ func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Wor queries := db.queries() return queries.AddWorkerToJobBlocklist(ctx, sqlc.AddWorkerToJobBlocklistParams{ - CreatedAt: db.now().Time, + CreatedAt: db.nowNullable().Time, JobID: int64(job.ID), WorkerID: int64(worker.ID), TaskType: taskType, diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 389b9554..b81fd05b 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -125,7 +125,7 @@ func TestSaveJobStorageInfo(t *testing.T) { startTime := time.Date(2023, time.February, 7, 15, 0, 0, 0, time.UTC) mockNow := startTime - db.gormDB.NowFunc = func() time.Time { return mockNow } + db.nowfunc = func() time.Time { return mockNow } authoredJob := createTestAuthoredJobWithTasks() err := db.StoreAuthoredJob(ctx, authoredJob) @@ -269,8 +269,8 @@ func TestRequestJobDeletion(t *testing.T) { authoredJob2 := duplicateJobAndTasks(authoredJob1) persistAuthoredJob(t, ctx, db, authoredJob2) - mockNow := time.Now() - db.gormDB.NowFunc = func() time.Time { return mockNow } + mockNow := time.Now().UTC() + db.nowfunc = func() time.Time { return mockNow } err := db.RequestJobDeletion(ctx, job1) require.NoError(t, err) @@ -297,28 +297,28 @@ func TestRequestJobMassDeletion(t *testing.T) { defer close() origGormNow := db.gormDB.NowFunc - now := db.gormDB.NowFunc() + now := db.now() // Ensure different jobs get different timestamps. - db.gormDB.NowFunc = func() time.Time { return now.Add(-3 * time.Second) } + db.nowfunc = func() time.Time { return now.Add(-3 * time.Second) } authoredJob2 := duplicateJobAndTasks(authoredJob1) job2 := persistAuthoredJob(t, ctx, db, authoredJob2) - db.gormDB.NowFunc = func() time.Time { return now.Add(-4 * time.Second) } + db.nowfunc = func() time.Time { return now.Add(-4 * time.Second) } authoredJob3 := duplicateJobAndTasks(authoredJob1) job3 := persistAuthoredJob(t, ctx, db, authoredJob3) - db.gormDB.NowFunc = func() time.Time { return now.Add(-5 * time.Second) } + db.nowfunc = func() time.Time { return now.Add(-5 * time.Second) } authoredJob4 := duplicateJobAndTasks(authoredJob1) job4 := persistAuthoredJob(t, ctx, db, authoredJob4) // Request that "job3 and older" gets deleted. timeOfDeleteRequest := origGormNow() - db.gormDB.NowFunc = func() time.Time { return timeOfDeleteRequest } + db.nowfunc = func() time.Time { return timeOfDeleteRequest } uuids, err := db.RequestJobMassDeletion(ctx, job3.UpdatedAt) require.NoError(t, err) - db.gormDB.NowFunc = origGormNow + db.nowfunc = origGormNow // Only jobs 3 and 4 should be updated. assert.Equal(t, []string{job3.UUID, job4.UUID}, uuids) @@ -348,7 +348,7 @@ func TestRequestJobMassDeletion_noJobsFound(t *testing.T) { defer close() // Request deletion with a timestamp that doesn't match any jobs. - now := db.gormDB.NowFunc() + now := db.now() uuids, err := db.RequestJobMassDeletion(ctx, now.Add(-24*time.Hour)) assert.ErrorIs(t, err, ErrJobNotFound) assert.Zero(t, uuids) @@ -364,7 +364,7 @@ func TestFetchJobsDeletionRequested(t *testing.T) { defer close() now := time.Now() - db.gormDB.NowFunc = func() time.Time { return now } + db.nowfunc = func() time.Time { return now } authoredJob2 := duplicateJobAndTasks(authoredJob1) job2 := persistAuthoredJob(t, ctx, db, authoredJob2) @@ -382,7 +382,7 @@ func TestFetchJobsDeletionRequested(t *testing.T) { now.Add(-5 * time.Second), } currentTimestampIndex := 0 - db.gormDB.NowFunc = func() time.Time { + db.nowfunc = func() time.Time { now := timestamps[currentTimestampIndex] currentTimestampIndex++ return now @@ -688,7 +688,7 @@ func TestTaskTouchedByWorker(t *testing.T) { require.NoError(t, err) assert.True(t, task.LastTouchedAt.IsZero()) - now := db.gormDB.NowFunc() + now := db.now() err = db.TaskTouchedByWorker(ctx, task) require.NoError(t, err) diff --git a/internal/manager/persistence/last_rendered.go b/internal/manager/persistence/last_rendered.go index a5ec02aa..37238340 100644 --- a/internal/manager/persistence/last_rendered.go +++ b/internal/manager/persistence/last_rendered.go @@ -23,7 +23,7 @@ type LastRendered struct { func (db *DB) SetLastRendered(ctx context.Context, j *Job) error { queries := db.queries() - now := db.now() + now := db.nowNullable() return queries.SetLastRendered(ctx, sqlc.SetLastRenderedParams{ CreatedAt: now.Time, UpdatedAt: now, diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 0c16d350..9d5539b5 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -109,7 +109,7 @@ func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker // Assign the task to the worker. err = queries.AssignTaskToWorker(ctx, sqlc.AssignTaskToWorkerParams{ WorkerID: workerID, - Now: db.now(), + Now: db.nowNullable(), TaskID: task.ID, }) diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 81c5ea70..381f7888 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -51,7 +51,7 @@ func TestOneJobOneTask(t *testing.T) { assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker") // Check the task in the database. - now := db.gormDB.NowFunc() + now := db.now() dbTask, err := db.FetchTask(context.Background(), authTask.UUID) require.NoError(t, err) require.NotNil(t, dbTask) @@ -530,7 +530,7 @@ func windowsWorker(t *testing.T, db *DB) Worker { func saveTestWorker(t *testing.T, db *DB, worker *Worker) { params := sqlc.CreateWorkerParams{ - CreatedAt: db.gormDB.NowFunc(), + CreatedAt: db.now(), UUID: worker.UUID, Secret: worker.Secret, Name: worker.Name, diff --git a/internal/manager/persistence/test_support.go b/internal/manager/persistence/test_support.go index 589b3c07..3e0388b2 100644 --- a/internal/manager/persistence/test_support.go +++ b/internal/manager/persistence/test_support.go @@ -45,7 +45,6 @@ func CreateTestDB() (db *DB, closer func()) { config := gorm.Config{ Logger: dblogger, ConnPool: sqliteConn, - NowFunc: nowFunc, } db, err = openDBWithConfig(TestDSN, &config) diff --git a/internal/manager/persistence/timeout_test.go b/internal/manager/persistence/timeout_test.go index 364f4613..30ad05c7 100644 --- a/internal/manager/persistence/timeout_test.go +++ b/internal/manager/persistence/timeout_test.go @@ -18,7 +18,7 @@ func TestFetchTimedOutTasks(t *testing.T) { tasks, err := db.FetchTasksOfJob(ctx, job) require.NoError(t, err) - now := db.gormDB.NowFunc() + now := db.now() deadline := now.Add(-5 * time.Minute) // Mark the task as last touched before the deadline, i.e. old enough for a timeout. diff --git a/internal/manager/persistence/worker_sleep_schedule.go b/internal/manager/persistence/worker_sleep_schedule.go index 916c19c0..34ebe0a5 100644 --- a/internal/manager/persistence/worker_sleep_schedule.go +++ b/internal/manager/persistence/worker_sleep_schedule.go @@ -70,8 +70,8 @@ func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, sch queries := db.queries() params := sqlc.SetWorkerSleepScheduleParams{ - CreatedAt: db.gormDB.NowFunc(), - UpdatedAt: db.now(), + CreatedAt: db.now(), + UpdatedAt: db.nowNullable(), WorkerID: int64(schedule.WorkerID), IsActive: schedule.IsActive, DaysOfWeek: schedule.DaysOfWeek, @@ -126,7 +126,7 @@ func (db *DB) FetchSleepScheduleWorker(ctx context.Context, schedule *SleepSched // FetchSleepSchedulesToCheck returns the sleep schedules that are due for a check. func (db *DB) FetchSleepSchedulesToCheck(ctx context.Context) ([]*SleepSchedule, error) { - now := db.now() + now := db.nowNullable() log.Debug(). Str("timeout", now.Time.String()). diff --git a/internal/manager/persistence/worker_sleep_schedule_test.go b/internal/manager/persistence/worker_sleep_schedule_test.go index b35f1232..b529b4e2 100644 --- a/internal/manager/persistence/worker_sleep_schedule_test.go +++ b/internal/manager/persistence/worker_sleep_schedule_test.go @@ -204,7 +204,7 @@ func TestSetWorkerSleepScheduleNextCheck(t *testing.T) { err := db.SetWorkerSleepSchedule(ctx, w.UUID, &schedule) require.NoError(t, err) - future := db.gormDB.NowFunc().Add(5 * time.Hour) + future := db.now().Add(5 * time.Hour) schedule.NextCheck = future err = db.SetWorkerSleepScheduleNextCheck(ctx, &schedule) @@ -223,7 +223,7 @@ func TestFetchSleepSchedulesToCheck(t *testing.T) { mockedPast := mockedNow.Add(-10 * time.Second) mockedFuture := mockedNow.Add(10 * time.Second) - db.gormDB.NowFunc = func() time.Time { return mockedNow } + db.nowfunc = func() time.Time { return mockedNow } schedule0 := SleepSchedule{ // Next check in the past -> should be checked. Worker: &Worker{ diff --git a/internal/manager/persistence/worker_tag.go b/internal/manager/persistence/worker_tag.go index 6c326866..ba8220d7 100644 --- a/internal/manager/persistence/worker_tag.go +++ b/internal/manager/persistence/worker_tag.go @@ -22,7 +22,7 @@ type WorkerTag struct { func (db *DB) CreateWorkerTag(ctx context.Context, wc *WorkerTag) error { queries := db.queries() - now := db.gormDB.NowFunc() + now := db.now() dbID, err := queries.CreateWorkerTag(ctx, sqlc.CreateWorkerTagParams{ CreatedAt: now, UUID: wc.UUID, @@ -76,7 +76,7 @@ func (db *DB) SaveWorkerTag(ctx context.Context, tag *WorkerTag) error { queries := db.queries() err := queries.SaveWorkerTag(ctx, sqlc.SaveWorkerTagParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), UUID: tag.UUID, Name: tag.Name, Description: tag.Description, diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 7971e13f..2fe37abd 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -71,7 +71,7 @@ func (w *Worker) StatusChangeClear() { func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { queries := db.queries() - now := db.now().Time + now := db.nowNullable().Time workerID, err := queries.CreateWorker(ctx, sqlc.CreateWorkerParams{ CreatedAt: now, UUID: w.UUID, @@ -149,7 +149,7 @@ func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { queries := db.queries() rowsAffected, err := queries.SoftDeleteWorker(ctx, sqlc.SoftDeleteWorkerParams{ - DeletedAt: db.now(), + DeletedAt: db.nowNullable(), UUID: uuid, }) if err != nil { @@ -224,7 +224,7 @@ func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { queries := db.queries() err := queries.SaveWorkerStatus(ctx, sqlc.SaveWorkerStatusParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), Status: string(w.Status), StatusRequested: string(w.StatusRequested), LazyStatusRequest: w.LazyStatusRequest, @@ -245,7 +245,7 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { queries := db.queries() err := queries.SaveWorker(ctx, sqlc.SaveWorkerParams{ - UpdatedAt: db.now(), + UpdatedAt: db.nowNullable(), UUID: w.UUID, Secret: w.Secret, Name: w.Name, @@ -270,7 +270,7 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error { queries := db.queries() - now := db.now() + now := db.nowNullable() err := queries.WorkerSeen(ctx, sqlc.WorkerSeenParams{ UpdatedAt: now, LastSeenAt: now, diff --git a/internal/manager/persistence/workers_test.go b/internal/manager/persistence/workers_test.go index 77f6119c..dc8fcb58 100644 --- a/internal/manager/persistence/workers_test.go +++ b/internal/manager/persistence/workers_test.go @@ -62,7 +62,7 @@ func TestFetchWorkerTask(t *testing.T) { startTime := time.Date(2024, time.July, 2, 7, 56, 0, 0, time.UTC) mockNow := startTime - db.gormDB.NowFunc = func() time.Time { return mockNow } + db.nowfunc = func() time.Time { return mockNow } // Worker without task. w := Worker{