Worker: add unit test for upstream buffer
No functional changes, just a test.
This commit is contained in:
parent
aa964ac205
commit
71ae57977d
71
internal/worker/persistence/test_support.go
Normal file
71
internal/worker/persistence/test_support.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
// Package persistence provides the database interface for Flamenco Manager.
|
||||||
|
package persistence
|
||||||
|
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Change this to a filename if you want to run a single test and inspect the
|
||||||
|
// resulting database.
|
||||||
|
const TestDSN = "file::memory:"
|
||||||
|
|
||||||
|
func createTestDB() (db *DB, closer func()) {
|
||||||
|
// Delete the SQLite file if it exists on disk.
|
||||||
|
if _, err := os.Stat(TestDSN); err == nil {
|
||||||
|
if err := os.Remove(TestDSN); err != nil {
|
||||||
|
panic(fmt.Sprintf("unable to remove %s: %v", TestDSN, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
db, err = openDB(ctx, TestDSN)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("opening DB: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.migrate(ctx)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("migrating DB: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
closer = func() {
|
||||||
|
if err := db.Close(); err != nil {
|
||||||
|
panic(fmt.Sprintf("closing DB: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return db, closer
|
||||||
|
}
|
||||||
|
|
||||||
|
// persistenceTestFixtures creates a test database and returns it and a context.
|
||||||
|
// Tests should call the returned cancel function when they're done.
|
||||||
|
func persistenceTestFixtures(testContextTimeout time.Duration) (context.Context, context.CancelFunc, *DB) {
|
||||||
|
db, dbCloser := createTestDB()
|
||||||
|
|
||||||
|
var (
|
||||||
|
ctx context.Context
|
||||||
|
ctxCancel context.CancelFunc
|
||||||
|
)
|
||||||
|
if testContextTimeout > 0 {
|
||||||
|
ctx, ctxCancel = context.WithTimeout(context.Background(), testContextTimeout)
|
||||||
|
} else {
|
||||||
|
ctx = context.Background()
|
||||||
|
ctxCancel = func() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel := func() {
|
||||||
|
ctxCancel()
|
||||||
|
dbCloser()
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, cancel, db
|
||||||
|
}
|
104
internal/worker/persistence/upstream_buffer_test.go
Normal file
104
internal/worker/persistence/upstream_buffer_test.go
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
package persistence
|
||||||
|
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"projects.blender.org/studio/flamenco/internal/worker/persistence/sqlc"
|
||||||
|
"projects.blender.org/studio/flamenco/pkg/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
const defaultTimeout = 1 * time.Second
|
||||||
|
|
||||||
|
func TestUpstreamBufferQueueEmpty(t *testing.T) {
|
||||||
|
ctx, cancel, db := persistenceTestFixtures(defaultTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Queue should be empty at first.
|
||||||
|
size, err := db.UpstreamBufferQueueSize(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Zero(t, size)
|
||||||
|
|
||||||
|
// Getting the first queued item on an empty queue should work.
|
||||||
|
first, err := db.UpstreamBufferFrontItem(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, first)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpstreamBufferQueue(t *testing.T) {
|
||||||
|
ctx, cancel, db := persistenceTestFixtures(defaultTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Mock the clock so we can compare 'created at' timestamps.
|
||||||
|
// The database should order by ID anyway, and not by timestamp.
|
||||||
|
fixedNow := db.now()
|
||||||
|
db.nowfunc = func() time.Time { return fixedNow }
|
||||||
|
|
||||||
|
// Queue an update.
|
||||||
|
taskUUID := "3d1e2419-ca9d-4500-bd4f-1e14b5e82947"
|
||||||
|
update1 := api.TaskUpdateJSONRequestBody{
|
||||||
|
Activity: ptr("Tešt active"),
|
||||||
|
TaskStatus: ptr(api.TaskStatusActive),
|
||||||
|
}
|
||||||
|
require.NoError(t, db.UpstreamBufferQueue(ctx, taskUUID, update1))
|
||||||
|
|
||||||
|
// Queue should have grown.
|
||||||
|
size, err := db.UpstreamBufferQueueSize(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, size)
|
||||||
|
|
||||||
|
// Queue another update.
|
||||||
|
update2 := api.TaskUpdateJSONRequestBody{
|
||||||
|
TaskStatus: ptr(api.TaskStatusCompleted),
|
||||||
|
}
|
||||||
|
require.NoError(t, db.UpstreamBufferQueue(ctx, taskUUID, update2))
|
||||||
|
|
||||||
|
// Queue should have grown again.
|
||||||
|
size, err = db.UpstreamBufferQueueSize(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, size)
|
||||||
|
|
||||||
|
// First update should be at the front of the queue.
|
||||||
|
first, err := db.UpstreamBufferFrontItem(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
{
|
||||||
|
expect := TaskUpdate{
|
||||||
|
TaskUpdate: sqlc.TaskUpdate{
|
||||||
|
ID: 1,
|
||||||
|
CreatedAt: fixedNow,
|
||||||
|
TaskID: taskUUID,
|
||||||
|
Payload: []byte(`{"activity":"Tešt active","taskStatus":"active"}`),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Equal(t, &expect, first)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deleting should work, and should move the next queued item tot the front of
|
||||||
|
// the queue.
|
||||||
|
require.NoError(t, db.UpstreamBufferDiscard(ctx, first))
|
||||||
|
size, err = db.UpstreamBufferQueueSize(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, size)
|
||||||
|
second, err := db.UpstreamBufferFrontItem(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
{
|
||||||
|
expect := TaskUpdate{
|
||||||
|
TaskUpdate: sqlc.TaskUpdate{
|
||||||
|
ID: 2,
|
||||||
|
CreatedAt: fixedNow,
|
||||||
|
TaskID: taskUUID,
|
||||||
|
Payload: []byte(`{"taskStatus":"completed"}`),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Equal(t, &expect, second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ptr[T any](value T) *T {
|
||||||
|
return &value
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user