package persistence // SPDX-License-Identifier: GPL-3.0-or-later import ( "context" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) const schedulerTestTimeout = 100 * time.Millisecond const schedulerTestTimeoutlong = 5000 * time.Millisecond func TestNoTasks(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) task, err := db.ScheduleTask(ctx, &w) assert.Nil(t, task) require.NoError(t, err) } func TestOneJobOneTask(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) authTask := authorTestTask("the task", "blender") atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) job := constructTestJob(ctx, t, db, atj) task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) // Check the returned task. if task == nil { t.Fatal("task is nil") } assert.Equal(t, job.ID, task.JobID) if task.WorkerID == nil { t.Fatal("no worker assigned to task") } assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker") // Check the task in the database. now := db.gormDB.NowFunc() dbTask, err := db.FetchTask(context.Background(), authTask.UUID) require.NoError(t, err) if dbTask == nil { t.Fatal("task cannot be fetched from database") } if dbTask.WorkerID == nil { t.Fatal("no worker assigned to task") } assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker") assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second, "task must be 'touched' by the worker after scheduling") } func TestOneJobThreeTasksByPrio(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) att1 := authorTestTask("1 low-prio task", "blender") att2 := authorTestTask("2 high-prio task", "ffmpeg") att2.Priority = 100 att3 := authorTestTask("3 low-prio task", "blender") atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2, att3) job := constructTestJob(ctx, t, db, atj) task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task == nil { t.Fatal("task is nil") } assert.Equal(t, job.ID, task.JobID) if task.Job == nil { t.Fatal("task.Job is nil") } assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen") } func TestOneJobThreeTasksByDependencies(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) att1 := authorTestTask("1 low-prio task", "blender") att2 := authorTestTask("2 high-prio task", "ffmpeg") att2.Priority = 100 att2.Dependencies = []*job_compilers.AuthoredTask{&att1} att3 := authorTestTask("3 low-prio task", "blender") atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2, att3) job := constructTestJob(ctx, t, db, atj) task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task == nil { t.Fatal("task is nil") } assert.Equal(t, job.ID, task.JobID) assert.Equal(t, att1.Name, task.Name, "the first task should have been chosen") } func TestTwoJobsThreeTasks(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) att1_1 := authorTestTask("1.1 low-prio task", "blender") att1_2 := authorTestTask("1.2 high-prio task", "ffmpeg") att1_2.Priority = 100 att1_2.Dependencies = []*job_compilers.AuthoredTask{&att1_1} att1_3 := authorTestTask("1.3 low-prio task", "blender") atj1 := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1_1, att1_2, att1_3) att2_1 := authorTestTask("2.1 low-prio task", "blender") att2_2 := authorTestTask("2.2 high-prio task", "ffmpeg") att2_2.Priority = 100 att2_2.Dependencies = []*job_compilers.AuthoredTask{&att2_1} att2_3 := authorTestTask("2.3 highest-prio task", "blender") att2_3.Priority = 150 atj2 := authorTestJob( "7180617b-da70-411c-8b38-b972ab2bae8d", "simple-blender-render", att2_1, att2_2, att2_3) atj2.Priority = 100 // Increase priority over job 1. constructTestJob(ctx, t, db, atj1) job2 := constructTestJob(ctx, t, db, atj2) task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task == nil { t.Fatal("task is nil") } assert.Equal(t, job2.ID, task.JobID) assert.Equal(t, att2_3.Name, task.Name, "the 3rd task of the 2nd job should have been chosen") } func TestSomeButNotAllDependenciesCompleted(t *testing.T) { // There was a bug in the task scheduler query, where it would schedule a task // if any of its dependencies was completed (instead of all dependencies). // This test reproduces that problematic scenario. ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() att1 := authorTestTask("1.1 completed task", "blender") att2 := authorTestTask("1.2 queued task of unsupported type", "unsupported") att3 := authorTestTask("1.3 queued task with queued dependency", "ffmpeg") att3.Dependencies = []*job_compilers.AuthoredTask{&att1, &att2} atj := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2, att3) constructTestJob(ctx, t, db, atj) // Complete the first task. The other two are still `queued`. setTaskStatus(t, db, att1.UUID, api.TaskStatusCompleted) w := linuxWorker(t, db) task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task != nil { t.Fatalf("there should not be any task assigned, but received %q", task.Name) } } func TestAlreadyAssigned(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) att1 := authorTestTask("1 low-prio task", "blender") att2 := authorTestTask("2 high-prio task", "ffmpeg") att2.Priority = 100 att3 := authorTestTask("3 low-prio task", "blender") atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2, att3) constructTestJob(ctx, t, db, atj) // Assign the task to the worker, and mark it as Active. // This should make it get returned by the scheduler, even when there is // another, higher-prio task to be done. dbTask3, err := db.FetchTask(ctx, att3.UUID) require.NoError(t, err) dbTask3.WorkerID = &w.ID dbTask3.Status = api.TaskStatusActive err = db.SaveTask(ctx, dbTask3) require.NoError(t, err) task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task == nil { t.Fatal("task is nil") } assert.Equal(t, att3.Name, task.Name, "the already-assigned task should have been chosen") } func TestAssignedToOtherWorker(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) w2 := windowsWorker(t, db) att1 := authorTestTask("1 low-prio task", "blender") att2 := authorTestTask("2 high-prio task", "ffmpeg") att2.Priority = 100 atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2) constructTestJob(ctx, t, db, atj) // Assign the high-prio task to the other worker. Because the task is queued, // it shouldn't matter which worker it's assigned to. dbTask2, err := db.FetchTask(ctx, att2.UUID) require.NoError(t, err) dbTask2.WorkerID = &w2.ID dbTask2.Status = api.TaskStatusQueued err = db.SaveTask(ctx, dbTask2) require.NoError(t, err) task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task == nil { t.Fatal("task is nil") } assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen") assert.Equal(t, *task.WorkerID, w.ID, "the task should now be assigned to the worker it was scheduled for") } func TestPreviouslyFailed(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) att1 := authorTestTask("1 failed task", "blender") att2 := authorTestTask("2 expected task", "blender") atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2) job := constructTestJob(ctx, t, db, atj) // Mimick that this worker already failed the first task. tasks, err := db.FetchTasksOfJob(ctx, job) require.NoError(t, err) numFailed, err := db.AddWorkerToTaskFailedList(ctx, tasks[0], &w) require.NoError(t, err) assert.Equal(t, 1, numFailed) // This should assign the 2nd task. task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task == nil { t.Fatal("task is nil") } assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen") } func TestWorkerTagJobWithTag(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() // Create worker tags: tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} tag2 := WorkerTag{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Tag 2"} require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) require.NoError(t, db.CreateWorkerTag(ctx, &tag2)) // Create a worker in tag1: workerC := linuxWorker(t, db, func(w *Worker) { w.Tags = []*WorkerTag{&tag1} }) // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" w.Tags = nil }) { // Test job with different tag: authTask := authorTestTask("the task", "blender") job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask) job.WorkerTagUUID = tag2.UUID constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) assert.Nil(t, task, "job with different tag should not be scheduled") } { // Test job with matching tag: authTask := authorTestTask("the task", "blender") job := authorTestJob("5d4c2321-0bb7-4c13-a9dd-32a2c0cd156e", "simple-blender-render", authTask) job.WorkerTagUUID = tag1.UUID constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) require.NotNil(t, task, "job with matching tag should be scheduled") assert.Equal(t, authTask.UUID, task.UUID) task, err = db.ScheduleTask(ctx, &workerNC) require.NoError(t, err) assert.Nil(t, task, "job with tag should not be scheduled for worker without tag") } } func TestWorkerTagJobWithoutTag(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() // Create worker tag: tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) // Create a worker in tag1: workerC := linuxWorker(t, db, func(w *Worker) { w.Tags = []*WorkerTag{&tag1} }) // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" w.Tags = nil }) // Test tag-less job: authTask := authorTestTask("the task", "blender") job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) require.NotNil(t, task, "job without tag should always be scheduled to worker in some tag") assert.Equal(t, authTask.UUID, task.UUID) task, err = db.ScheduleTask(ctx, &workerNC) require.NoError(t, err) require.NotNil(t, task, "job without tag should always be scheduled to worker without tag") assert.Equal(t, authTask.UUID, task.UUID) } func TestBlocklisted(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() w := linuxWorker(t, db) att1 := authorTestTask("1 blocked task", "blender") att2 := authorTestTask("2 expected task", "ffmpeg") atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2) job := constructTestJob(ctx, t, db, atj) // Mimick that this worker was already blocked for 'blender' tasks of this job. err := db.AddWorkerToJobBlocklist(ctx, job, &w, "blender") require.NoError(t, err) // This should assign the 2nd task. task, err := db.ScheduleTask(ctx, &w) require.NoError(t, err) if task == nil { t.Fatal("task is nil") } assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen") } // To test: blocklists // To test: variable replacement func constructTestJob( ctx context.Context, t *testing.T, db *DB, authoredJob job_compilers.AuthoredJob, ) *Job { err := db.StoreAuthoredJob(ctx, authoredJob) if err != nil { t.Fatalf("storing authored job: %v", err) } dbJob, err := db.FetchJob(ctx, authoredJob.JobID) if err != nil { t.Fatalf("fetching authored job: %v", err) } // Queue the job. dbJob.Status = api.JobStatusQueued err = db.SaveJobStatus(ctx, dbJob) if err != nil { t.Fatalf("queueing job: %v", err) } return dbJob } func authorTestJob(jobUUID, jobType string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob { job := job_compilers.AuthoredJob{ JobID: jobUUID, Name: "test job", JobType: jobType, Priority: 50, Status: api.JobStatusQueued, Created: time.Now(), Tasks: tasks, } return job } func authorTestTask(name, taskType string, dependencies ...*job_compilers.AuthoredTask) job_compilers.AuthoredTask { task := job_compilers.AuthoredTask{ Name: name, Type: taskType, UUID: uuid.New(), Priority: 50, Commands: make([]job_compilers.AuthoredCommand, 0), Dependencies: dependencies, } return task } func setTaskStatus(t *testing.T, db *DB, taskUUID string, status api.TaskStatus) { ctx := context.Background() task, err := db.FetchTask(ctx, taskUUID) if err != nil { t.Fatal(err) } task.Status = status err = db.SaveTask(ctx, task) if err != nil { t.Fatal(err) } } func linuxWorker(t *testing.T, db *DB, updaters ...func(worker *Worker)) Worker { w := Worker{ UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8", Name: "Linux", Platform: "linux", Status: api.WorkerStatusAwake, SupportedTaskTypes: "blender,ffmpeg,file-management,misc", } for _, updater := range updaters { updater(&w) } err := db.gormDB.Save(&w).Error if err != nil { t.Logf("cannot save Linux worker: %v", err) t.FailNow() } return w } func windowsWorker(t *testing.T, db *DB) Worker { w := Worker{ UUID: "4f6ee45e-c8fc-4c31-bf5c-922f2415deb1", Name: "Windows", Platform: "windows", Status: api.WorkerStatusAwake, SupportedTaskTypes: "blender,ffmpeg,file-management,misc", } err := db.gormDB.Save(&w).Error if err != nil { t.Logf("cannot save Windows worker: %v", err) t.FailNow() } return w }