diff --git a/internal/worker/persistence/test_support.go b/internal/worker/persistence/test_support.go new file mode 100644 index 00000000..0aa0debc --- /dev/null +++ b/internal/worker/persistence/test_support.go @@ -0,0 +1,71 @@ +// Package persistence provides the database interface for Flamenco Manager. +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + "os" + "time" +) + +// Change this to a filename if you want to run a single test and inspect the +// resulting database. +const TestDSN = "file::memory:" + +func createTestDB() (db *DB, closer func()) { + // Delete the SQLite file if it exists on disk. + if _, err := os.Stat(TestDSN); err == nil { + if err := os.Remove(TestDSN); err != nil { + panic(fmt.Sprintf("unable to remove %s: %v", TestDSN, err)) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var err error + + db, err = openDB(ctx, TestDSN) + if err != nil { + panic(fmt.Sprintf("opening DB: %v", err)) + } + + err = db.migrate(ctx) + if err != nil { + panic(fmt.Sprintf("migrating DB: %v", err)) + } + + closer = func() { + if err := db.Close(); err != nil { + panic(fmt.Sprintf("closing DB: %v", err)) + } + } + + return db, closer +} + +// persistenceTestFixtures creates a test database and returns it and a context. +// Tests should call the returned cancel function when they're done. +func persistenceTestFixtures(testContextTimeout time.Duration) (context.Context, context.CancelFunc, *DB) { + db, dbCloser := createTestDB() + + var ( + ctx context.Context + ctxCancel context.CancelFunc + ) + if testContextTimeout > 0 { + ctx, ctxCancel = context.WithTimeout(context.Background(), testContextTimeout) + } else { + ctx = context.Background() + ctxCancel = func() {} + } + + cancel := func() { + ctxCancel() + dbCloser() + } + + return ctx, cancel, db +} diff --git a/internal/worker/persistence/upstream_buffer_test.go b/internal/worker/persistence/upstream_buffer_test.go new file mode 100644 index 00000000..93a683e2 --- /dev/null +++ b/internal/worker/persistence/upstream_buffer_test.go @@ -0,0 +1,104 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "projects.blender.org/studio/flamenco/internal/worker/persistence/sqlc" + "projects.blender.org/studio/flamenco/pkg/api" +) + +const defaultTimeout = 1 * time.Second + +func TestUpstreamBufferQueueEmpty(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(defaultTimeout) + defer cancel() + + // Queue should be empty at first. + size, err := db.UpstreamBufferQueueSize(ctx) + require.NoError(t, err) + assert.Zero(t, size) + + // Getting the first queued item on an empty queue should work. + first, err := db.UpstreamBufferFrontItem(ctx) + require.NoError(t, err) + require.Nil(t, first) +} + +func TestUpstreamBufferQueue(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(defaultTimeout) + defer cancel() + + // Mock the clock so we can compare 'created at' timestamps. + // The database should order by ID anyway, and not by timestamp. + fixedNow := db.now() + db.nowfunc = func() time.Time { return fixedNow } + + // Queue an update. + taskUUID := "3d1e2419-ca9d-4500-bd4f-1e14b5e82947" + update1 := api.TaskUpdateJSONRequestBody{ + Activity: ptr("Tešt active"), + TaskStatus: ptr(api.TaskStatusActive), + } + require.NoError(t, db.UpstreamBufferQueue(ctx, taskUUID, update1)) + + // Queue should have grown. + size, err := db.UpstreamBufferQueueSize(ctx) + require.NoError(t, err) + assert.Equal(t, 1, size) + + // Queue another update. + update2 := api.TaskUpdateJSONRequestBody{ + TaskStatus: ptr(api.TaskStatusCompleted), + } + require.NoError(t, db.UpstreamBufferQueue(ctx, taskUUID, update2)) + + // Queue should have grown again. + size, err = db.UpstreamBufferQueueSize(ctx) + require.NoError(t, err) + assert.Equal(t, 2, size) + + // First update should be at the front of the queue. + first, err := db.UpstreamBufferFrontItem(ctx) + require.NoError(t, err) + { + expect := TaskUpdate{ + TaskUpdate: sqlc.TaskUpdate{ + ID: 1, + CreatedAt: fixedNow, + TaskID: taskUUID, + Payload: []byte(`{"activity":"Tešt active","taskStatus":"active"}`), + }, + } + assert.Equal(t, &expect, first) + } + + // Deleting should work, and should move the next queued item tot the front of + // the queue. + require.NoError(t, db.UpstreamBufferDiscard(ctx, first)) + size, err = db.UpstreamBufferQueueSize(ctx) + require.NoError(t, err) + assert.Equal(t, 1, size) + second, err := db.UpstreamBufferFrontItem(ctx) + require.NoError(t, err) + { + expect := TaskUpdate{ + TaskUpdate: sqlc.TaskUpdate{ + ID: 2, + CreatedAt: fixedNow, + TaskID: taskUUID, + Payload: []byte(`{"taskStatus":"completed"}`), + }, + } + assert.Equal(t, &expect, second) + } +} + +func ptr[T any](value T) *T { + return &value +}