flamenco/internal/manager/persistence/task_scheduler_test.go
Sybren A. Stüvel 531a0184f7 Transition from ex-GORM structs to sqlc structs (5/5)
Replace old used-to-be-GORM datastructures (#104305) with sqlc-generated
structs. This also makes it possible to use more specific structs that
are more taylored to the specific queries, increasing efficiency.

This commit deals with the remaining areas, like the job deleter, task
timeout checker, and task state machine. And anything else to get things
running again.

Functional changes are kept to a minimum, as the API still serves the
same data.

Because this work covers so much of Flamenco's code, it's been split up
into different commits. Each commit brings Flamenco to a state where it
compiles and unit tests pass. Only the result of the final commit has
actually been tested properly.

Ref: #104343
2024-12-04 14:00:22 +01:00

554 lines
18 KiB
Go

package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"database/sql"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
"projects.blender.org/studio/flamenco/internal/uuid"
"projects.blender.org/studio/flamenco/pkg/api"
)
const schedulerTestTimeout = 100 * time.Hour
const schedulerTestTimeoutlong = 5000 * time.Millisecond
func TestNoTasks(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
scheduledTask, err := db.ScheduleTask(ctx, &w)
assert.Nil(t, scheduledTask)
require.NoError(t, err)
}
func TestOneJobOneTask(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
authTask := authorTestTask("the task", "blender")
atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
job := constructTestJob(ctx, t, db, atj)
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
// Check the returned task.
require.NotNil(t, scheduledTask)
assert.Equal(t, job.ID, scheduledTask.Task.JobID)
require.True(t, scheduledTask.Task.WorkerID.Valid, "no worker assigned to returned task")
assert.Equal(t, w.ID, scheduledTask.Task.WorkerID.Int64, "task must be assigned to the requesting worker")
// Check the task in the database.
now := db.now()
taskJobWorker, err := db.FetchTask(context.Background(), authTask.UUID)
require.NoError(t, err)
require.NotNil(t, taskJobWorker)
require.True(t, taskJobWorker.Task.WorkerID.Valid, "no worker assigned to task in database")
require.NotZero(t, taskJobWorker.WorkerUUID, "no worker fetched from database even though assigned assigned to task")
assert.Equal(t, w.ID, taskJobWorker.Task.WorkerID.Int64, "task must be assigned to the requesting worker")
assert.WithinDuration(t, now, taskJobWorker.Task.LastTouchedAt.Time, time.Second, "task must be 'touched' by the worker after scheduling")
}
func TestOneJobThreeTasksByPrio(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
job := constructTestJob(ctx, t, db, atj)
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask)
assert.Equal(t, job.ID, scheduledTask.Task.JobID)
assert.Equal(t, job.UUID, scheduledTask.JobUUID)
assert.Equal(t, att2.Name, scheduledTask.Task.Name, "the high-prio task should have been chosen")
}
func TestOneJobThreeTasksByDependencies(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
att2.Dependencies = []*job_compilers.AuthoredTask{&att1}
att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
job := constructTestJob(ctx, t, db, atj)
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask)
assert.Equal(t, job.ID, scheduledTask.Task.JobID)
assert.Equal(t, job.UUID, scheduledTask.JobUUID)
assert.Equal(t, att1.Name, scheduledTask.Task.Name, "the first task should have been chosen")
}
func TestTwoJobsThreeTasks(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1_1 := authorTestTask("1.1 low-prio task", "blender")
att1_2 := authorTestTask("1.2 high-prio task", "ffmpeg")
att1_2.Priority = 100
att1_2.Dependencies = []*job_compilers.AuthoredTask{&att1_1}
att1_3 := authorTestTask("1.3 low-prio task", "blender")
atj1 := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1_1, att1_2, att1_3)
att2_1 := authorTestTask("2.1 low-prio task", "blender")
att2_2 := authorTestTask("2.2 high-prio task", "ffmpeg")
att2_2.Priority = 100
att2_2.Dependencies = []*job_compilers.AuthoredTask{&att2_1}
att2_3 := authorTestTask("2.3 highest-prio task", "blender")
att2_3.Priority = 150
atj2 := authorTestJob(
"7180617b-da70-411c-8b38-b972ab2bae8d",
"simple-blender-render",
att2_1, att2_2, att2_3)
atj2.Priority = 100 // Increase priority over job 1.
constructTestJob(ctx, t, db, atj1)
job2 := constructTestJob(ctx, t, db, atj2)
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask)
assert.Equal(t, job2.ID, scheduledTask.Task.JobID)
assert.Equal(t, att2_3.Name, scheduledTask.Task.Name, "the 3rd task of the 2nd job should have been chosen")
}
// TestFanOutFanIn tests one starting task, then multiple tasks that depend on
// it that can run in parallel (fan-out), then one task that depends on all the
// parallel tasks (fan-in), and finally one last task that depends on the fan-in
// task.
func TestFanOutFanIn(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
// Single start task.
task1 := authorTestTask("1 start", "blender")
// Fan out.
task2_1 := authorTestTask("2.1 parallel", "blender")
task2_1.Dependencies = []*job_compilers.AuthoredTask{&task1}
task2_2 := authorTestTask("2.2 parallel", "blender")
task2_2.Dependencies = []*job_compilers.AuthoredTask{&task1}
task2_3 := authorTestTask("2.3 parallel", "blender")
task2_3.Dependencies = []*job_compilers.AuthoredTask{&task1}
// Fan in.
task3 := authorTestTask("3 fan-in", "blender")
task3.Dependencies = []*job_compilers.AuthoredTask{&task2_1, &task2_2, &task2_3}
// Final task.
task4 := authorTestTask("4 final", "ffmpeg")
task4.Dependencies = []*job_compilers.AuthoredTask{&task3}
// Construct the job, with the tasks not in execution order, to root out
// potential issues with the dependency resolution.
atj := authorTestJob(
"92e75ecf-7d2a-461c-8443-2fbe6a8b559d",
"fan-out-fan-in",
task4, task3, task2_1, task2_2, task1, task2_3)
require.NotNil(t, constructTestJob(ctx, t, db, atj))
// Check the order in which tasks are handed out.
executionOrder := []string{} // Slice of task names.
for index := range 6 {
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask, "task #%d is nil", index)
executionOrder = append(executionOrder, scheduledTask.Task.Name)
// Fake that the task has been completed by the worker.
scheduledTask.Task.Status = api.TaskStatusCompleted
require.NoError(t, db.SaveTaskStatus(ctx, &scheduledTask.Task))
}
expectedOrder := []string{
"1 start",
"2.1 parallel",
"2.2 parallel",
"2.3 parallel",
"3 fan-in",
"4 final",
}
assert.Equal(t, expectedOrder, executionOrder)
}
func TestSomeButNotAllDependenciesCompleted(t *testing.T) {
// There was a bug in the task scheduler query, where it would schedule a task
// if any of its dependencies was completed (instead of all dependencies).
// This test reproduces that problematic scenario.
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
att1 := authorTestTask("1.1 completed task", "blender")
att2 := authorTestTask("1.2 queued task of unsupported type", "unsupported")
att3 := authorTestTask("1.3 queued task with queued dependency", "ffmpeg")
att3.Dependencies = []*job_compilers.AuthoredTask{&att1, &att2}
atj := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2, att3)
constructTestJob(ctx, t, db, atj)
// Complete the first task. The other two are still `queued`.
setTaskStatus(t, db, att1.UUID, api.TaskStatusCompleted)
w := linuxWorker(t, db)
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if scheduledTask != nil {
t.Fatalf("there should not be any task assigned, but received %q", scheduledTask.Task.Name)
}
}
func TestAlreadyAssigned(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
constructTestJob(ctx, t, db, atj)
// Assign the task to the worker, and mark it as Active.
// This should make it get returned by the scheduler, even when there is
// another, higher-prio task to be done.
dbTask3, err := db.FetchTask(ctx, att3.UUID)
require.NoError(t, err)
dbTask3.Task.WorkerID = sql.NullInt64{Int64: w.ID, Valid: true}
dbTask3.Task.Status = api.TaskStatusActive
err = db.SaveTask(ctx, &dbTask3.Task)
require.NoError(t, err)
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask)
assert.Equal(t, att3.Name, scheduledTask.Task.Name, "the already-assigned task should have been chosen")
}
func TestAssignedToOtherWorker(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
w2 := windowsWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2)
constructTestJob(ctx, t, db, atj)
// Assign the high-prio task to the other worker. Because the task is queued,
// it shouldn't matter which worker it's assigned to.
dbTask2, err := db.FetchTask(ctx, att2.UUID)
require.NoError(t, err)
dbTask2.Task.WorkerID = sql.NullInt64{Int64: w2.ID, Valid: true}
dbTask2.Task.Status = api.TaskStatusQueued
err = db.SaveTask(ctx, &dbTask2.Task)
require.NoError(t, err)
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask)
assert.Equal(t, att2.Name, scheduledTask.Task.Name, "the high-prio task should have been chosen")
assert.Equal(t, w.ID, scheduledTask.Task.WorkerID.Int64, "the task should now be assigned to the worker it was scheduled for")
}
func TestPreviouslyFailed(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 failed task", "blender")
att2 := authorTestTask("2 expected task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2)
job := constructTestJob(ctx, t, db, atj)
// Mimick that this worker already failed the first task.
taskJobWorkers, err := db.FetchTasksOfJob(ctx, job)
require.NoError(t, err)
numFailed, err := db.AddWorkerToTaskFailedList(ctx, &taskJobWorkers[0].Task, &w)
require.NoError(t, err)
assert.Equal(t, 1, numFailed)
// This should assign the 2nd scheduledTask.
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask)
assert.Equal(t, att2.Name, scheduledTask.Task.Name, "the second task should have been chosen")
}
func TestWorkerTagJobWithTag(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
// Create worker tags:
tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"}
tag2 := WorkerTag{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Tag 2"}
require.NoError(t, db.CreateWorkerTag(ctx, &tag1))
require.NoError(t, db.CreateWorkerTag(ctx, &tag2))
// Create a worker in tag1:
workerC := linuxWorker(t, db)
require.NoError(t, db.WorkerSetTags(ctx, &workerC, []string{tag1.UUID}))
// Create a worker without tag:
workerNC := linuxWorker(t, db, func(w *Worker) {
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
})
{ // Test job with different tag:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask)
job.WorkerTagUUID = tag2.UUID
constructTestJob(ctx, t, db, job)
task, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
assert.Nil(t, task, "job with different tag should not be scheduled")
}
{ // Test job with matching tag:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("5d4c2321-0bb7-4c13-a9dd-32a2c0cd156e", "simple-blender-render", authTask)
job.WorkerTagUUID = tag1.UUID
constructTestJob(ctx, t, db, job)
scheduledTask, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
require.NotNil(t, scheduledTask, "job with matching tag should be scheduled")
assert.Equal(t, authTask.UUID, scheduledTask.Task.UUID)
scheduledTask, err = db.ScheduleTask(ctx, &workerNC)
require.NoError(t, err)
assert.Nil(t, scheduledTask, "job with tag should not be scheduled for worker without tag")
}
}
func TestWorkerTagJobWithoutTag(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
// Create worker tag:
tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"}
require.NoError(t, db.CreateWorkerTag(ctx, &tag1))
// Create a worker in tag1:
workerC := linuxWorker(t, db)
require.NoError(t, db.WorkerSetTags(ctx, &workerC, []string{tag1.UUID}))
// Create a worker without tag:
workerNC := linuxWorker(t, db, func(w *Worker) {
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
})
// Test tag-less job:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
constructTestJob(ctx, t, db, job)
scheduledTask, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
require.NotNil(t, scheduledTask, "job without tag should always be scheduled to worker in some tag")
assert.Equal(t, authTask.UUID, scheduledTask.Task.UUID)
scheduledTask, err = db.ScheduleTask(ctx, &workerNC)
require.NoError(t, err)
require.NotNil(t, scheduledTask, "job without tag should always be scheduled to worker without tag")
assert.Equal(t, authTask.UUID, scheduledTask.Task.UUID)
}
func TestBlocklisted(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 blocked task", "blender")
att2 := authorTestTask("2 expected task", "ffmpeg")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2)
job := constructTestJob(ctx, t, db, atj)
// Mimick that this worker was already blocked for 'blender' tasks of this job.
err := db.AddWorkerToJobBlocklist(ctx, job.ID, w.ID, "blender")
require.NoError(t, err)
// This should assign the 2nd scheduledTask.
scheduledTask, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, scheduledTask)
assert.Equal(t, att2.Name, scheduledTask.Task.Name, "the second task should have been chosen")
}
// To test: blocklists
// To test: variable replacement
func constructTestJob(
ctx context.Context, t *testing.T, db *DB, authoredJob job_compilers.AuthoredJob,
) *Job {
err := db.StoreAuthoredJob(ctx, authoredJob)
require.NoError(t, err, "storing authored job")
dbJob, err := db.FetchJob(ctx, authoredJob.JobID)
require.NoError(t, err, "fetching authored job")
// Queue the job.
dbJob.Status = api.JobStatusQueued
err = db.SaveJobStatus(ctx, dbJob)
require.NoError(t, err, "queueing job")
return dbJob
}
func authorTestJob(jobUUID, jobType string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
job := job_compilers.AuthoredJob{
JobID: jobUUID,
Name: "test job",
JobType: jobType,
Priority: 50,
Status: api.JobStatusQueued,
Created: time.Now(),
Tasks: tasks,
}
return job
}
func authorTestTask(name, taskType string, dependencies ...*job_compilers.AuthoredTask) job_compilers.AuthoredTask {
task := job_compilers.AuthoredTask{
Name: name,
Type: taskType,
UUID: uuid.New(),
Priority: 50,
Commands: make([]job_compilers.AuthoredCommand, 0),
Dependencies: dependencies,
}
return task
}
func setTaskStatus(t *testing.T, db *DB, taskUUID string, status api.TaskStatus) {
ctx := context.Background()
taskJobWorker, err := db.FetchTask(ctx, taskUUID)
require.NoError(t, err)
taskJobWorker.Task.Status = status
require.NoError(t, db.SaveTask(ctx, &taskJobWorker.Task))
}
func linuxWorker(t *testing.T, db *DB, updaters ...func(worker *Worker)) Worker {
w := Worker{
UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8",
Name: "Linux",
Platform: "linux",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management,misc",
}
for _, updater := range updaters {
updater(&w)
}
saveTestWorker(t, db, &w)
return w
}
func windowsWorker(t *testing.T, db *DB) Worker {
w := Worker{
UUID: "4f6ee45e-c8fc-4c31-bf5c-922f2415deb1",
Name: "Windows",
Platform: "windows",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management,misc",
}
saveTestWorker(t, db, &w)
return w
}
func saveTestWorker(t *testing.T, db *DB, worker *Worker) {
params := sqlc.CreateWorkerParams{
CreatedAt: db.now(),
UUID: worker.UUID,
Secret: worker.Secret,
Name: worker.Name,
Address: worker.Address,
Platform: worker.Platform,
Software: worker.Software,
Status: worker.Status,
LastSeenAt: nullTimeToUTC(worker.LastSeenAt),
StatusRequested: worker.StatusRequested,
LazyStatusRequest: worker.LazyStatusRequest,
SupportedTaskTypes: worker.SupportedTaskTypes,
DeletedAt: sql.NullTime(worker.DeletedAt),
CanRestart: worker.CanRestart,
}
queries := db.queries()
id, err := queries.CreateWorker(context.TODO(), params)
require.NoError(t, err, "cannot save worker %q", worker.Name)
worker.ID = id
worker.CreatedAt = params.CreatedAt
}