Manager: replace GORM 'now' function with our own implementation

GORM implicitly sets 'created at', 'updated at' and 'deleted at' timestamps
to 'now' by calling a 'now function'. This is now implemented by Flamenco
directly, instead of relying on GORM.

Ref: #104305
This commit is contained in:
Sybren A. Stüvel 2024-09-26 22:38:17 +02:00
parent 29419cb30e
commit 8513e2fdc8
14 changed files with 69 additions and 66 deletions

View File

@ -19,6 +19,7 @@ import (
// DB provides the database interface.
type DB struct {
gormDB *gorm.DB
nowfunc func() time.Time
// See PeriodicIntegrityCheck().
consistencyCheckRequests chan struct{}
@ -89,21 +90,14 @@ func openDB(ctx context.Context, dsn string) (*DB, error) {
config := gorm.Config{
Logger: dblogger,
NowFunc: nowFunc,
}
return openDBWithConfig(dsn, &config)
}
func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) {
dialector := sqlite.Open(dsn)
gormDB, err := gorm.Open(dialector, config)
if err != nil {
return nil, err
}
db := DB{
gormDB: gormDB,
nowfunc: time.Now,
// Buffer one request, so that even when a consistency check is already
// running, another can be queued without blocking. Queueing more than one
@ -111,6 +105,15 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) {
consistencyCheckRequests: make(chan struct{}, 1),
}
config.NowFunc = db.now
dialector := sqlite.Open(dsn)
gormDB, err := gorm.Open(dialector, config)
if err != nil {
return nil, err
}
db.gormDB = gormDB
// Close the database connection if there was some error. This prevents
// leaking database connections & should remove any write-ahead-log files.
closeConnOnReturn := true
@ -155,12 +158,6 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) {
return &db, nil
}
// nowFunc returns 'now' in UTC, so that GORM-managed times (createdAt,
// deletedAt, updatedAt) are stored in UTC.
func nowFunc() time.Time {
return time.Now().UTC()
}
// vacuum executes the SQL "VACUUM" command, and logs any errors.
func (db *DB) vacuum() {
tx := db.gormDB.Exec("vacuum")
@ -231,10 +228,17 @@ func (db *DB) queriesWithTX() (*queriesTX, error) {
return &qtx, nil
}
// now returns the result of `nowFunc()` wrapped in a sql.NullTime.
func (db *DB) now() sql.NullTime {
// now returns 'now' as reported by db.nowfunc.
// It always converts the timestamp to UTC.
func (db *DB) now() time.Time {
return db.nowfunc().UTC()
}
// nowNullable returns the result of `now()` wrapped in a sql.NullTime.
// It is nullable just for ease of use, it will never actually be null.
func (db *DB) nowNullable() sql.NullTime {
return sql.NullTime{
Time: db.gormDB.NowFunc(),
Time: db.now(),
Valid: true,
}
}

View File

@ -167,7 +167,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
// Create the job itself.
params := sqlc.CreateJobParams{
CreatedAt: db.gormDB.NowFunc(),
CreatedAt: db.now(),
UUID: authoredJob.JobID,
Name: authoredJob.Name,
JobType: authoredJob.JobType,
@ -246,7 +246,7 @@ func (db *DB) storeAuthoredJobTaks(
}
// Give every task the same creation timestamp.
now := db.gormDB.NowFunc()
now := db.now()
uuidToTask := make(map[string]TaskInfo)
for _, authoredTask := range authoredJob.Tasks {
@ -442,7 +442,7 @@ func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error {
queries := db.queries()
// Update the given job itself, so we don't have to re-fetch it from the database.
j.DeleteRequestedAt = db.now()
j.DeleteRequestedAt = db.nowNullable()
params := sqlc.RequestJobDeletionParams{
Now: j.DeleteRequestedAt,
@ -479,7 +479,7 @@ func (db *DB) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Ti
// Update the selected jobs.
params := sqlc.RequestMassJobDeletionParams{
Now: db.now(),
Now: db.nowNullable(),
UUIDs: uuids,
}
if err := queries.RequestMassJobDeletion(ctx, params); err != nil {
@ -529,7 +529,7 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
queries := db.queries()
params := sqlc.SaveJobStatusParams{
Now: db.now(),
Now: db.nowNullable(),
ID: int64(j.ID),
Status: string(j.Status),
Activity: j.Activity,
@ -547,7 +547,7 @@ func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error {
queries := db.queries()
params := sqlc.SaveJobPriorityParams{
Now: db.now(),
Now: db.nowNullable(),
ID: int64(j.ID),
Priority: int64(j.Priority),
}
@ -670,7 +670,7 @@ func (db *DB) SaveTask(ctx context.Context, t *Task) error {
}
param := sqlc.UpdateTaskParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
Name: t.Name,
Type: t.Type,
Priority: int64(t.Priority),
@ -709,7 +709,7 @@ func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error {
queries := db.queries()
err := queries.UpdateTaskStatus(ctx, sqlc.UpdateTaskStatusParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
Status: string(t.Status),
ID: int64(t.ID),
})
@ -723,7 +723,7 @@ func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
queries := db.queries()
err := queries.UpdateTaskActivity(ctx, sqlc.UpdateTaskActivityParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
Activity: t.Activity,
ID: int64(t.ID),
})
@ -740,7 +740,7 @@ func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error
queries := db.queries()
err := queries.TaskAssignToWorker(ctx, sqlc.TaskAssignToWorkerParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
WorkerID: sql.NullInt64{
Int64: int64(w.ID),
Valid: true,
@ -935,7 +935,7 @@ func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job,
queries := db.queries()
err := queries.UpdateJobsTaskStatuses(ctx, sqlc.UpdateJobsTaskStatusesParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
Status: string(taskStatus),
Activity: activity,
JobID: int64(job.ID),
@ -959,7 +959,7 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job,
queries := db.queries()
err := queries.UpdateJobsTaskStatusesConditional(ctx, sqlc.UpdateJobsTaskStatusesConditionalParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
Status: string(taskStatus),
Activity: activity,
JobID: int64(job.ID),
@ -976,7 +976,7 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job,
func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error {
queries := db.queries()
now := db.now()
now := db.nowNullable()
err := queries.TaskTouchedByWorker(ctx, sqlc.TaskTouchedByWorkerParams{
UpdatedAt: now,
LastTouchedAt: now,
@ -1004,7 +1004,7 @@ func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker)
queries := db.queries()
err = queries.AddWorkerToTaskFailedList(ctx, sqlc.AddWorkerToTaskFailedListParams{
CreatedAt: db.now().Time,
CreatedAt: db.nowNullable().Time,
TaskID: int64(t.ID),
WorkerID: int64(w.ID),
})

View File

@ -41,7 +41,7 @@ func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Wor
queries := db.queries()
return queries.AddWorkerToJobBlocklist(ctx, sqlc.AddWorkerToJobBlocklistParams{
CreatedAt: db.now().Time,
CreatedAt: db.nowNullable().Time,
JobID: int64(job.ID),
WorkerID: int64(worker.ID),
TaskType: taskType,

View File

@ -125,7 +125,7 @@ func TestSaveJobStorageInfo(t *testing.T) {
startTime := time.Date(2023, time.February, 7, 15, 0, 0, 0, time.UTC)
mockNow := startTime
db.gormDB.NowFunc = func() time.Time { return mockNow }
db.nowfunc = func() time.Time { return mockNow }
authoredJob := createTestAuthoredJobWithTasks()
err := db.StoreAuthoredJob(ctx, authoredJob)
@ -269,8 +269,8 @@ func TestRequestJobDeletion(t *testing.T) {
authoredJob2 := duplicateJobAndTasks(authoredJob1)
persistAuthoredJob(t, ctx, db, authoredJob2)
mockNow := time.Now()
db.gormDB.NowFunc = func() time.Time { return mockNow }
mockNow := time.Now().UTC()
db.nowfunc = func() time.Time { return mockNow }
err := db.RequestJobDeletion(ctx, job1)
require.NoError(t, err)
@ -297,28 +297,28 @@ func TestRequestJobMassDeletion(t *testing.T) {
defer close()
origGormNow := db.gormDB.NowFunc
now := db.gormDB.NowFunc()
now := db.now()
// Ensure different jobs get different timestamps.
db.gormDB.NowFunc = func() time.Time { return now.Add(-3 * time.Second) }
db.nowfunc = func() time.Time { return now.Add(-3 * time.Second) }
authoredJob2 := duplicateJobAndTasks(authoredJob1)
job2 := persistAuthoredJob(t, ctx, db, authoredJob2)
db.gormDB.NowFunc = func() time.Time { return now.Add(-4 * time.Second) }
db.nowfunc = func() time.Time { return now.Add(-4 * time.Second) }
authoredJob3 := duplicateJobAndTasks(authoredJob1)
job3 := persistAuthoredJob(t, ctx, db, authoredJob3)
db.gormDB.NowFunc = func() time.Time { return now.Add(-5 * time.Second) }
db.nowfunc = func() time.Time { return now.Add(-5 * time.Second) }
authoredJob4 := duplicateJobAndTasks(authoredJob1)
job4 := persistAuthoredJob(t, ctx, db, authoredJob4)
// Request that "job3 and older" gets deleted.
timeOfDeleteRequest := origGormNow()
db.gormDB.NowFunc = func() time.Time { return timeOfDeleteRequest }
db.nowfunc = func() time.Time { return timeOfDeleteRequest }
uuids, err := db.RequestJobMassDeletion(ctx, job3.UpdatedAt)
require.NoError(t, err)
db.gormDB.NowFunc = origGormNow
db.nowfunc = origGormNow
// Only jobs 3 and 4 should be updated.
assert.Equal(t, []string{job3.UUID, job4.UUID}, uuids)
@ -348,7 +348,7 @@ func TestRequestJobMassDeletion_noJobsFound(t *testing.T) {
defer close()
// Request deletion with a timestamp that doesn't match any jobs.
now := db.gormDB.NowFunc()
now := db.now()
uuids, err := db.RequestJobMassDeletion(ctx, now.Add(-24*time.Hour))
assert.ErrorIs(t, err, ErrJobNotFound)
assert.Zero(t, uuids)
@ -364,7 +364,7 @@ func TestFetchJobsDeletionRequested(t *testing.T) {
defer close()
now := time.Now()
db.gormDB.NowFunc = func() time.Time { return now }
db.nowfunc = func() time.Time { return now }
authoredJob2 := duplicateJobAndTasks(authoredJob1)
job2 := persistAuthoredJob(t, ctx, db, authoredJob2)
@ -382,7 +382,7 @@ func TestFetchJobsDeletionRequested(t *testing.T) {
now.Add(-5 * time.Second),
}
currentTimestampIndex := 0
db.gormDB.NowFunc = func() time.Time {
db.nowfunc = func() time.Time {
now := timestamps[currentTimestampIndex]
currentTimestampIndex++
return now
@ -688,7 +688,7 @@ func TestTaskTouchedByWorker(t *testing.T) {
require.NoError(t, err)
assert.True(t, task.LastTouchedAt.IsZero())
now := db.gormDB.NowFunc()
now := db.now()
err = db.TaskTouchedByWorker(ctx, task)
require.NoError(t, err)

View File

@ -23,7 +23,7 @@ type LastRendered struct {
func (db *DB) SetLastRendered(ctx context.Context, j *Job) error {
queries := db.queries()
now := db.now()
now := db.nowNullable()
return queries.SetLastRendered(ctx, sqlc.SetLastRenderedParams{
CreatedAt: now.Time,
UpdatedAt: now,

View File

@ -109,7 +109,7 @@ func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker
// Assign the task to the worker.
err = queries.AssignTaskToWorker(ctx, sqlc.AssignTaskToWorkerParams{
WorkerID: workerID,
Now: db.now(),
Now: db.nowNullable(),
TaskID: task.ID,
})

View File

@ -51,7 +51,7 @@ func TestOneJobOneTask(t *testing.T) {
assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker")
// Check the task in the database.
now := db.gormDB.NowFunc()
now := db.now()
dbTask, err := db.FetchTask(context.Background(), authTask.UUID)
require.NoError(t, err)
require.NotNil(t, dbTask)
@ -530,7 +530,7 @@ func windowsWorker(t *testing.T, db *DB) Worker {
func saveTestWorker(t *testing.T, db *DB, worker *Worker) {
params := sqlc.CreateWorkerParams{
CreatedAt: db.gormDB.NowFunc(),
CreatedAt: db.now(),
UUID: worker.UUID,
Secret: worker.Secret,
Name: worker.Name,

View File

@ -45,7 +45,6 @@ func CreateTestDB() (db *DB, closer func()) {
config := gorm.Config{
Logger: dblogger,
ConnPool: sqliteConn,
NowFunc: nowFunc,
}
db, err = openDBWithConfig(TestDSN, &config)

View File

@ -18,7 +18,7 @@ func TestFetchTimedOutTasks(t *testing.T) {
tasks, err := db.FetchTasksOfJob(ctx, job)
require.NoError(t, err)
now := db.gormDB.NowFunc()
now := db.now()
deadline := now.Add(-5 * time.Minute)
// Mark the task as last touched before the deadline, i.e. old enough for a timeout.

View File

@ -70,8 +70,8 @@ func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, sch
queries := db.queries()
params := sqlc.SetWorkerSleepScheduleParams{
CreatedAt: db.gormDB.NowFunc(),
UpdatedAt: db.now(),
CreatedAt: db.now(),
UpdatedAt: db.nowNullable(),
WorkerID: int64(schedule.WorkerID),
IsActive: schedule.IsActive,
DaysOfWeek: schedule.DaysOfWeek,
@ -126,7 +126,7 @@ func (db *DB) FetchSleepScheduleWorker(ctx context.Context, schedule *SleepSched
// FetchSleepSchedulesToCheck returns the sleep schedules that are due for a check.
func (db *DB) FetchSleepSchedulesToCheck(ctx context.Context) ([]*SleepSchedule, error) {
now := db.now()
now := db.nowNullable()
log.Debug().
Str("timeout", now.Time.String()).

View File

@ -204,7 +204,7 @@ func TestSetWorkerSleepScheduleNextCheck(t *testing.T) {
err := db.SetWorkerSleepSchedule(ctx, w.UUID, &schedule)
require.NoError(t, err)
future := db.gormDB.NowFunc().Add(5 * time.Hour)
future := db.now().Add(5 * time.Hour)
schedule.NextCheck = future
err = db.SetWorkerSleepScheduleNextCheck(ctx, &schedule)
@ -223,7 +223,7 @@ func TestFetchSleepSchedulesToCheck(t *testing.T) {
mockedPast := mockedNow.Add(-10 * time.Second)
mockedFuture := mockedNow.Add(10 * time.Second)
db.gormDB.NowFunc = func() time.Time { return mockedNow }
db.nowfunc = func() time.Time { return mockedNow }
schedule0 := SleepSchedule{ // Next check in the past -> should be checked.
Worker: &Worker{

View File

@ -22,7 +22,7 @@ type WorkerTag struct {
func (db *DB) CreateWorkerTag(ctx context.Context, wc *WorkerTag) error {
queries := db.queries()
now := db.gormDB.NowFunc()
now := db.now()
dbID, err := queries.CreateWorkerTag(ctx, sqlc.CreateWorkerTagParams{
CreatedAt: now,
UUID: wc.UUID,
@ -76,7 +76,7 @@ func (db *DB) SaveWorkerTag(ctx context.Context, tag *WorkerTag) error {
queries := db.queries()
err := queries.SaveWorkerTag(ctx, sqlc.SaveWorkerTagParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
UUID: tag.UUID,
Name: tag.Name,
Description: tag.Description,

View File

@ -71,7 +71,7 @@ func (w *Worker) StatusChangeClear() {
func (db *DB) CreateWorker(ctx context.Context, w *Worker) error {
queries := db.queries()
now := db.now().Time
now := db.nowNullable().Time
workerID, err := queries.CreateWorker(ctx, sqlc.CreateWorkerParams{
CreatedAt: now,
UUID: w.UUID,
@ -149,7 +149,7 @@ func (db *DB) DeleteWorker(ctx context.Context, uuid string) error {
queries := db.queries()
rowsAffected, err := queries.SoftDeleteWorker(ctx, sqlc.SoftDeleteWorkerParams{
DeletedAt: db.now(),
DeletedAt: db.nowNullable(),
UUID: uuid,
})
if err != nil {
@ -224,7 +224,7 @@ func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error {
queries := db.queries()
err := queries.SaveWorkerStatus(ctx, sqlc.SaveWorkerStatusParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
Status: string(w.Status),
StatusRequested: string(w.StatusRequested),
LazyStatusRequest: w.LazyStatusRequest,
@ -245,7 +245,7 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
queries := db.queries()
err := queries.SaveWorker(ctx, sqlc.SaveWorkerParams{
UpdatedAt: db.now(),
UpdatedAt: db.nowNullable(),
UUID: w.UUID,
Secret: w.Secret,
Name: w.Name,
@ -270,7 +270,7 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error {
queries := db.queries()
now := db.now()
now := db.nowNullable()
err := queries.WorkerSeen(ctx, sqlc.WorkerSeenParams{
UpdatedAt: now,
LastSeenAt: now,

View File

@ -62,7 +62,7 @@ func TestFetchWorkerTask(t *testing.T) {
startTime := time.Date(2024, time.July, 2, 7, 56, 0, 0, time.UTC)
mockNow := startTime
db.gormDB.NowFunc = func() time.Time { return mockNow }
db.nowfunc = func() time.Time { return mockNow }
// Worker without task.
w := Worker{