diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 6ba0048f..709c8543 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -9,16 +9,15 @@ import ( "fmt" "time" - "github.com/glebarez/sqlite" "github.com/rs/zerolog/log" - "gorm.io/gorm" + _ "modernc.org/sqlite" "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) // DB provides the database interface. type DB struct { - gormDB *gorm.DB + sqlDB *sql.DB nowfunc func() time.Time // See PeriodicIntegrityCheck(). @@ -64,7 +63,7 @@ func OpenDB(ctx context.Context, dsn string) (*DB, error) { return nil, ErrIntegrity } - db.vacuum() + db.vacuum(ctx) if err := db.migrate(ctx); err != nil { return nil, err @@ -78,41 +77,18 @@ func OpenDB(ctx context.Context, dsn string) (*DB, error) { // Perform another vacuum after database migration, as that may have copied a // lot of data and then dropped another lot of data. - db.vacuum() + db.vacuum(ctx) closeConnOnReturn = false return db, nil } func openDB(ctx context.Context, dsn string) (*DB, error) { - globalLogLevel := log.Logger.GetLevel() - dblogger := NewDBLogger(log.Level(globalLogLevel)) - - config := gorm.Config{ - Logger: dblogger, - } - - return openDBWithConfig(dsn, &config) -} - -func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) { - db := DB{ - nowfunc: time.Now, - - // Buffer one request, so that even when a consistency check is already - // running, another can be queued without blocking. Queueing more than one - // doesn't make sense, though. - consistencyCheckRequests: make(chan struct{}, 1), - } - - config.NowFunc = db.now - - dialector := sqlite.Open(dsn) - gormDB, err := gorm.Open(dialector, config) + // Connect to the database. + sqlDB, err := sql.Open("sqlite", dsn) if err != nil { return nil, err } - db.gormDB = gormDB // Close the database connection if there was some error. This prevents // leaking database connections & should remove any write-ahead-log files. @@ -121,37 +97,44 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) { if !closeConnOnReturn { return } - if err := db.Close(); err != nil { + if err := sqlDB.Close(); err != nil { log.Debug().AnErr("cause", err).Msg("cannot close database connection") } }() - // Use the generic sql.DB interface to set some connection pool options. - sqlDB, err := gormDB.DB() - if err != nil { - return nil, err - } // Only allow a single database connection, to avoid SQLITE_BUSY errors. // It's not certain that this'll improve the situation, but it's worth a try. sqlDB.SetMaxIdleConns(1) // Max num of connections in the idle connection pool. sqlDB.SetMaxOpenConns(1) // Max num of open connections to the database. + db := DB{ + sqlDB: sqlDB, + nowfunc: func() time.Time { return time.Now().UTC() }, + + // Buffer one request, so that even when a consistency check is already + // running, another can be queued without blocking. Queueing more than one + // doesn't make sense, though. + consistencyCheckRequests: make(chan struct{}, 1), + } + // Always enable foreign key checks, to make SQLite behave like a real database. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pragmaCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - if err := db.pragmaForeignKeys(ctx, true); err != nil { + if err := db.pragmaForeignKeys(pragmaCtx, true); err != nil { return nil, err } + queries := db.queries() + // Write-ahead-log journal may improve writing speed. log.Trace().Msg("enabling SQLite write-ahead-log journal mode") - if tx := gormDB.Exec("PRAGMA journal_mode = WAL"); tx.Error != nil { - return nil, fmt.Errorf("enabling SQLite write-ahead-log journal mode: %w", tx.Error) + if err := queries.PragmaJournalModeWAL(pragmaCtx); err != nil { + return nil, fmt.Errorf("enabling SQLite write-ahead-log journal mode: %w", err) } // Switching from 'full' (default) to 'normal' sync may improve writing speed. log.Trace().Msg("enabling SQLite 'normal' synchronisation") - if tx := gormDB.Exec("PRAGMA synchronous = normal"); tx.Error != nil { - return nil, fmt.Errorf("enabling SQLite 'normal' sync mode: %w", tx.Error) + if err := queries.PragmaSynchronousNormal(pragmaCtx); err != nil { + return nil, fmt.Errorf("enabling SQLite 'normal' sync mode: %w", err) } closeConnOnReturn = false @@ -159,37 +142,21 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) { } // vacuum executes the SQL "VACUUM" command, and logs any errors. -func (db *DB) vacuum() { - tx := db.gormDB.Exec("vacuum") - if tx.Error != nil { - log.Error().Err(tx.Error).Msg("error vacuuming database") +func (db *DB) vacuum(ctx context.Context) { + err := db.queries().Vacuum(ctx) + if err != nil { + log.Error().Err(err).Msg("error vacuuming database") } } // Close closes the connection to the database. func (db *DB) Close() error { - sqldb, err := db.gormDB.DB() - if err != nil { - return err - } - return sqldb.Close() + return db.sqlDB.Close() } // queries returns the SQLC Queries struct, connected to this database. -// It is intended that all GORM queries will be migrated to use this interface -// instead. -// -// Note that this function does not return an error. Instead it just panics when -// it cannot obtain the low-level GORM database interface. I have no idea when -// this will ever fail, so I'm opting to simplify the use of this function -// instead. func (db *DB) queries() *sqlc.Queries { - sqldb, err := db.gormDB.DB() - if err != nil { - panic(fmt.Sprintf("could not get low-level database driver: %v", err)) - } - - loggingWrapper := LoggingDBConn{sqldb} + loggingWrapper := LoggingDBConn{db.sqlDB} return sqlc.New(&loggingWrapper) } @@ -207,12 +174,7 @@ type queriesTX struct { // is closed (either committed or rolled back). Otherwise SQLite will deadlock, // as it will make any other query wait until this transaction is done. func (db *DB) queriesWithTX() (*queriesTX, error) { - sqldb, err := db.gormDB.DB() - if err != nil { - panic(fmt.Sprintf("could not get low-level database driver: %v", err)) - } - - tx, err := sqldb.Begin() + tx, err := db.sqlDB.Begin() if err != nil { return nil, fmt.Errorf("could not begin database transaction: %w", err) } @@ -231,7 +193,7 @@ func (db *DB) queriesWithTX() (*queriesTX, error) { // now returns 'now' as reported by db.nowfunc. // It always converts the timestamp to UTC. func (db *DB) now() time.Time { - return db.nowfunc().UTC() + return db.nowfunc() } // nowNullable returns the result of `now()` wrapped in a sql.NullTime. diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index 4c0bacef..2d34b18f 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -25,12 +25,6 @@ func (db *DB) migrate(ctx context.Context) error { log.Fatal().AnErr("cause", err).Msg("could not tell Goose to use sqlite3") } - // Hook up Goose to the database. - lowLevelDB, err := db.gormDB.DB() - if err != nil { - log.Fatal().AnErr("cause", err).Msg("GORM would not give us its low-level interface") - } - // Disable foreign key constraints during the migrations. This is necessary // for SQLite to do column renames / drops, as that requires creating a new // table with the new schema, copying the data, dropping the old table, and @@ -47,7 +41,7 @@ func (db *DB) migrate(ctx context.Context) error { // Run Goose. log.Debug().Msg("migrating database with Goose") - if err := goose.UpContext(ctx, lowLevelDB, "migrations"); err != nil { + if err := goose.UpContext(ctx, db.sqlDB, "migrations"); err != nil { log.Fatal().AnErr("cause", err).Msg("could not migrate database to the latest version") } diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index b81fd05b..e7d0d9ba 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -296,7 +296,7 @@ func TestRequestJobMassDeletion(t *testing.T) { ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t) defer close() - origGormNow := db.gormDB.NowFunc + realNowFunc := db.nowfunc now := db.now() // Ensure different jobs get different timestamps. @@ -313,12 +313,12 @@ func TestRequestJobMassDeletion(t *testing.T) { job4 := persistAuthoredJob(t, ctx, db, authoredJob4) // Request that "job3 and older" gets deleted. - timeOfDeleteRequest := origGormNow() + timeOfDeleteRequest := realNowFunc() db.nowfunc = func() time.Time { return timeOfDeleteRequest } uuids, err := db.RequestJobMassDeletion(ctx, job3.UpdatedAt) require.NoError(t, err) - db.nowfunc = origGormNow + db.nowfunc = realNowFunc // Only jobs 3 and 4 should be updated. assert.Equal(t, []string{job3.UUID, job4.UUID}, uuids) diff --git a/internal/manager/persistence/logger.go b/internal/manager/persistence/logger.go index a5db0b8f..b9ef08a7 100644 --- a/internal/manager/persistence/logger.go +++ b/internal/manager/persistence/logger.go @@ -5,149 +5,12 @@ package persistence import ( "context" "database/sql" - "errors" - "fmt" - "time" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "gorm.io/gorm" - gormlogger "gorm.io/gorm/logger" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) -// dbLogger implements the behaviour of Gorm's default logger on top of Zerolog. -// See https://github.com/go-gorm/gorm/blob/master/logger/logger.go -type dbLogger struct { - zlog *zerolog.Logger - - IgnoreRecordNotFoundError bool - SlowThreshold time.Duration -} - -var _ gormlogger.Interface = (*dbLogger)(nil) - -var logLevelMap = map[gormlogger.LogLevel]zerolog.Level{ - gormlogger.Silent: zerolog.Disabled, - gormlogger.Error: zerolog.ErrorLevel, - gormlogger.Warn: zerolog.WarnLevel, - gormlogger.Info: zerolog.InfoLevel, -} - -func gormToZlogLevel(logLevel gormlogger.LogLevel) zerolog.Level { - zlogLevel, ok := logLevelMap[logLevel] - if !ok { - // Just a default value that seemed sensible at the time of writing. - return zerolog.DebugLevel - } - return zlogLevel -} - -// NewDBLogger wraps a zerolog logger to implement a Gorm logger interface. -func NewDBLogger(zlog zerolog.Logger) *dbLogger { - return &dbLogger{ - zlog: &zlog, - // Remaining properties default to their zero value. - } -} - -// LogMode returns a child logger at the given log level. -func (l *dbLogger) LogMode(logLevel gormlogger.LogLevel) gormlogger.Interface { - childLogger := l.zlog.Level(gormToZlogLevel(logLevel)) - newlogger := *l - newlogger.zlog = &childLogger - return &newlogger -} - -func (l *dbLogger) Info(ctx context.Context, msg string, args ...interface{}) { - l.logEvent(zerolog.InfoLevel, msg, args) -} - -func (l *dbLogger) Warn(ctx context.Context, msg string, args ...interface{}) { - l.logEvent(zerolog.WarnLevel, msg, args) -} - -func (l *dbLogger) Error(ctx context.Context, msg string, args ...interface{}) { - l.logEvent(zerolog.ErrorLevel, msg, args) -} - -// Trace traces the execution of SQL and potentially logs errors, warnings, and infos. -// Note that it doesn't mean "trace-level logging". -func (l *dbLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) { - zlogLevel := l.zlog.GetLevel() - if zlogLevel == zerolog.Disabled { - return - } - - elapsed := time.Since(begin) - logCtx := l.zlog.With().CallerWithSkipFrameCount(5) - - // Function to lazily get the SQL, affected rows, and logger. - buildLogger := func() (loggerPtr *zerolog.Logger, sql string) { - sql, rows := fc() - logCtx = logCtx.AnErr("cause", err) - if rows >= 0 { - logCtx = logCtx.Int64("rowsAffected", rows) - } - logger := logCtx.Logger() - return &logger, sql - } - - switch { - case err != nil && zlogLevel <= zerolog.ErrorLevel: - logger, sql := buildLogger() - if l.silenceLoggingError(err) { - logger.Debug().Msg(sql) - } else { - logger.Error().Msg(sql) - } - - case elapsed > l.SlowThreshold && l.SlowThreshold != 0 && zlogLevel <= zerolog.WarnLevel: - logger, sql := buildLogger() - logger.Warn(). - Str("sql", sql). - Dur("elapsed", elapsed). - Dur("slowThreshold", l.SlowThreshold). - Msg("slow database query") - - case zlogLevel <= zerolog.TraceLevel: - logger, sql := buildLogger() - logger.Trace().Msg(sql) - } -} - -func (l dbLogger) silenceLoggingError(err error) bool { - switch { - case l.IgnoreRecordNotFoundError && errors.Is(err, gorm.ErrRecordNotFound): - return true - case errors.Is(err, context.Canceled): - // These are usually caused by the HTTP client connection closing. Stopping - // a database query is normal behaviour in such a case, so this shouldn't be - // logged as an error. - return true - default: - return false - } -} - -// logEvent logs an even at the given level. -func (l dbLogger) logEvent(level zerolog.Level, msg string, args ...interface{}) { - if l.zlog.GetLevel() > level { - return - } - logger := l.logger(args) - logger.WithLevel(level).Msg("logEvent: " + msg) -} - -// logger constructs a zerolog logger. The given arguments are added via reflection. -func (l dbLogger) logger(args ...interface{}) zerolog.Logger { - logCtx := l.zlog.With() - for idx, arg := range args { - logCtx.Interface(fmt.Sprintf("arg%d", idx), arg) - } - return logCtx.Logger() -} - // LoggingDBConn wraps a database/sql.DB connection, so that it can be used with // sqlc and log all the queries. type LoggingDBConn struct { diff --git a/internal/manager/persistence/sqlc/pragma.go b/internal/manager/persistence/sqlc/pragma.go index 570998ba..134c5a57 100644 --- a/internal/manager/persistence/sqlc/pragma.go +++ b/internal/manager/persistence/sqlc/pragma.go @@ -108,3 +108,24 @@ func (q *Queries) PragmaBusyTimeout(ctx context.Context, busyTimeout time.Durati _, err := q.db.ExecContext(ctx, sql) return err } + +const pragmaJournalModeWAL = `PRAGMA journal_mode = WAL` + +func (q *Queries) PragmaJournalModeWAL(ctx context.Context) error { + _, err := q.db.ExecContext(ctx, pragmaJournalModeWAL) + return err +} + +const pragmaSynchronousNormal = `PRAGMA synchronous = normal` + +func (q *Queries) PragmaSynchronousNormal(ctx context.Context) error { + _, err := q.db.ExecContext(ctx, pragmaSynchronousNormal) + return err +} + +const vacuum = `VACUUM` + +func (q *Queries) Vacuum(ctx context.Context) error { + _, err := q.db.ExecContext(ctx, vacuum) + return err +} diff --git a/internal/manager/persistence/test_support.go b/internal/manager/persistence/test_support.go index 3e0388b2..b7644943 100644 --- a/internal/manager/persistence/test_support.go +++ b/internal/manager/persistence/test_support.go @@ -5,17 +5,12 @@ package persistence import ( "context" - "database/sql" "fmt" "os" "testing" "time" - "github.com/glebarez/sqlite" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" - "gorm.io/gorm" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -31,29 +26,16 @@ func CreateTestDB() (db *DB, closer func()) { } } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + var err error - dblogger := NewDBLogger(log.Level(zerolog.TraceLevel).Output(os.Stdout)) - - // Open the database ourselves, so that we have a low-level connection that - // can be closed when the unit test is done running. - sqliteConn, err := sql.Open(sqlite.DriverName, TestDSN) - if err != nil { - panic(fmt.Sprintf("opening SQLite connection: %v", err)) - } - - config := gorm.Config{ - Logger: dblogger, - ConnPool: sqliteConn, - } - - db, err = openDBWithConfig(TestDSN, &config) + db, err = openDB(ctx, TestDSN) if err != nil { panic(fmt.Sprintf("opening DB: %v", err)) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() err = db.migrate(ctx) if err != nil { panic(fmt.Sprintf("migrating DB: %v", err))