flamenco/internal/worker/upstream_buffer.go
Sybren A. Stüvel 9f5e4cc0cc License: license all code under "GPL-3.0-or-later"
The add-on code was copy-pasted from other add-ons and used the GPL v2
license, whereas the LICENSE text file accidentally contained the GNU
"Affero" GPL license v3 (instead of the regular GPL v3).

This is now all streamlined, and all code is licensed as "GPL v3 or later".

Furthermore, the code comments now show just an SPDX License Identifier
instead of an entire license block.
2022-03-07 15:26:46 +01:00

335 lines
8.7 KiB
Go

package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/rs/zerolog/log"
_ "modernc.org/sqlite"
"git.blender.org/flamenco/pkg/api"
)
// TODO: pull the SQLite stuff out of this file into a more global place, so
// that other areas of Flamenco Worker can also use it.

// UpstreamBufferDB implements the UpstreamBuffer interface using a database as backend.
//
// Task updates are sent to the Manager directly when possible. When the
// Manager cannot be reached, or when earlier updates are still queued, the
// update is stored in the SQLite table task_update_queue instead. Queued
// updates are delivered by Flush. Flush is also called by a background
// goroutine that OpenDB starts.
type UpstreamBufferDB struct {
	db      *sql.DB     // Set by OpenDB; nil before that.
	dbMutex *sync.Mutex // Protects from "database locked" errors

	client        FlamencoClient // Used to send task updates to the Manager.
	clock         TimeService    // Provides the timer used by the periodic flush loop.
	flushInterval time.Duration  // Waiting time between periodic flushes.

	done chan struct{}   // Closed by Close to stop the periodic flush loop.
	wg   *sync.WaitGroup // Tracks the periodic flush loop goroutine.
}
// defaultUpstreamFlushInterval is the initial flushInterval that
// NewUpstreamBuffer gives each UpstreamBufferDB.
const defaultUpstreamFlushInterval time.Duration = 30 * time.Second

// Compile-time check that *UpstreamBufferDB satisfies UpstreamBuffer.
var _ UpstreamBuffer = (*UpstreamBufferDB)(nil)
// NewUpstreamBuffer constructs an UpstreamBufferDB that sends task updates
// through the given client.
//
// The returned buffer has no database yet. OpenDB must be called before it
// is used. The error return is currently always nil.
func NewUpstreamBuffer(client FlamencoClient, clock TimeService) (*UpstreamBufferDB, error) {
	buffer := &UpstreamBufferDB{
		dbMutex:       &sync.Mutex{},
		wg:            &sync.WaitGroup{},
		done:          make(chan struct{}),
		client:        client,
		clock:         clock,
		flushInterval: defaultUpstreamFlushInterval,
	}
	return buffer, nil
}
// OpenDB opens the database. Must be called once before using.
//
// OpenDB does the following:
//   - opens databaseFilename with the "sqlite" driver,
//   - checks that the database is reachable,
//   - creates the queue table if necessary,
//   - starts the goroutine that periodically flushes queued task updates.
//
// Calling it again after a successful open returns an error.
//
// If any step after opening fails, the database handle is closed again and
// ub.db is left nil. This prevents a leaked connection and allows OpenDB to
// be retried.
func (ub *UpstreamBufferDB) OpenDB(ctx context.Context, databaseFilename string) error {
	if ub.db != nil {
		return errors.New("upstream buffer database already opened")
	}

	db, err := sql.Open("sqlite", databaseFilename)
	if err != nil {
		return fmt.Errorf("opening %s: %w", databaseFilename, err)
	}

	if err := db.PingContext(ctx); err != nil {
		// Don't leak the handle. The ping error is the one worth reporting,
		// so a failure to close is deliberately ignored.
		_ = db.Close()
		return fmt.Errorf("accessing %s: %w", databaseFilename, err)
	}

	// prepareDatabase operates on ub.db, so it has to be assigned first.
	ub.db = db
	if err := ub.prepareDatabase(ctx); err != nil {
		// Undo the assignment so that the buffer is not left half-open.
		ub.db = nil
		_ = db.Close()
		return err
	}

	ub.wg.Add(1)
	go ub.periodicFlushLoop()
	return nil
}
// SendTaskUpdate delivers a task update to the Manager, or queues it in the
// database for later delivery.
//
// If updates are already queued, the new one goes into the queue without
// contacting the Manager, so that updates keep their order. If the Manager
// cannot be reached, the update is queued as well.
//
// Once the Manager has responded, the update is never queued. A 409 Conflict
// response results in ErrTaskReassigned. Any status other than 204 No Content
// results in an error.
func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
	ub.dbMutex.Lock()
	defer ub.dbMutex.Unlock()

	pending, err := ub.queueSize(ctx)
	if err != nil {
		return fmt.Errorf("unable to determine upstream queue size: %w", err)
	}
	if pending > 0 {
		log.Debug().Int("queueSize", pending).
			Msg("task updates already queued, immediately queueing new update")
		return ub.queueTaskUpdate(ctx, taskID, update)
	}

	resp, sendErr := ub.client.TaskUpdateWithResponse(ctx, taskID, update)
	if sendErr != nil {
		log.Warn().Err(sendErr).Str("task", taskID).
			Msg("error communicating with Manager, going to queue task update for sending later")
		return ub.queueTaskUpdate(ctx, taskID, update)
	}

	// The Manager responded, so the update is not queued, even on an error status.
	status := resp.StatusCode()
	if status == http.StatusNoContent {
		return nil
	}
	if status == http.StatusConflict {
		return ErrTaskReassigned
	}
	return fmt.Errorf("unknown error from Manager, code %d: %v", status, resp.JSONDefault)
}
// Close releases the database. It does not try to flush any pending items.
//
// Close first stops the periodic flush loop and waits for it to exit, which
// includes any flush that is currently running. It then closes the database.
// Close is a no-op when no database was opened.
//
// Calling Close more than once in sequence no longer panics on the already
// closed done channel.
func (ub *UpstreamBufferDB) Close(ctx context.Context) error {
	if ub.db == nil {
		return nil
	}

	// Stop the periodic flush loop. Closing an already-closed channel panics,
	// so only close it if an earlier call has not done so yet.
	select {
	case <-ub.done:
		// Already closed by a previous Close call.
	default:
		close(ub.done)
	}
	ub.wg.Wait()

	// Close the database.
	return ub.db.Close()
}
// prepareDatabase creates the database schema, if necessary.
//
// The queue is a single table that holds the task ID and the serialised task
// update. Flushing relies on SQLite's implicit rowid to process the rows in
// order.
func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error {
	ub.dbMutex.Lock()
	defer ub.dbMutex.Unlock()

	tx, err := ub.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("beginning database transaction: %w", err)
	}
	// After a successful Commit this returns sql.ErrTxDone, which is ignored.
	defer tx.Rollback()

	stmt := `CREATE TABLE IF NOT EXISTS task_update_queue(task_id VARCHAR(36), payload BLOB)`
	log.Debug().Str("sql", stmt).Msg("creating database table")
	if _, err := tx.ExecContext(ctx, stmt); err != nil {
		return fmt.Errorf("creating database table: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing creation of database table: %w", err)
	}
	return nil
}
// queueSize returns the number of task updates waiting in task_update_queue.
// It panics when no database has been opened.
//
// Both callers in this file hold ub.dbMutex while calling it.
func (ub *UpstreamBufferDB) queueSize(ctx context.Context) (int, error) {
	if ub.db == nil {
		log.Panic().Msg("no database opened, unable to inspect upstream queue")
	}

	var count int
	row := ub.db.QueryRowContext(ctx, "SELECT count(*) FROM task_update_queue")
	if err := row.Scan(&count); err != nil {
		if err == sql.ErrNoRows {
			return 0, nil
		}
		return 0, err
	}
	return count, nil
}
// queueTaskUpdate stores a JSON-encoded task update in task_update_queue, so
// that a later Flush can deliver it. It panics when no database has been
// opened.
func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
	if ub.db == nil {
		log.Panic().Msg("no database opened, unable to queue task updates")
	}

	tx, err := ub.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("beginning database transaction: %w", err)
	}
	defer tx.Rollback()

	payload, marshalErr := json.Marshal(update)
	if marshalErr != nil {
		return fmt.Errorf("converting task update to JSON: %w", marshalErr)
	}

	const insertSQL = `INSERT INTO task_update_queue (task_id, payload) VALUES (?, ?)`
	log.Debug().Str("sql", insertSQL).Str("task", taskID).Msg("inserting task update")
	if _, execErr := tx.ExecContext(ctx, insertSQL, taskID, payload); execErr != nil {
		return fmt.Errorf("queueing task update: %w", execErr)
	}

	if commitErr := tx.Commit(); commitErr != nil {
		return fmt.Errorf("committing queued task update: %w", commitErr)
	}
	return nil
}
// Flush sends all queued task updates to the Manager, in rowid order.
//
// Delivery stops at the first error. The update that could not be delivered
// stays in the queue for a later attempt. An update is removed from the queue
// in these cases:
//   - the Manager accepted it,
//   - the Manager answered with an error status,
//   - it could not be decoded.
//
// It panics when no database has been opened.
func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
	ub.dbMutex.Lock()
	defer ub.dbMutex.Unlock()

	if ub.db == nil {
		log.Panic().Msg("no database opened, unable to flush task updates")
	}

	// See if we need to flush at all.
	queueSize, err := ub.queueSize(ctx)
	switch {
	case err != nil:
		return fmt.Errorf("unable to determine queue size: %w", err)
	case queueSize == 0:
		log.Debug().Msg("task update queue empty, nothing to flush")
		return nil
	}

	// Keep flushing until the queue is empty or there is an error.
	for {
		done, flushErr := ub.flushFirstItem(ctx)
		switch {
		case flushErr != nil:
			return flushErr
		case done:
			return nil
		}
	}
}
// flushFirstItem tries to deliver the queued task update with the lowest
// rowid to the Manager.
//
// Return values:
//   - done=true, err=nil: the queue is empty.
//   - done=true, err!=nil: the Manager could not be reached. The update stays
//     queued.
//   - done=false: the update was removed from the queue, whatever the
//     Manager's response was, and the caller may continue with the next item.
//
// Updates that cannot be unmarshalled are discarded without being sent.
//
// ub.db is used without a nil check. Flush checks it, and holds ub.dbMutex,
// before calling this.
func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) {
	tx, err := ub.db.BeginTx(ctx, nil)
	if err != nil {
		return false, fmt.Errorf("beginning database transaction: %w", err)
	}
	// Any return without a Commit leaves the queue unchanged.
	defer tx.Rollback()

	stmt := `SELECT rowid, task_id, payload FROM task_update_queue ORDER BY rowid LIMIT 1`
	log.Trace().Str("sql", stmt).Msg("fetching queued task updates")

	var rowID int64
	var taskID string
	var blob []byte
	err = tx.QueryRowContext(ctx, stmt).Scan(&rowID, &taskID, &blob)
	switch {
	case err == sql.ErrNoRows:
		// Flush operation is done.
		log.Debug().Msg("task update queue empty")
		return true, nil
	case err != nil:
		return false, fmt.Errorf("querying task update queue: %w", err)
	}

	logger := log.With().Str("task", taskID).Logger()

	var update api.TaskUpdateJSONRequestBody
	if err := json.Unmarshal(blob, &update); err != nil {
		// If we can't unmarshal the queued task update, there is little else to do
		// than to discard it and ignore it ever happened.
		logger.Warn().Err(err).
			Msg("unable to unmarshal queued task update, discarding")
		if err := ub.discardRow(ctx, tx, rowID); err != nil {
			return false, err
		}
		return false, tx.Commit()
	}

	// Actually attempt delivery.
	resp, err := ub.client.TaskUpdateWithResponse(ctx, taskID, update)
	if err != nil {
		logger.Info().Err(err).Msg("communication with Manager still problematic")
		return true, err
	}

	// Regardless of the response, there is little else to do but to discard the
	// update from the queue.
	switch resp.StatusCode() {
	case http.StatusNoContent:
		logger.Debug().Msg("queued task update accepted by Manager")
	case http.StatusConflict:
		logger.Warn().Msg("queued task update discarded by Manager, task was already reassigned to other Worker")
	default:
		logger.Warn().
			Int("statusCode", resp.StatusCode()).
			Interface("response", resp.JSONDefault).
			Msg("queued task update discarded by Manager, unknown reason")
	}

	if err := ub.discardRow(ctx, tx, rowID); err != nil {
		return false, err
	}
	return false, tx.Commit()
}
// discardRow deletes the queued task update with the given rowid within the
// caller's transaction tx. The deletion takes effect only when the caller
// commits tx.
func (ub *UpstreamBufferDB) discardRow(ctx context.Context, tx *sql.Tx, rowID int64) error {
	const deleteSQL = `DELETE FROM task_update_queue WHERE rowid = ?`
	log.Trace().Str("sql", deleteSQL).Int64("rowID", rowID).Msg("un-queueing task update")

	if _, execErr := tx.ExecContext(ctx, deleteSQL, rowID); execErr != nil {
		return fmt.Errorf("un-queueing task update: %w", execErr)
	}
	return nil
}
// periodicFlushLoop repeatedly waits flushInterval and then calls Flush,
// until ub.done is closed.
//
// Flush errors are only logged; the next interval tries again. The loop marks
// ub.wg as done when it returns.
func (ub *UpstreamBufferDB) periodicFlushLoop() {
	defer ub.wg.Done()
	defer log.Debug().Msg("periodic task update flush loop stopping")

	log.Debug().Msg("periodic task update flush loop starting")

	ctx := context.Background()
	for {
		// Block until it is either time to stop or time to flush.
		select {
		case <-ub.done:
			return
		case <-ub.clock.After(ub.flushInterval):
		}

		log.Trace().Msg("task upstream queue: periodic flush")
		if err := ub.Flush(ctx); err != nil {
			log.Warn().Err(err).Msg("error flushing task update queue")
		}
	}
}