flamenco/internal/manager/persistence/task_scheduler_test.go
Anish Bharadwaj (he) 0502498dfa Fix #104201: Task Limit error in Flamenco Manager
Insert tasks in batches so that the required SQL query stays within the limits of SQLite.

No changes to the API, only to the persistence layer.

Reviewed-on: https://projects.blender.org/studio/flamenco/pulls/104205
2023-04-24 15:10:59 +02:00

511 lines
15 KiB
Go

package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/uuid"
"git.blender.org/flamenco/pkg/api"
)
const schedulerTestTimeout = 100 * time.Millisecond
const schedulerTestTimeoutlong = 5000 * time.Millisecond
func TestNoTasks(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
task, err := db.ScheduleTask(ctx, &w)
assert.Nil(t, task)
assert.NoError(t, err)
}
func TestOneJobOneTask(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
authTask := authorTestTask("the task", "blender")
atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
job := constructTestJob(ctx, t, db, atj)
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
// Check the returned task.
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, job.ID, task.JobID)
if task.WorkerID == nil {
t.Fatal("no worker assigned to task")
}
assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker")
// Check the task in the database.
now := db.gormDB.NowFunc()
dbTask, err := db.FetchTask(context.Background(), authTask.UUID)
assert.NoError(t, err)
if dbTask == nil {
t.Fatal("task cannot be fetched from database")
}
if dbTask.WorkerID == nil {
t.Fatal("no worker assigned to task")
}
assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker")
assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second, "task must be 'touched' by the worker after scheduling")
}
func TestOneJobThreeTasksByPrio(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
job := constructTestJob(ctx, t, db, atj)
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, job.ID, task.JobID)
if task.Job == nil {
t.Fatal("task.Job is nil")
}
assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen")
}
func TestOneJobThreeTasksByDependencies(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
att2.Dependencies = []*job_compilers.AuthoredTask{&att1}
att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
job := constructTestJob(ctx, t, db, atj)
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, job.ID, task.JobID)
assert.Equal(t, att1.Name, task.Name, "the first task should have been chosen")
}
func TestTwoJobsThreeTasks(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1_1 := authorTestTask("1.1 low-prio task", "blender")
att1_2 := authorTestTask("1.2 high-prio task", "ffmpeg")
att1_2.Priority = 100
att1_2.Dependencies = []*job_compilers.AuthoredTask{&att1_1}
att1_3 := authorTestTask("1.3 low-prio task", "blender")
atj1 := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1_1, att1_2, att1_3)
att2_1 := authorTestTask("2.1 low-prio task", "blender")
att2_2 := authorTestTask("2.2 high-prio task", "ffmpeg")
att2_2.Priority = 100
att2_2.Dependencies = []*job_compilers.AuthoredTask{&att2_1}
att2_3 := authorTestTask("2.3 highest-prio task", "blender")
att2_3.Priority = 150
atj2 := authorTestJob(
"7180617b-da70-411c-8b38-b972ab2bae8d",
"simple-blender-render",
att2_1, att2_2, att2_3)
atj2.Priority = 100 // Increase priority over job 1.
constructTestJob(ctx, t, db, atj1)
job2 := constructTestJob(ctx, t, db, atj2)
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, job2.ID, task.JobID)
assert.Equal(t, att2_3.Name, task.Name, "the 3rd task of the 2nd job should have been chosen")
}
func TestSomeButNotAllDependenciesCompleted(t *testing.T) {
// There was a bug in the task scheduler query, where it would schedule a task
// if any of its dependencies was completed (instead of all dependencies).
// This test reproduces that problematic scenario.
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
att1 := authorTestTask("1.1 completed task", "blender")
att2 := authorTestTask("1.2 queued task of unsupported type", "unsupported")
att3 := authorTestTask("1.3 queued task with queued dependency", "ffmpeg")
att3.Dependencies = []*job_compilers.AuthoredTask{&att1, &att2}
atj := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2, att3)
constructTestJob(ctx, t, db, atj)
// Complete the first task. The other two are still `queued`.
setTaskStatus(t, db, att1.UUID, api.TaskStatusCompleted)
w := linuxWorker(t, db)
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task != nil {
t.Fatalf("there should not be any task assigned, but received %q", task.Name)
}
}
func TestAlreadyAssigned(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
constructTestJob(ctx, t, db, atj)
// Assign the task to the worker, and mark it as Active.
// This should make it get returned by the scheduler, even when there is
// another, higher-prio task to be done.
dbTask3, err := db.FetchTask(ctx, att3.UUID)
assert.NoError(t, err)
dbTask3.WorkerID = &w.ID
dbTask3.Status = api.TaskStatusActive
err = db.SaveTask(ctx, dbTask3)
assert.NoError(t, err)
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, att3.Name, task.Name, "the already-assigned task should have been chosen")
}
func TestAssignedToOtherWorker(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
w2 := windowsWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2)
constructTestJob(ctx, t, db, atj)
// Assign the high-prio task to the other worker. Because the task is queued,
// it shouldn't matter which worker it's assigned to.
dbTask2, err := db.FetchTask(ctx, att2.UUID)
assert.NoError(t, err)
dbTask2.WorkerID = &w2.ID
dbTask2.Status = api.TaskStatusQueued
err = db.SaveTask(ctx, dbTask2)
assert.NoError(t, err)
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen")
assert.Equal(t, *task.WorkerID, w.ID, "the task should now be assigned to the worker it was scheduled for")
}
func TestPreviouslyFailed(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 failed task", "blender")
att2 := authorTestTask("2 expected task", "blender")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2)
job := constructTestJob(ctx, t, db, atj)
// Mimick that this worker already failed the first task.
tasks, err := db.FetchTasksOfJob(ctx, job)
assert.NoError(t, err)
numFailed, err := db.AddWorkerToTaskFailedList(ctx, tasks[0], &w)
assert.NoError(t, err)
assert.Equal(t, 1, numFailed)
// This should assign the 2nd task.
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen")
}
func TestWorkerClusterJobWithCluster(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
// Create worker clusters:
cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"}
cluster2 := WorkerCluster{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Cluster 2"}
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1))
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2))
// Create a worker in cluster1:
workerC := linuxWorker(t, db, func(w *Worker) {
w.Clusters = []*WorkerCluster{&cluster1}
})
// Create a worker without cluster:
workerNC := linuxWorker(t, db, func(w *Worker) {
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
w.Clusters = nil
})
{ // Test job with different cluster:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask)
job.WorkerClusterUUID = cluster2.UUID
constructTestJob(ctx, t, db, job)
task, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
assert.Nil(t, task, "job with different cluster should not be scheduled")
}
{ // Test job with matching cluster:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("5d4c2321-0bb7-4c13-a9dd-32a2c0cd156e", "simple-blender-render", authTask)
job.WorkerClusterUUID = cluster1.UUID
constructTestJob(ctx, t, db, job)
task, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
require.NotNil(t, task, "job with matching cluster should be scheduled")
assert.Equal(t, authTask.UUID, task.UUID)
task, err = db.ScheduleTask(ctx, &workerNC)
require.NoError(t, err)
assert.Nil(t, task, "job with cluster should not be scheduled for worker without cluster")
}
}
func TestWorkerClusterJobWithoutCluster(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
// Create worker cluster:
cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"}
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1))
// Create a worker in cluster1:
workerC := linuxWorker(t, db, func(w *Worker) {
w.Clusters = []*WorkerCluster{&cluster1}
})
// Create a worker without cluster:
workerNC := linuxWorker(t, db, func(w *Worker) {
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
w.Clusters = nil
})
// Test cluster-less job:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
constructTestJob(ctx, t, db, job)
task, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
require.NotNil(t, task, "job without cluster should always be scheduled to worker in some cluster")
assert.Equal(t, authTask.UUID, task.UUID)
task, err = db.ScheduleTask(ctx, &workerNC)
require.NoError(t, err)
require.NotNil(t, task, "job without cluster should always be scheduled to worker without cluster")
assert.Equal(t, authTask.UUID, task.UUID)
}
func TestBlocklisted(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 blocked task", "blender")
att2 := authorTestTask("2 expected task", "ffmpeg")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2)
job := constructTestJob(ctx, t, db, atj)
// Mimick that this worker was already blocked for 'blender' tasks of this job.
err := db.AddWorkerToJobBlocklist(ctx, job, &w, "blender")
assert.NoError(t, err)
// This should assign the 2nd task.
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen")
}
// To test: blocklists
// To test: variable replacement
func constructTestJob(
ctx context.Context, t *testing.T, db *DB, authoredJob job_compilers.AuthoredJob,
) *Job {
err := db.StoreAuthoredJob(ctx, authoredJob)
if err != nil {
t.Fatalf("storing authored job: %v", err)
}
dbJob, err := db.FetchJob(ctx, authoredJob.JobID)
if err != nil {
t.Fatalf("fetching authored job: %v", err)
}
// Queue the job.
dbJob.Status = api.JobStatusQueued
err = db.SaveJobStatus(ctx, dbJob)
if err != nil {
t.Fatalf("queueing job: %v", err)
}
return dbJob
}
func authorTestJob(jobUUID, jobType string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
job := job_compilers.AuthoredJob{
JobID: jobUUID,
Name: "test job",
JobType: jobType,
Priority: 50,
Status: api.JobStatusQueued,
Created: time.Now(),
Tasks: tasks,
}
return job
}
func authorTestTask(name, taskType string, dependencies ...*job_compilers.AuthoredTask) job_compilers.AuthoredTask {
task := job_compilers.AuthoredTask{
Name: name,
Type: taskType,
UUID: uuid.New(),
Priority: 50,
Commands: make([]job_compilers.AuthoredCommand, 0),
Dependencies: dependencies,
}
return task
}
func setTaskStatus(t *testing.T, db *DB, taskUUID string, status api.TaskStatus) {
ctx := context.Background()
task, err := db.FetchTask(ctx, taskUUID)
if err != nil {
t.Fatal(err)
}
task.Status = status
err = db.SaveTask(ctx, task)
if err != nil {
t.Fatal(err)
}
}
func linuxWorker(t *testing.T, db *DB, updaters ...func(worker *Worker)) Worker {
w := Worker{
UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8",
Name: "Linux",
Platform: "linux",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management,misc",
}
for _, updater := range updaters {
updater(&w)
}
err := db.gormDB.Save(&w).Error
if err != nil {
t.Logf("cannot save Linux worker: %v", err)
t.FailNow()
}
return w
}
func windowsWorker(t *testing.T, db *DB) Worker {
w := Worker{
UUID: "4f6ee45e-c8fc-4c31-bf5c-922f2415deb1",
Name: "Windows",
Platform: "windows",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management,misc",
}
err := db.gormDB.Save(&w).Error
if err != nil {
t.Logf("cannot save Windows worker: %v", err)
t.FailNow()
}
return w
}