diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index d0455411..a30b13a8 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -21,6 +21,13 @@ import ( // TODO: pull the SQLite stuff out of this file into a more global place, so // that other areas of Flamenco Worker can also use it. +// Note that there are two contexts used in this file. One (`dbCtx`) is for +// database access and is a local, short-lived, background context. The other +// (`ctx`) is the one that's passed from the caller, which should indicate the +// global worker context. If that context is done, queueing updates in the +// database will still work, but all communication with Flamenco Manager will +// halt. + // UpstreamBufferDB implements the UpstreamBuffer interface using a database as backend. type UpstreamBufferDB struct { db *sql.DB @@ -35,6 +42,7 @@ type UpstreamBufferDB struct { } const defaultUpstreamFlushInterval = 30 * time.Second +const databaseContextTimeout = 10 * time.Second var _ UpstreamBuffer = (*UpstreamBufferDB)(nil) @@ -54,7 +62,7 @@ func NewUpstreamBuffer(client FlamencoClient, clock TimeService) (*UpstreamBuffe } // OpenDB opens the database. Must be called once before using. -func (ub *UpstreamBufferDB) OpenDB(ctx context.Context, databaseFilename string) error { +func (ub *UpstreamBufferDB) OpenDB(dbCtx context.Context, databaseFilename string) error { if ub.db != nil { return errors.New("upstream buffer database already opened") } @@ -64,13 +72,13 @@ func (ub *UpstreamBufferDB) OpenDB(ctx context.Context, databaseFilename string) return fmt.Errorf("opening %s: %w", databaseFilename, err) } - if err := db.PingContext(ctx); err != nil { + if err := db.PingContext(dbCtx); err != nil { return fmt.Errorf("accessing %s: %w", databaseFilename, err) } ub.db = db - if err := ub.prepareDatabase(ctx); err != nil { + if err := ub.prepareDatabase(dbCtx); err != nil { return err } @@ -84,7 +92,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u ub.dbMutex.Lock() defer ub.dbMutex.Unlock() - queueSize, err := ub.queueSize(ctx) + queueSize, err := ub.queueSize() if err != nil { return fmt.Errorf("unable to determine upstream queue size: %w", err) } @@ -93,7 +101,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u if queueSize > 0 { log.Debug().Int("queueSize", queueSize). Msg("task updates already queued, immediately queueing new update") - return ub.queueTaskUpdate(ctx, taskID, update) + return ub.queueTaskUpdate(taskID, update) } // Try to deliver the update. @@ -101,7 +109,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u if err != nil { log.Warn().Err(err).Str("task", taskID). Msg("error communicating with Manager, going to queue task update for sending later") - return ub.queueTaskUpdate(ctx, taskID, update) + return ub.queueTaskUpdate(taskID, update) } // The Manager responded, so no need to queue this update, even when there was an error. @@ -117,7 +125,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u } // Close releases the database. It does not try to flush any pending items. -func (ub *UpstreamBufferDB) Close(ctx context.Context) error { +func (ub *UpstreamBufferDB) Close() error { if ub.db == nil { return nil } @@ -131,11 +139,11 @@ func (ub *UpstreamBufferDB) Close(ctx context.Context) error { } // prepareDatabase creates the database schema, if necessary. -func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error { +func (ub *UpstreamBufferDB) prepareDatabase(dbCtx context.Context) error { ub.dbMutex.Lock() defer ub.dbMutex.Unlock() - tx, err := ub.db.BeginTx(ctx, nil) + tx, err := ub.db.BeginTx(dbCtx, nil) if err != nil { return fmt.Errorf("beginning database transaction: %w", err) } @@ -144,7 +152,7 @@ func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error { stmt := `CREATE TABLE IF NOT EXISTS task_update_queue(task_id VARCHAR(36), payload BLOB)` log.Debug().Str("sql", stmt).Msg("creating database table") - if _, err := tx.ExecContext(ctx, stmt); err != nil { + if _, err := tx.ExecContext(dbCtx, stmt); err != nil { return fmt.Errorf("creating database table: %w", err) } @@ -155,15 +163,18 @@ func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error { return nil } -func (ub *UpstreamBufferDB) queueSize(ctx context.Context) (int, error) { +func (ub *UpstreamBufferDB) queueSize() (int, error) { if ub.db == nil { log.Panic().Msg("no database opened, unable to inspect upstream queue") } + dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout) + defer dbCtxCancel() + var queueSize int err := ub.db. - QueryRowContext(ctx, "SELECT count(*) FROM task_update_queue"). + QueryRowContext(dbCtx, "SELECT count(*) FROM task_update_queue"). Scan(&queueSize) switch { @@ -176,12 +187,15 @@ func (ub *UpstreamBufferDB) queueSize(ctx context.Context) (int, error) { } } -func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error { +func (ub *UpstreamBufferDB) queueTaskUpdate(taskID string, update api.TaskUpdateJSONRequestBody) error { if ub.db == nil { log.Panic().Msg("no database opened, unable to queue task updates") } - tx, err := ub.db.BeginTx(ctx, nil) + dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout) + defer dbCtxCancel() + + tx, err := ub.db.BeginTx(dbCtx, nil) if err != nil { return fmt.Errorf("beginning database transaction: %w", err) } @@ -195,7 +209,7 @@ func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string, stmt := `INSERT INTO task_update_queue (task_id, payload) VALUES (?, ?)` log.Debug().Str("sql", stmt).Str("task", taskID).Msg("inserting task update") - if _, err := tx.ExecContext(ctx, stmt, taskID, blob); err != nil { + if _, err := tx.ExecContext(dbCtx, stmt, taskID, blob); err != nil { return fmt.Errorf("queueing task update: %w", err) } @@ -215,7 +229,7 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { } // See if we need to flush at all. - queueSize, err := ub.queueSize(ctx) + queueSize, err := ub.queueSize() switch { case err != nil: return fmt.Errorf("unable to determine queue size: %w", err) @@ -237,7 +251,10 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { } func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) { - tx, err := ub.db.BeginTx(ctx, nil) + dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout) + defer dbCtxCancel() + + tx, err := ub.db.BeginTx(dbCtx, nil) if err != nil { return false, fmt.Errorf("beginning database transaction: %w", err) } @@ -250,7 +267,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err var taskID string var blob []byte - err = tx.QueryRowContext(ctx, stmt).Scan(&rowID, &taskID, &blob) + err = tx.QueryRowContext(dbCtx, stmt).Scan(&rowID, &taskID, &blob) switch { case err == sql.ErrNoRows: // Flush operation is done. @@ -268,7 +285,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err // than to discard it and ignore it ever happened. logger.Warn().Err(err). Msg("unable to unmarshal queued task update, discarding") - if err := ub.discardRow(ctx, tx, rowID); err != nil { + if err := ub.discardRow(tx, rowID); err != nil { return false, err } return false, tx.Commit() @@ -295,17 +312,20 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err Msg("queued task update discarded by Manager, unknown reason") } - if err := ub.discardRow(ctx, tx, rowID); err != nil { + if err := ub.discardRow(tx, rowID); err != nil { return false, err } return false, tx.Commit() } -func (ub *UpstreamBufferDB) discardRow(ctx context.Context, tx *sql.Tx, rowID int64) error { +func (ub *UpstreamBufferDB) discardRow(tx *sql.Tx, rowID int64) error { + dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout) + defer dbCtxCancel() + stmt := `DELETE FROM task_update_queue WHERE rowid = ?` log.Trace().Str("sql", stmt).Int64("rowID", rowID).Msg("un-queueing task update") - _, err := tx.ExecContext(ctx, stmt, rowID) + _, err := tx.ExecContext(dbCtx, stmt, rowID) if err != nil { return fmt.Errorf("un-queueing task update: %w", err) } @@ -334,7 +354,7 @@ func (ub *UpstreamBufferDB) periodicFlushLoop() { } func rollbackTransaction(tx *sql.Tx) { - if err := tx.Rollback(); err != nil { + if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { log.Error().Err(err).Msg("rolling back transaction") } } diff --git a/internal/worker/upstream_buffer_test.go b/internal/worker/upstream_buffer_test.go index 02344879..7348d939 100644 --- a/internal/worker/upstream_buffer_test.go +++ b/internal/worker/upstream_buffer_test.go @@ -46,7 +46,7 @@ func TestUpstreamBufferCloseUnopened(t *testing.T) { defer mockCtrl.Finish() ub, _ := mockUpstreamBufferDB(t, mockCtrl) - err := ub.Close(context.Background()) + err := ub.Close() assert.NoError(t, err, "Closing without opening should be OK") } @@ -76,7 +76,7 @@ func TestUpstreamBufferManagerUnavailable(t *testing.T) { assert.NoError(t, err) // Check the queue size, it should have an item queued. - queueSize, err := ub.queueSize(ctx) + queueSize, err := ub.queueSize() assert.NoError(t, err) assert.Equal(t, 1, queueSize) @@ -94,10 +94,10 @@ func TestUpstreamBufferManagerUnavailable(t *testing.T) { // Queue should be empty now. ub.dbMutex.Lock() - queueSize, err = ub.queueSize(ctx) + queueSize, err = ub.queueSize() ub.dbMutex.Unlock() assert.NoError(t, err) assert.Equal(t, 0, queueSize) - assert.NoError(t, ub.Close(ctx)) + assert.NoError(t, ub.Close()) }