
The task scheduler was handing out tasks for which any dependency (instead of all dependencies) were completed.
272 lines
7.4 KiB
Go
272 lines
7.4 KiB
Go
package persistence
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"git.blender.org/flamenco/internal/manager/job_compilers"
|
|
"git.blender.org/flamenco/pkg/api"
|
|
)
|
|
|
|
const schedulerTestTimeout = 100 * time.Millisecond
|
|
|
|
func TestNoTasks(t *testing.T) {
|
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
|
defer cancel()
|
|
|
|
w := linuxWorker(t, db)
|
|
|
|
task, err := db.ScheduleTask(ctx, &w)
|
|
assert.Nil(t, task)
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
func TestOneJobOneTask(t *testing.T) {
|
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
|
defer cancel()
|
|
|
|
w := linuxWorker(t, db)
|
|
|
|
authTask := authorTestTask("the task", "blender")
|
|
atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
|
|
job := constructTestJob(ctx, t, db, atj)
|
|
|
|
task, err := db.ScheduleTask(ctx, &w)
|
|
assert.NoError(t, err)
|
|
|
|
// Check the returned task.
|
|
if task == nil {
|
|
t.Fatal("task is nil")
|
|
}
|
|
assert.Equal(t, job.ID, task.JobID)
|
|
if task.WorkerID == nil {
|
|
t.Fatal("no worker assigned to task")
|
|
}
|
|
assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker")
|
|
|
|
// Check the task in the database.
|
|
dbTask, err := db.FetchTask(context.Background(), authTask.UUID)
|
|
assert.NoError(t, err)
|
|
if dbTask == nil {
|
|
t.Fatal("task cannot be fetched from database")
|
|
}
|
|
if dbTask.WorkerID == nil {
|
|
t.Fatal("no worker assigned to task")
|
|
}
|
|
assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker")
|
|
}
|
|
|
|
func TestOneJobThreeTasksByPrio(t *testing.T) {
|
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
|
defer cancel()
|
|
|
|
w := linuxWorker(t, db)
|
|
|
|
att1 := authorTestTask("1 low-prio task", "blender")
|
|
att2 := authorTestTask("2 high-prio task", "ffmpeg")
|
|
att2.Priority = 100
|
|
att3 := authorTestTask("3 low-prio task", "blender")
|
|
atj := authorTestJob(
|
|
"1295757b-e668-4c49-8b89-f73db8270e42",
|
|
"simple-blender-render",
|
|
att1, att2, att3)
|
|
|
|
job := constructTestJob(ctx, t, db, atj)
|
|
|
|
task, err := db.ScheduleTask(ctx, &w)
|
|
assert.NoError(t, err)
|
|
if task == nil {
|
|
t.Fatal("task is nil")
|
|
}
|
|
|
|
assert.Equal(t, job.ID, task.JobID)
|
|
if task.Job == nil {
|
|
t.Fatal("task.Job is nil")
|
|
}
|
|
|
|
assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen")
|
|
}
|
|
|
|
func TestOneJobThreeTasksByDependencies(t *testing.T) {
|
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
|
defer cancel()
|
|
|
|
w := linuxWorker(t, db)
|
|
|
|
att1 := authorTestTask("1 low-prio task", "blender")
|
|
att2 := authorTestTask("2 high-prio task", "ffmpeg")
|
|
att2.Priority = 100
|
|
att2.Dependencies = []*job_compilers.AuthoredTask{&att1}
|
|
att3 := authorTestTask("3 low-prio task", "blender")
|
|
atj := authorTestJob(
|
|
"1295757b-e668-4c49-8b89-f73db8270e42",
|
|
"simple-blender-render",
|
|
att1, att2, att3)
|
|
job := constructTestJob(ctx, t, db, atj)
|
|
|
|
task, err := db.ScheduleTask(ctx, &w)
|
|
assert.NoError(t, err)
|
|
if task == nil {
|
|
t.Fatal("task is nil")
|
|
}
|
|
assert.Equal(t, job.ID, task.JobID)
|
|
assert.Equal(t, att1.Name, task.Name, "the first task should have been chosen")
|
|
}
|
|
|
|
func TestTwoJobsThreeTasks(t *testing.T) {
|
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
|
defer cancel()
|
|
|
|
w := linuxWorker(t, db)
|
|
|
|
att1_1 := authorTestTask("1.1 low-prio task", "blender")
|
|
att1_2 := authorTestTask("1.2 high-prio task", "ffmpeg")
|
|
att1_2.Priority = 100
|
|
att1_2.Dependencies = []*job_compilers.AuthoredTask{&att1_1}
|
|
att1_3 := authorTestTask("1.3 low-prio task", "blender")
|
|
atj1 := authorTestJob(
|
|
"1295757b-e668-4c49-8b89-f73db8270e42",
|
|
"simple-blender-render",
|
|
att1_1, att1_2, att1_3)
|
|
|
|
att2_1 := authorTestTask("2.1 low-prio task", "blender")
|
|
att2_2 := authorTestTask("2.2 high-prio task", "ffmpeg")
|
|
att2_2.Priority = 100
|
|
att2_2.Dependencies = []*job_compilers.AuthoredTask{&att2_1}
|
|
att2_3 := authorTestTask("2.3 highest-prio task", "blender")
|
|
att2_3.Priority = 150
|
|
atj2 := authorTestJob(
|
|
"7180617b-da70-411c-8b38-b972ab2bae8d",
|
|
"simple-blender-render",
|
|
att2_1, att2_2, att2_3)
|
|
atj2.Priority = 100 // Increase priority over job 1.
|
|
|
|
constructTestJob(ctx, t, db, atj1)
|
|
job2 := constructTestJob(ctx, t, db, atj2)
|
|
|
|
task, err := db.ScheduleTask(ctx, &w)
|
|
assert.NoError(t, err)
|
|
if task == nil {
|
|
t.Fatal("task is nil")
|
|
}
|
|
assert.Equal(t, job2.ID, task.JobID)
|
|
assert.Equal(t, att2_3.Name, task.Name, "the 3rd task of the 2nd job should have been chosen")
|
|
}
|
|
|
|
func TestSomeButNotAllDependenciesCompleted(t *testing.T) {
|
|
// There was a bug in the task scheduler query, where it would schedule a task
|
|
// if any of its dependencies was completed (instead of all dependencies).
|
|
// This test reproduces that problematic scenario.
|
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
|
defer cancel()
|
|
|
|
att1 := authorTestTask("1.1 completed task", "blender")
|
|
att2 := authorTestTask("1.2 queued task of unsupported type", "unsupported")
|
|
att3 := authorTestTask("1.3 queued task with queued dependency", "ffmpeg")
|
|
att3.Dependencies = []*job_compilers.AuthoredTask{&att1, &att2}
|
|
|
|
atj := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1, att2, att3)
|
|
constructTestJob(ctx, t, db, atj)
|
|
|
|
// Complete the first task. The other two are still `queued`.
|
|
setTaskStatus(t, db, att1.UUID, api.TaskStatusCompleted)
|
|
|
|
w := linuxWorker(t, db)
|
|
task, err := db.ScheduleTask(ctx, &w)
|
|
assert.NoError(t, err)
|
|
if task != nil {
|
|
t.Fatalf("there should not be any task assigned, but received %q", task.Name)
|
|
}
|
|
}
|
|
|
|
// To test: blacklists
|
|
|
|
// To test: variable replacement
|
|
|
|
func constructTestJob(
|
|
ctx context.Context, t *testing.T, db *DB, authoredJob job_compilers.AuthoredJob,
|
|
) *Job {
|
|
err := db.StoreAuthoredJob(ctx, authoredJob)
|
|
if err != nil {
|
|
t.Fatalf("storing authored job: %v", err)
|
|
}
|
|
|
|
dbJob, err := db.FetchJob(ctx, authoredJob.JobID)
|
|
if err != nil {
|
|
t.Fatalf("fetching authored job: %v", err)
|
|
}
|
|
|
|
// Queue the job.
|
|
dbJob.Status = api.JobStatusQueued
|
|
err = db.SaveJobStatus(ctx, dbJob)
|
|
if err != nil {
|
|
t.Fatalf("queueing job: %v", err)
|
|
}
|
|
|
|
return dbJob
|
|
}
|
|
|
|
func authorTestJob(jobUUID, jobType string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
|
|
job := job_compilers.AuthoredJob{
|
|
JobID: jobUUID,
|
|
Name: "test job",
|
|
JobType: jobType,
|
|
Priority: 50,
|
|
Status: api.JobStatusQueued,
|
|
Created: time.Now(),
|
|
Tasks: tasks,
|
|
}
|
|
return job
|
|
}
|
|
|
|
func authorTestTask(name, taskType string, dependencies ...*job_compilers.AuthoredTask) job_compilers.AuthoredTask {
|
|
task := job_compilers.AuthoredTask{
|
|
Name: name,
|
|
Type: taskType,
|
|
UUID: uuid.NewString(),
|
|
Priority: 50,
|
|
Commands: make([]job_compilers.AuthoredCommand, 0),
|
|
Dependencies: dependencies,
|
|
}
|
|
return task
|
|
}
|
|
|
|
func setTaskStatus(t *testing.T, db *DB, taskUUID string, status api.TaskStatus) {
|
|
ctx := context.Background()
|
|
task, err := db.FetchTask(ctx, taskUUID)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
task.Status = status
|
|
|
|
err = db.SaveTask(ctx, task)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func linuxWorker(t *testing.T, db *DB) Worker {
|
|
w := Worker{
|
|
UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8",
|
|
Name: "Linux",
|
|
Platform: "linux",
|
|
Status: api.WorkerStatusAwake,
|
|
SupportedTaskTypes: "blender,ffmpeg,file-management,misc",
|
|
}
|
|
|
|
err := db.gormDB.Save(&w).Error
|
|
if err != nil {
|
|
t.Logf("cannot save Linux worker: %v", err)
|
|
t.FailNow()
|
|
}
|
|
|
|
return w
|
|
}
|