Manager: remove GORM from database interface

Remove all calls to GORM from the database interface code.

Ref: #104305
Sybren A. Stüvel 2024-09-26 22:58:11 +02:00
parent f1a72903a0
commit 816046663e
6 changed files with 63 additions and 241 deletions
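
In miniature, the pattern this commit applies: drop the *gorm.DB handle and its Exec() calls, and talk to SQLite through a plain database/sql connection plus small hand-written query helpers in the sqlc style. The sketch below is illustrative only; the type and helper names are stand-ins, not the actual Flamenco API.

package main

import (
	"context"
	"database/sql"
	"fmt"

	_ "modernc.org/sqlite" // registers the pure-Go "sqlite" driver
)

// queries mirrors the shape of the sqlc Queries struct: it wraps the bare
// *sql.DB and exposes one method per hand-written query. The name is illustrative.
type queries struct {
	db *sql.DB
}

// PragmaJournalModeWAL corresponds to the kind of pragma helper added on the
// sqlc side in this commit.
func (q *queries) PragmaJournalModeWAL(ctx context.Context) error {
	_, err := q.db.ExecContext(ctx, `PRAGMA journal_mode = WAL`)
	return err
}

func main() {
	ctx := context.Background()

	// Before: gorm.Open(sqlite.Open(dsn), &gorm.Config{...}), then gormDB.Exec(...).
	// After: database/sql directly, as in the new openDB() in the diff below.
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	q := &queries{db: db}
	if err := q.PragmaJournalModeWAL(ctx); err != nil {
		panic(fmt.Errorf("enabling WAL journal mode: %w", err))
	}
	fmt.Println("WAL journal mode enabled")
}

The diff itself applies this same swap to openDB(), vacuum(), Close(), queries(), the Goose migration hook-up, and the test helpers.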

View File

@@ -9,16 +9,15 @@ import (
 	"fmt"
 	"time"
-	"github.com/glebarez/sqlite"
 	"github.com/rs/zerolog/log"
-	"gorm.io/gorm"
+	_ "modernc.org/sqlite"
 	"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
 )
 // DB provides the database interface.
 type DB struct {
-	gormDB  *gorm.DB
+	sqlDB   *sql.DB
 	nowfunc func() time.Time
 	// See PeriodicIntegrityCheck().
@@ -64,7 +63,7 @@ func OpenDB(ctx context.Context, dsn string) (*DB, error) {
 		return nil, ErrIntegrity
 	}
-	db.vacuum()
+	db.vacuum(ctx)
 	if err := db.migrate(ctx); err != nil {
 		return nil, err
@@ -78,41 +77,18 @@ func OpenDB(ctx context.Context, dsn string) (*DB, error) {
 	// Perform another vacuum after database migration, as that may have copied a
 	// lot of data and then dropped another lot of data.
-	db.vacuum()
+	db.vacuum(ctx)
 	closeConnOnReturn = false
 	return db, nil
 }
 func openDB(ctx context.Context, dsn string) (*DB, error) {
-	globalLogLevel := log.Logger.GetLevel()
-	dblogger := NewDBLogger(log.Level(globalLogLevel))
-	config := gorm.Config{
-		Logger: dblogger,
-	}
-	return openDBWithConfig(dsn, &config)
-}
-func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) {
-	db := DB{
-		nowfunc: time.Now,
-		// Buffer one request, so that even when a consistency check is already
-		// running, another can be queued without blocking. Queueing more than one
-		// doesn't make sense, though.
-		consistencyCheckRequests: make(chan struct{}, 1),
-	}
-	config.NowFunc = db.now
-	dialector := sqlite.Open(dsn)
-	gormDB, err := gorm.Open(dialector, config)
+	// Connect to the database.
+	sqlDB, err := sql.Open("sqlite", dsn)
 	if err != nil {
 		return nil, err
 	}
-	db.gormDB = gormDB
 	// Close the database connection if there was some error. This prevents
 	// leaking database connections & should remove any write-ahead-log files.
@@ -121,37 +97,44 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) {
 		if !closeConnOnReturn {
 			return
 		}
-		if err := db.Close(); err != nil {
+		if err := sqlDB.Close(); err != nil {
 			log.Debug().AnErr("cause", err).Msg("cannot close database connection")
 		}
 	}()
-	// Use the generic sql.DB interface to set some connection pool options.
-	sqlDB, err := gormDB.DB()
-	if err != nil {
-		return nil, err
-	}
 	// Only allow a single database connection, to avoid SQLITE_BUSY errors.
 	// It's not certain that this'll improve the situation, but it's worth a try.
 	sqlDB.SetMaxIdleConns(1) // Max num of connections in the idle connection pool.
 	sqlDB.SetMaxOpenConns(1) // Max num of open connections to the database.
+	db := DB{
+		sqlDB:   sqlDB,
+		nowfunc: func() time.Time { return time.Now().UTC() },
+		// Buffer one request, so that even when a consistency check is already
+		// running, another can be queued without blocking. Queueing more than one
+		// doesn't make sense, though.
+		consistencyCheckRequests: make(chan struct{}, 1),
+	}
 	// Always enable foreign key checks, to make SQLite behave like a real database.
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	pragmaCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()
-	if err := db.pragmaForeignKeys(ctx, true); err != nil {
+	if err := db.pragmaForeignKeys(pragmaCtx, true); err != nil {
 		return nil, err
 	}
+	queries := db.queries()
 	// Write-ahead-log journal may improve writing speed.
 	log.Trace().Msg("enabling SQLite write-ahead-log journal mode")
-	if tx := gormDB.Exec("PRAGMA journal_mode = WAL"); tx.Error != nil {
-		return nil, fmt.Errorf("enabling SQLite write-ahead-log journal mode: %w", tx.Error)
+	if err := queries.PragmaJournalModeWAL(pragmaCtx); err != nil {
+		return nil, fmt.Errorf("enabling SQLite write-ahead-log journal mode: %w", err)
 	}
 	// Switching from 'full' (default) to 'normal' sync may improve writing speed.
 	log.Trace().Msg("enabling SQLite 'normal' synchronisation")
-	if tx := gormDB.Exec("PRAGMA synchronous = normal"); tx.Error != nil {
-		return nil, fmt.Errorf("enabling SQLite 'normal' sync mode: %w", tx.Error)
+	if err := queries.PragmaSynchronousNormal(pragmaCtx); err != nil {
+		return nil, fmt.Errorf("enabling SQLite 'normal' sync mode: %w", err)
 	}
 	closeConnOnReturn = false
@@ -159,37 +142,21 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) {
 }
 // vacuum executes the SQL "VACUUM" command, and logs any errors.
-func (db *DB) vacuum() {
-	tx := db.gormDB.Exec("vacuum")
-	if tx.Error != nil {
-		log.Error().Err(tx.Error).Msg("error vacuuming database")
+func (db *DB) vacuum(ctx context.Context) {
+	err := db.queries().Vacuum(ctx)
+	if err != nil {
+		log.Error().Err(err).Msg("error vacuuming database")
 	}
 }
 // Close closes the connection to the database.
 func (db *DB) Close() error {
-	sqldb, err := db.gormDB.DB()
-	if err != nil {
-		return err
-	}
-	return sqldb.Close()
+	return db.sqlDB.Close()
 }
 // queries returns the SQLC Queries struct, connected to this database.
-// It is intended that all GORM queries will be migrated to use this interface
-// instead.
-//
-// Note that this function does not return an error. Instead it just panics when
-// it cannot obtain the low-level GORM database interface. I have no idea when
-// this will ever fail, so I'm opting to simplify the use of this function
-// instead.
 func (db *DB) queries() *sqlc.Queries {
-	sqldb, err := db.gormDB.DB()
-	if err != nil {
-		panic(fmt.Sprintf("could not get low-level database driver: %v", err))
-	}
-	loggingWrapper := LoggingDBConn{sqldb}
+	loggingWrapper := LoggingDBConn{db.sqlDB}
 	return sqlc.New(&loggingWrapper)
 }
@@ -207,12 +174,7 @@ type queriesTX struct {
 // is closed (either committed or rolled back). Otherwise SQLite will deadlock,
 // as it will make any other query wait until this transaction is done.
 func (db *DB) queriesWithTX() (*queriesTX, error) {
-	sqldb, err := db.gormDB.DB()
-	if err != nil {
-		panic(fmt.Sprintf("could not get low-level database driver: %v", err))
-	}
-	tx, err := sqldb.Begin()
+	tx, err := db.sqlDB.Begin()
 	if err != nil {
 		return nil, fmt.Errorf("could not begin database transaction: %w", err)
 	}
@@ -231,7 +193,7 @@ func (db *DB) queriesWithTX() (*queriesTX, error) {
 // now returns 'now' as reported by db.nowfunc.
 // It always converts the timestamp to UTC.
 func (db *DB) now() time.Time {
-	return db.nowfunc().UTC()
+	return db.nowfunc()
 }
 // nowNullable returns the result of `now()` wrapped in a sql.NullTime.

View File

@@ -25,12 +25,6 @@ func (db *DB) migrate(ctx context.Context) error {
 		log.Fatal().AnErr("cause", err).Msg("could not tell Goose to use sqlite3")
 	}
-	// Hook up Goose to the database.
-	lowLevelDB, err := db.gormDB.DB()
-	if err != nil {
-		log.Fatal().AnErr("cause", err).Msg("GORM would not give us its low-level interface")
-	}
 	// Disable foreign key constraints during the migrations. This is necessary
 	// for SQLite to do column renames / drops, as that requires creating a new
 	// table with the new schema, copying the data, dropping the old table, and
@@ -47,7 +41,7 @@ func (db *DB) migrate(ctx context.Context) error {
 	// Run Goose.
 	log.Debug().Msg("migrating database with Goose")
-	if err := goose.UpContext(ctx, lowLevelDB, "migrations"); err != nil {
+	if err := goose.UpContext(ctx, db.sqlDB, "migrations"); err != nil {
 		log.Fatal().AnErr("cause", err).Msg("could not migrate database to the latest version")
 	}

View File

@@ -296,7 +296,7 @@ func TestRequestJobMassDeletion(t *testing.T) {
 	ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t)
 	defer close()
-	origGormNow := db.gormDB.NowFunc
+	realNowFunc := db.nowfunc
 	now := db.now()
 	// Ensure different jobs get different timestamps.
@@ -313,12 +313,12 @@ func TestRequestJobMassDeletion(t *testing.T) {
 	job4 := persistAuthoredJob(t, ctx, db, authoredJob4)
 	// Request that "job3 and older" gets deleted.
-	timeOfDeleteRequest := origGormNow()
+	timeOfDeleteRequest := realNowFunc()
 	db.nowfunc = func() time.Time { return timeOfDeleteRequest }
 	uuids, err := db.RequestJobMassDeletion(ctx, job3.UpdatedAt)
 	require.NoError(t, err)
-	db.nowfunc = origGormNow
+	db.nowfunc = realNowFunc
 	// Only jobs 3 and 4 should be updated.
 	assert.Equal(t, []string{job3.UUID, job4.UUID}, uuids)

View File

@@ -5,149 +5,12 @@ package persistence
 import (
 	"context"
 	"database/sql"
-	"errors"
-	"fmt"
-	"time"
-	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
-	"gorm.io/gorm"
-	gormlogger "gorm.io/gorm/logger"
 	"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
 )
-// dbLogger implements the behaviour of Gorm's default logger on top of Zerolog.
-// See https://github.com/go-gorm/gorm/blob/master/logger/logger.go
-type dbLogger struct {
-	zlog                      *zerolog.Logger
-	IgnoreRecordNotFoundError bool
-	SlowThreshold             time.Duration
-}
-var _ gormlogger.Interface = (*dbLogger)(nil)
-var logLevelMap = map[gormlogger.LogLevel]zerolog.Level{
-	gormlogger.Silent: zerolog.Disabled,
-	gormlogger.Error:  zerolog.ErrorLevel,
-	gormlogger.Warn:   zerolog.WarnLevel,
-	gormlogger.Info:   zerolog.InfoLevel,
-}
-func gormToZlogLevel(logLevel gormlogger.LogLevel) zerolog.Level {
-	zlogLevel, ok := logLevelMap[logLevel]
-	if !ok {
-		// Just a default value that seemed sensible at the time of writing.
-		return zerolog.DebugLevel
-	}
-	return zlogLevel
-}
-// NewDBLogger wraps a zerolog logger to implement a Gorm logger interface.
-func NewDBLogger(zlog zerolog.Logger) *dbLogger {
-	return &dbLogger{
-		zlog: &zlog,
-		// Remaining properties default to their zero value.
-	}
-}
-// LogMode returns a child logger at the given log level.
-func (l *dbLogger) LogMode(logLevel gormlogger.LogLevel) gormlogger.Interface {
-	childLogger := l.zlog.Level(gormToZlogLevel(logLevel))
-	newlogger := *l
-	newlogger.zlog = &childLogger
-	return &newlogger
-}
-func (l *dbLogger) Info(ctx context.Context, msg string, args ...interface{}) {
-	l.logEvent(zerolog.InfoLevel, msg, args)
-}
-func (l *dbLogger) Warn(ctx context.Context, msg string, args ...interface{}) {
-	l.logEvent(zerolog.WarnLevel, msg, args)
-}
-func (l *dbLogger) Error(ctx context.Context, msg string, args ...interface{}) {
-	l.logEvent(zerolog.ErrorLevel, msg, args)
-}
-// Trace traces the execution of SQL and potentially logs errors, warnings, and infos.
-// Note that it doesn't mean "trace-level logging".
-func (l *dbLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) {
-	zlogLevel := l.zlog.GetLevel()
-	if zlogLevel == zerolog.Disabled {
-		return
-	}
-	elapsed := time.Since(begin)
-	logCtx := l.zlog.With().CallerWithSkipFrameCount(5)
-	// Function to lazily get the SQL, affected rows, and logger.
-	buildLogger := func() (loggerPtr *zerolog.Logger, sql string) {
-		sql, rows := fc()
-		logCtx = logCtx.AnErr("cause", err)
-		if rows >= 0 {
-			logCtx = logCtx.Int64("rowsAffected", rows)
-		}
-		logger := logCtx.Logger()
-		return &logger, sql
-	}
-	switch {
-	case err != nil && zlogLevel <= zerolog.ErrorLevel:
-		logger, sql := buildLogger()
-		if l.silenceLoggingError(err) {
-			logger.Debug().Msg(sql)
-		} else {
-			logger.Error().Msg(sql)
-		}
-	case elapsed > l.SlowThreshold && l.SlowThreshold != 0 && zlogLevel <= zerolog.WarnLevel:
-		logger, sql := buildLogger()
-		logger.Warn().
-			Str("sql", sql).
-			Dur("elapsed", elapsed).
-			Dur("slowThreshold", l.SlowThreshold).
-			Msg("slow database query")
-	case zlogLevel <= zerolog.TraceLevel:
-		logger, sql := buildLogger()
-		logger.Trace().Msg(sql)
-	}
-}
-func (l dbLogger) silenceLoggingError(err error) bool {
-	switch {
-	case l.IgnoreRecordNotFoundError && errors.Is(err, gorm.ErrRecordNotFound):
-		return true
-	case errors.Is(err, context.Canceled):
-		// These are usually caused by the HTTP client connection closing. Stopping
-		// a database query is normal behaviour in such a case, so this shouldn't be
-		// logged as an error.
-		return true
-	default:
-		return false
-	}
-}
-// logEvent logs an even at the given level.
-func (l dbLogger) logEvent(level zerolog.Level, msg string, args ...interface{}) {
-	if l.zlog.GetLevel() > level {
-		return
-	}
-	logger := l.logger(args)
-	logger.WithLevel(level).Msg("logEvent: " + msg)
-}
-// logger constructs a zerolog logger. The given arguments are added via reflection.
-func (l dbLogger) logger(args ...interface{}) zerolog.Logger {
-	logCtx := l.zlog.With()
-	for idx, arg := range args {
-		logCtx.Interface(fmt.Sprintf("arg%d", idx), arg)
-	}
-	return logCtx.Logger()
-}
 // LoggingDBConn wraps a database/sql.DB connection, so that it can be used with
 // sqlc and log all the queries.
 type LoggingDBConn struct {

View File

@@ -108,3 +108,24 @@ func (q *Queries) PragmaBusyTimeout(ctx context.Context, busyTimeout time.Duration
 	_, err := q.db.ExecContext(ctx, sql)
 	return err
 }
+const pragmaJournalModeWAL = `PRAGMA journal_mode = WAL`
+func (q *Queries) PragmaJournalModeWAL(ctx context.Context) error {
+	_, err := q.db.ExecContext(ctx, pragmaJournalModeWAL)
+	return err
+}
+const pragmaSynchronousNormal = `PRAGMA synchronous = normal`
+func (q *Queries) PragmaSynchronousNormal(ctx context.Context) error {
+	_, err := q.db.ExecContext(ctx, pragmaSynchronousNormal)
+	return err
+}
+const vacuum = `VACUUM`
+func (q *Queries) Vacuum(ctx context.Context) error {
+	_, err := q.db.ExecContext(ctx, vacuum)
+	return err
+}

View File

@@ -5,17 +5,12 @@ package persistence
 import (
 	"context"
-	"database/sql"
 	"fmt"
 	"os"
 	"testing"
 	"time"
-	"github.com/glebarez/sqlite"
-	"github.com/rs/zerolog"
-	"github.com/rs/zerolog/log"
 	"github.com/stretchr/testify/require"
-	"gorm.io/gorm"
 	"projects.blender.org/studio/flamenco/pkg/api"
 )
@@ -31,29 +26,16 @@ func CreateTestDB() (db *DB, closer func()) {
 		}
 	}
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
 	var err error
-	dblogger := NewDBLogger(log.Level(zerolog.TraceLevel).Output(os.Stdout))
-	// Open the database ourselves, so that we have a low-level connection that
-	// can be closed when the unit test is done running.
-	sqliteConn, err := sql.Open(sqlite.DriverName, TestDSN)
-	if err != nil {
-		panic(fmt.Sprintf("opening SQLite connection: %v", err))
-	}
-	config := gorm.Config{
-		Logger:   dblogger,
-		ConnPool: sqliteConn,
-	}
-	db, err = openDBWithConfig(TestDSN, &config)
+	db, err = openDB(ctx, TestDSN)
 	if err != nil {
 		panic(fmt.Sprintf("opening DB: %v", err))
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	defer cancel()
 	err = db.migrate(ctx)
 	if err != nil {
 		panic(fmt.Sprintf("migrating DB: %v", err))