diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 261c9597..39c8c391 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -65,8 +65,8 @@ func (db *DB) findTaskForWorker(w *Worker) (*Task, error) { Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses - // TODO: Supported task types - // TODO: assigned to this worker or not assigned at all + Where("tasks.type in ?", w.TaskTypes()). // Supported task types + Where("tasks.worker_id = ? or tasks.worker_id is NULL", w.ID). // assigned to this worker or not assigned at all // TODO: Non-blacklisted Order("jobs.priority desc"). // Highest job priority Order("priority desc"). // Highest task priority diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 2158304d..2ee6af64 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -44,7 +44,7 @@ func TestOneJobOneTask(t *testing.T) { db := CreateTestDB(t) w := linuxWorker(t, db) - authTask := authorTestTask("the task", "blender-render") + authTask := authorTestTask("the task", "blender") atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) job := constructTestJob(t, db, atj) @@ -71,10 +71,10 @@ func TestOneJobThreeTasksByPrio(t *testing.T) { db := CreateTestDB(t) w := linuxWorker(t, db) - att1 := authorTestTask("1 low-prio task", "blender-render") - att2 := authorTestTask("2 high-prio task", "render-preview") + att1 := authorTestTask("1 low-prio task", "blender") + att2 := authorTestTask("2 high-prio task", "ffmpeg") att2.Priority = 100 - att3 := authorTestTask("3 low-prio task", "blender-render") + att3 := authorTestTask("3 low-prio task", "blender") atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", @@ -100,11 +100,11 @@ func TestOneJobThreeTasksByDependencies(t *testing.T) { db := CreateTestDB(t) w := linuxWorker(t, db) - att1 := authorTestTask("1 low-prio task", "blender-render") - att2 := authorTestTask("2 high-prio task", "render-preview") + att1 := authorTestTask("1 low-prio task", "blender") + att2 := authorTestTask("2 high-prio task", "ffmpeg") att2.Priority = 100 att2.Dependencies = []*job_compilers.AuthoredTask{&att1} - att3 := authorTestTask("3 low-prio task", "blender-render") + att3 := authorTestTask("3 low-prio task", "blender") atj := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", @@ -124,21 +124,21 @@ func TestTwoJobsThreeTasks(t *testing.T) { db := CreateTestDB(t) w := linuxWorker(t, db) - att1_1 := authorTestTask("1.1 low-prio task", "blender-render") - att1_2 := authorTestTask("1.2 high-prio task", "render-preview") + att1_1 := authorTestTask("1.1 low-prio task", "blender") + att1_2 := authorTestTask("1.2 high-prio task", "ffmpeg") att1_2.Priority = 100 att1_2.Dependencies = []*job_compilers.AuthoredTask{&att1_1} - att1_3 := authorTestTask("1.3 low-prio task", "blender-render") + att1_3 := authorTestTask("1.3 low-prio task", "blender") atj1 := authorTestJob( "1295757b-e668-4c49-8b89-f73db8270e42", "simple-blender-render", att1_1, att1_2, att1_3) - att2_1 := authorTestTask("2.1 low-prio task", "blender-render") - att2_2 := authorTestTask("2.2 high-prio task", "render-preview") + att2_1 := authorTestTask("2.1 low-prio task", "blender") + att2_2 := authorTestTask("2.2 high-prio task", "ffmpeg") att2_2.Priority = 100 att2_2.Dependencies = []*job_compilers.AuthoredTask{&att2_1} - att2_3 := authorTestTask("2.3 highest-prio task", "blender-render") + att2_3 := authorTestTask("2.3 highest-prio task", "blender") att2_3.Priority = 150 atj2 := authorTestJob( "7180617b-da70-411c-8b38-b972ab2bae8d", @@ -218,7 +218,7 @@ func linuxWorker(t *testing.T, db *DB) Worker { Name: "Linux", Platform: "linux", Status: api.WorkerStatusAwake, - SupportedTaskTypes: "blender,ffmpeg,file-management", + SupportedTaskTypes: "blender,ffmpeg,file-management,misc", } err := db.gormDB.Save(&w).Error diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 45653440..a3a53215 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -23,6 +23,7 @@ package persistence import ( "context" "fmt" + "strings" "gitlab.com/blender/flamenco-ng-poc/pkg/api" "gorm.io/gorm" @@ -44,6 +45,11 @@ type Worker struct { SupportedTaskTypes string `gorm:"type:varchar(255);not null"` // comma-separated list of task types. } +// TaskTypes returns the worker's supported task types as list of strings. +func (w *Worker) TaskTypes() []string { + return strings.Split(w.SupportedTaskTypes, ",") +} + func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { if err := db.gormDB.Create(w).Error; err != nil { return fmt.Errorf("error creating new worker: %v", err)