Scheduler: filter on supported task types & assigned worker ID

This commit is contained in:
Sybren A. Stüvel 2022-02-14 18:00:43 +01:00
parent 4aafb782ac
commit 0457809641
3 changed files with 22 additions and 16 deletions

View File

@ -65,8 +65,8 @@ func (db *DB) findTaskForWorker(w *Worker) (*Task, error) {
Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses
Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed
Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses
// TODO: Supported task types Where("tasks.type in ?", w.TaskTypes()). // Supported task types
// TODO: assigned to this worker or not assigned at all Where("tasks.worker_id = ? or tasks.worker_id is NULL", w.ID). // assigned to this worker or not assigned at all
// TODO: Non-blacklisted // TODO: Non-blacklisted
Order("jobs.priority desc"). // Highest job priority Order("jobs.priority desc"). // Highest job priority
Order("priority desc"). // Highest task priority Order("priority desc"). // Highest task priority

View File

@ -44,7 +44,7 @@ func TestOneJobOneTask(t *testing.T) {
db := CreateTestDB(t) db := CreateTestDB(t)
w := linuxWorker(t, db) w := linuxWorker(t, db)
authTask := authorTestTask("the task", "blender-render") authTask := authorTestTask("the task", "blender")
atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
job := constructTestJob(t, db, atj) job := constructTestJob(t, db, atj)
@ -71,10 +71,10 @@ func TestOneJobThreeTasksByPrio(t *testing.T) {
db := CreateTestDB(t) db := CreateTestDB(t)
w := linuxWorker(t, db) w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender-render") att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "render-preview") att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100 att2.Priority = 100
att3 := authorTestTask("3 low-prio task", "blender-render") att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob( atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42", "1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render", "simple-blender-render",
@ -100,11 +100,11 @@ func TestOneJobThreeTasksByDependencies(t *testing.T) {
db := CreateTestDB(t) db := CreateTestDB(t)
w := linuxWorker(t, db) w := linuxWorker(t, db)
att1 := authorTestTask("1 low-prio task", "blender-render") att1 := authorTestTask("1 low-prio task", "blender")
att2 := authorTestTask("2 high-prio task", "render-preview") att2 := authorTestTask("2 high-prio task", "ffmpeg")
att2.Priority = 100 att2.Priority = 100
att2.Dependencies = []*job_compilers.AuthoredTask{&att1} att2.Dependencies = []*job_compilers.AuthoredTask{&att1}
att3 := authorTestTask("3 low-prio task", "blender-render") att3 := authorTestTask("3 low-prio task", "blender")
atj := authorTestJob( atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42", "1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render", "simple-blender-render",
@ -124,21 +124,21 @@ func TestTwoJobsThreeTasks(t *testing.T) {
db := CreateTestDB(t) db := CreateTestDB(t)
w := linuxWorker(t, db) w := linuxWorker(t, db)
att1_1 := authorTestTask("1.1 low-prio task", "blender-render") att1_1 := authorTestTask("1.1 low-prio task", "blender")
att1_2 := authorTestTask("1.2 high-prio task", "render-preview") att1_2 := authorTestTask("1.2 high-prio task", "ffmpeg")
att1_2.Priority = 100 att1_2.Priority = 100
att1_2.Dependencies = []*job_compilers.AuthoredTask{&att1_1} att1_2.Dependencies = []*job_compilers.AuthoredTask{&att1_1}
att1_3 := authorTestTask("1.3 low-prio task", "blender-render") att1_3 := authorTestTask("1.3 low-prio task", "blender")
atj1 := authorTestJob( atj1 := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42", "1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render", "simple-blender-render",
att1_1, att1_2, att1_3) att1_1, att1_2, att1_3)
att2_1 := authorTestTask("2.1 low-prio task", "blender-render") att2_1 := authorTestTask("2.1 low-prio task", "blender")
att2_2 := authorTestTask("2.2 high-prio task", "render-preview") att2_2 := authorTestTask("2.2 high-prio task", "ffmpeg")
att2_2.Priority = 100 att2_2.Priority = 100
att2_2.Dependencies = []*job_compilers.AuthoredTask{&att2_1} att2_2.Dependencies = []*job_compilers.AuthoredTask{&att2_1}
att2_3 := authorTestTask("2.3 highest-prio task", "blender-render") att2_3 := authorTestTask("2.3 highest-prio task", "blender")
att2_3.Priority = 150 att2_3.Priority = 150
atj2 := authorTestJob( atj2 := authorTestJob(
"7180617b-da70-411c-8b38-b972ab2bae8d", "7180617b-da70-411c-8b38-b972ab2bae8d",
@ -218,7 +218,7 @@ func linuxWorker(t *testing.T, db *DB) Worker {
Name: "Linux", Name: "Linux",
Platform: "linux", Platform: "linux",
Status: api.WorkerStatusAwake, Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management", SupportedTaskTypes: "blender,ffmpeg,file-management,misc",
} }
err := db.gormDB.Save(&w).Error err := db.gormDB.Save(&w).Error

View File

@ -23,6 +23,7 @@ package persistence
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"gitlab.com/blender/flamenco-ng-poc/pkg/api" "gitlab.com/blender/flamenco-ng-poc/pkg/api"
"gorm.io/gorm" "gorm.io/gorm"
@ -44,6 +45,11 @@ type Worker struct {
SupportedTaskTypes string `gorm:"type:varchar(255);not null"` // comma-separated list of task types. SupportedTaskTypes string `gorm:"type:varchar(255);not null"` // comma-separated list of task types.
} }
// TaskTypes returns the worker's supported task types as list of strings.
func (w *Worker) TaskTypes() []string {
return strings.Split(w.SupportedTaskTypes, ",")
}
func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { func (db *DB) CreateWorker(ctx context.Context, w *Worker) error {
if err := db.gormDB.Create(w).Error; err != nil { if err := db.gormDB.Create(w).Error; err != nil {
return fmt.Errorf("error creating new worker: %v", err) return fmt.Errorf("error creating new worker: %v", err)