Initial task scheduler implementation

This commit is contained in:
Sybren A. Stüvel 2022-02-01 17:17:11 +01:00
parent 2ee66af6d4
commit 97ab93d996
8 changed files with 348 additions and 28 deletions

View File

@ -64,3 +64,10 @@ func openDB(ctx context.Context, uri string) (*DB, error) {
} }
return &db, nil return &db, nil
} }
// GormDB returns the GORM interface.
// This should only be used for the one Task Scheduler Monster Query. Other
// operations should just be implemented as a function on DB.
func (db *DB) GormDB() *gorm.DB {
return db.gormDB
}

View File

@ -126,6 +126,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
return fmt.Errorf("error storing job: %v", err) return fmt.Errorf("error storing job: %v", err)
} }
uuidToTask := make(map[string]*Task)
for _, authoredTask := range authoredJob.Tasks { for _, authoredTask := range authoredJob.Tasks {
var commands []Command var commands []Command
for _, authoredCommand := range authoredTask.Commands { for _, authoredCommand := range authoredTask.Commands {
@ -141,13 +142,41 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
UUID: authoredTask.UUID, UUID: authoredTask.UUID,
Job: &dbJob, Job: &dbJob,
Priority: authoredTask.Priority, Priority: authoredTask.Priority,
Status: string(api.TaskStatusProcessing), // TODO: is this the right place to set the default status? Status: string(api.TaskStatusQueued),
// TODO: store dependencies
Commands: commands, Commands: commands,
// dependencies are stored below.
} }
if err := db.gormDB.Create(&dbTask).Error; err != nil { if err := db.gormDB.Create(&dbTask).Error; err != nil {
return fmt.Errorf("error storing task: %v", err) return fmt.Errorf("error storing task: %v", err)
} }
uuidToTask[authoredTask.UUID] = &dbTask
}
// Store the dependencies between tasks.
for _, authoredTask := range authoredJob.Tasks {
if len(authoredTask.Dependencies) == 0 {
continue
}
dbTask, ok := uuidToTask[authoredTask.UUID]
if !ok {
return fmt.Errorf("unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
}
deps := make([]*Task, len(authoredTask.Dependencies))
for i, t := range authoredTask.Dependencies {
depTask, ok := uuidToTask[t.UUID]
if !ok {
return fmt.Errorf("error finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
}
deps[i] = depTask
}
dbTask.Dependencies = deps
if err := db.gormDB.Save(dbTask).Error; err != nil {
return fmt.Errorf("unable to store dependencies of task %q: %w", authoredTask.UUID, err)
}
} }
return nil return nil
@ -163,3 +192,10 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
return &dbJob, nil return &dbJob, nil
} }
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
if err := db.gormDB.Model(j).Updates(Job{Status: j.Status}).Error; err != nil {
return fmt.Errorf("error saving job status: %v", err)
}
return nil
}

View File

@ -42,7 +42,10 @@ func CreateTestDB(t *testing.T) *DB {
// Erase everything in the database. // Erase everything in the database.
var tx *gorm.DB var tx *gorm.DB
tx = db.gormDB.Exec("DROP SCHEMA public CASCADE") tx = db.gormDB.Exec("DROP SCHEMA IF EXISTS public CASCADE")
if tx.Error != nil {
t.Fatalf("error dropping database schema: %v", tx.Error)
}
assert.NoError(t, tx.Error) assert.NoError(t, tx.Error)
tx = db.gormDB.Exec("CREATE SCHEMA public") tx = db.gormDB.Exec("CREATE SCHEMA public")
assert.NoError(t, tx.Error) assert.NoError(t, tx.Error)

View File

@ -0,0 +1,95 @@
// Package task_scheduler can choose which task to assign to a worker.
package task_scheduler
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"errors"
"github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
"gorm.io/gorm"
)
var (
schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed}
completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted}
schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued}
)
type TaskScheduler struct {
db PersistenceService
}
type PersistenceService interface {
GormDB() *gorm.DB
}
func NewTaskScheduler(db PersistenceService) *TaskScheduler {
return &TaskScheduler{db}
}
// ScheduleTask finds a task to execute by the given worker.
// If no task is available, (nil, nil) is returned, as this is not an error situation.
func (ts *TaskScheduler) ScheduleTask(w *persistence.Worker) (*persistence.Task, error) {
task, err := ts.findTaskForWorker(w)
// TODO: Mark the task as Active, and push the status change to whatever I think up to handle those changes.
// TODO: Store in the database that this task is assigned to this worker.
return task, err
}
func (ts *TaskScheduler) findTaskForWorker(w *persistence.Worker) (*persistence.Task, error) {
logger := log.With().Str("worker", w.UUID).Logger()
logger.Debug().Msg("finding task for worker")
task := persistence.Task{}
db := ts.db.GormDB()
tx := db.Debug().
Model(&task).
Joins("left join jobs on tasks.job_id = jobs.id").
Joins("left join task_dependencies on tasks.id = task_dependencies.task_id").
Joins("left join tasks as tdeps on tdeps.id = task_dependencies.dependency_id").
Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses
Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed
Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses
// TODO: Supported task types
// TODO: Non-blacklisted
// TODO: Highest job priority
Order("priority desc"). // Highest task priority
Limit(1).
Preload("Job").
First(&task)
if tx.Error != nil {
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
logger.Debug().Msg("no task for worker")
return nil, nil
}
logger.Error().Err(tx.Error).Msg("error finding task for worker")
return nil, tx.Error
}
return &task, nil
}

View File

@ -0,0 +1,181 @@
// Package task_scheduler can choose which task to assign to a worker.
package task_scheduler
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/job_compilers"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
)
func TestNoTasks(t *testing.T) {
db := persistence.CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker()
task, err := ts.ScheduleTask(&w)
assert.Nil(t, task)
assert.NoError(t, err)
}
func TestOneJobOneTask(t *testing.T) {
db := persistence.CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker()
authTask := authorTestTask("the task", "blender-render")
atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
job := constructTestJob(t, db, atj)
task, err := ts.ScheduleTask(&w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, job.ID, task.JobID)
}
func TestOneJobThreeTasksByPrio(t *testing.T) {
db := persistence.CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker()
att1 := authorTestTask("1 low-prio task", "blender-render")
att2 := authorTestTask("2 high-prio task", "render-preview")
att2.Priority = 100
att3 := authorTestTask("3 low-prio task", "blender-render")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
job := constructTestJob(t, db, atj)
task, err := ts.ScheduleTask(&w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, job.ID, task.JobID)
if task.Job == nil {
t.Fatal("task.Job is nil")
}
assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen")
}
func TestOneJobThreeTasksByDependencies(t *testing.T) {
db := persistence.CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker()
att1 := authorTestTask("1 low-prio task", "blender-render")
att2 := authorTestTask("2 high-prio task", "render-preview")
att2.Priority = 100
att2.Dependencies = []*job_compilers.AuthoredTask{&att1}
att3 := authorTestTask("3 low-prio task", "blender-render")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2, att3)
job := constructTestJob(t, db, atj)
task, err := ts.ScheduleTask(&w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, job.ID, task.JobID)
assert.Equal(t, att1.Name, task.Name, "the first task should have been chosen")
}
// To test: worker with non-active state.
// Unlike Flamenco v2, this Manager shouldn't change a worker's status
// simply because it requests a task. New tasks for non-awake workers
// should be rejected.
// To test: blacklists
// To test: variable replacement
func constructTestJob(
t *testing.T, db *persistence.DB, authoredJob job_compilers.AuthoredJob,
) *persistence.Job {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := db.StoreAuthoredJob(ctx, authoredJob)
assert.NoError(t, err)
dbJob, err := db.FetchJob(ctx, authoredJob.JobID)
assert.NoError(t, err)
// Queue the job.
dbJob.Status = string(api.JobStatusQueued)
err = db.SaveJobStatus(ctx, dbJob)
assert.NoError(t, err)
return dbJob
}
func authorTestJob(jobUUID, jobType string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
job := job_compilers.AuthoredJob{
JobID: jobUUID,
Name: "test job",
JobType: jobType,
Priority: 50,
Status: api.JobStatusQueued,
Created: time.Now(),
Tasks: tasks,
}
return job
}
func authorTestTask(name, taskType string, dependencies ...*job_compilers.AuthoredTask) job_compilers.AuthoredTask {
task := job_compilers.AuthoredTask{
Name: name,
Type: taskType,
UUID: uuid.NewString(),
Priority: 50,
Commands: make([]job_compilers.AuthoredCommand, 0),
Dependencies: dependencies,
}
return task
}
func linuxWorker() persistence.Worker {
w := persistence.Worker{
UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8",
Name: "Linux",
Platform: "linux",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management",
}
return w
}

View File

@ -296,7 +296,7 @@ components:
TaskStatus: TaskStatus:
type: string type: string
enum: [active, canceled, completed, failed, queued, soft-failed, cancel-requested, fail-requested, paused, processing] enum: [active, canceled, completed, failed, queued, soft-failed, cancel-requested, fail-requested, paused]
Command: Command:
type: object type: object

View File

@ -47,28 +47,28 @@ var swaggerSpec = []string{
"kN9+vv7l+tfrv13/8tvP13+//vX6r+12y/wPB93AH05ZpjmL5tGH8PMKNZiV8mJp+J8hmh8iT1bT1C5p", "kN9+vv7l+tfrv13/8tvP13+//vX6r+12y/wPB93AH05ZpjmL5tGH8PMKNZiV8mJp+J8hmh8iT1bT1C5p",
"ybiqIAed0uX082im3cqZWc3OVYIGDBIePT6cui3boeT4zUv8WZho/vhJHK0wjTXRPHo0eXSA6XRO12CW", "ybiqIAed0uX082im3cqZWc3OVYIGDBIePT6cui3boeT4zUv8WZho/vhJHK0wjTXRPHo0eXSA6XRO12CW",
"Si8vOQOFOYJ7E8WRKm1RWl9KwHsL0ni9TAsHOZ6CpZ/VJckfUhPV8gvDUVWTwPjEL/FduK51NXrcE2vr", "Si8vOQOFOYJ7E8WRKm1RWl9KwHsL0ni9TAsHOZ6CpZ/VJckfUhPV8gvDUVWTwPjEL/FduK51NXrcE2vr",
"uHbbHl9dC6NyRhp+LXXtC/PV1FatfrMzBGcOXbaaqjHfaLUU7xBP6shRQz36fhNZbhMn6qBTaJWCwXA9", "uHbbHl9dC6NyRhp+LXXtC/PV1FatfrMzBGcOXbaaqjHfaLUU7xBP6shRQz36fhNZbhMnQtAZA3+Pjz4E",
"Ggk8WPp4oKl32j5MfASaQ6rBjg99JCr3lBJO6gDq6BEtQB7TWAgefC3f3lUS98xRK27cGu+b0AfPMirX", "aOr9tI8MHwHgkGqw40MfCcQ9PYSTOhg6ekQLg8eUFOIFX8u3d5XEPXPUChW3hvgm2sGzjMo1DFnwwWbZ",
"MGTBR55lYyt3iqZ9qfc3uxVRbBdV90DLHgq6jmgs1dbnXnRDL1yINgIA03hwITOOTFZapjaucQUmzFar", "mMedAmhf6v3NbkUU20XVPdCyh4Ku7xlLtfXpFt3QCxeVjQDAzB1clIwjk5WWqY3rVYEJs9VqJbiEEY/y",
"leASRjzKG70Luguk2rO3cQQsaYm4PyhiDGhUNOHGJc9+Mjl6HpOCGrNRmlVD3sp9K51QW03VLffFYOrk", "Ru/i7AKp9uxtHAFLWiLUD+oWAxoVTbhx+bKfTI6ex6SgxmyUZtWQt3LfPSfUVlN1y30xfjp5ubYaNTxt",
"5Xps1PC0SVQya4voCmnkcqV8rSktTW1T9EZVUCYnQNGJSi3CSjOfzVZVyOZqNqwtvvMdxBdU5yT3TQTy", "cpPM2iK6Qhq5XClfXkpLU9vUuVEVh8kJUHSiUouw0sxns1UVpbmaDcuJ73zT8AXVOcl934A8PT7C/IWn",
"9PgIkxmegjTQOufl8beXh4P9N5vNdC1LjOCzsMbM1oWYHE4PpiCnmc190s+t6FAbjovi6BJ0iHGPpgfT", "IA20znl5/O3l4WD/zWYzXcsSg/YsrDGzdSEmh9ODKchpZnOf53MrOtSG46I4ugQdwtqj6cH0AGerAiQt",
"A5ytCpC04Bju3SuER5s5zcxowV30dTapjBMFWqYT5hHzXcScW19eBkv/SrFtJT6Qbg0tCsFTt2p2bjyM", "OEZ49woR0WZOMzNacBdwnU0q40SBlumEecR84zDn1leUwdK/UmxbiQ+kW0OLQvDUrZqdGw+j3m73WXW3",
"ervdZ9XdWvpqIFXX4VIhdYraRo8ZhfMCUyiUFJ70+ODgk1K2oYaYMk3BrEohtsR/XwBGuLSKcMn4JWcl", "fL4aSNU1tVTIlqK20WMS4bzAFAolhSc9Pjj4pJRtqCGmTFMwq1KILfGfFIARLq0iXDJ+yVlJhf8KMe19",
"Ff6TxLT3PeZeyPTZ7Qh9boBUyavzzTLPqd7WWiWUSNi4bhjWrrU5hRZYq2fkPmBQTClckwqL8vZ2r6qe", "grkXMn1CO0KfGyBVvup8s8xzqre1VgklEjauAYblam1OoevVahO5bxYUswjXl8I6vL3dq6qNbtD4CEhW",
"ukHjIyBZobi0jt/axmZ1TFjDiKG9BFs37h5Qq8Mu4Yjo6klNp7AnwJdgiRh0E12jLQOue83WG0TXHFWL", "KC6t47e2sVkdE9YwYmgvwda9ugfU6rAxOCK6elLTHOwJ8CVYIgYNRNdby4DrXn/1BtE1R9XiP2++K3bk",
"/7z5yNiR34dzlSw5u9opwhdg08y7anO+62Zx5Cr02gME+c0GHhW35Liv4jt9QD3d4HQOvrvqcJy7AUIT", "9+FcJUvOrnaK8AXYNPOu2pzvGlgcuQrt9QBBfrOBR8UtOe4r8k4fUE83OJ2D7646HOdugNDEf99yuruF",
"/7HL6e4WdusXSRZANEfKK7H7CDPToWEw2TT9glGwrDoLoa/wMIg5krSOCMrPQheuqP+k4DnosYyQKNG8", "3fpFkgUQzZHySuw+wsx06BFMNk2LYBQsq2ZCaCU8DGKOJK0jgvKz0IUr6j8peA7aKiMkSjQvQSoaPik4",
"BKlo+KTgWEp4X0CKtTiEOW3DqMgPCLmp9FnZUnhxOrLIqwRxoVlp+hZl+FpO1Gp1Q9zFJHy1GkLhk2EO", "lhLeF5Bi+Q1hTtswKvIDQm4qfVa2FF6cjizyKkFcaFaavkUZvpYTtVrdEHcxCV+thlD4ZJhD/f4EGZJA",
"9fsTZEgCHfZ00r8fTxE1Gpm9pvqinfdRQ6r0co+0n1ER2rHewtwtCgHe9asIdiHd12DYfq6BrJW/x+G2", "hz2d9O/HU0SNRmavqb5o533UkCq93CPtZ1SEDqy3MHdxQoB3/SqCXUj3ARi2n2sga+Wvbrjtp+MqkXs0",
"n46rRO7RiHxQpw5H7HbnuqvwKX15WFf9VzjzrW3waWkzkBaJAvKuPDh4/EeC1lDdHNjUH8ru2SA1ULbF", "Ih/UqcMRu925biR8Sl8e1lX/Fc58axt8WtoMpEWigLwrDw4e/5GgNVSXBTb1t7F7NkgNlG1xFu7nv81a",
"Wbif/1BrFfEfQEIeUSl8aK6ojp2ZQEtl0X/aMhylJHXjpCmWr+JdYEZ2r/h9m9TdzSPNIL0gm+r6SgYa", "Rfw3j5BHVAofmiuqY2cm0FJZ9J+2DEcpSd04aYrlq3gXmJHdK37fJnV380gzSC/IprqxkoEGf6tku0MI",
"/BWT7Q4hjNvBJG21FkbBa6QN8aBA1j5oRLxv6tDo+bwFnv1vxb2A50FvXghTcpJxQ1J3vy1x11JoioAh", "43YwSVuthVHwGmlDPCiQtQ8aEe+bOjR6Pm+BZ/9bcS/gedCbF8KUnGTckNRdaUvcTRSaImAIYD4x9ZeW",
"gPnE1N9gCljSdDw7thITpRG5KqlU+AJ6IlRKhYM2Ksx949kldLgpzcBUbbjAuyO8phmwUsCJ/wD0cAVg", "ApY0Tc6OrcREaUSuSioVvoCeCJVS4aCNCnPfeHYJHW5KMzBVG+7s7givaQasFHDiv/k8XAHYvkE8olh3",
"+zrxiGLdReJ25bsLqN6ocNGwe3nKXSKp7lZcxdGTg8P7a0l0vmiNEH8MuirCn4PkHjSfPD78tIhfGTeV", "d7hd+e4Cqjcq3C3s3pdy90aq6xRXcfTk4PD+WhKdj1gjxB+Drorw5yC5B80njw8/LeJXxk2lVJaoxFIu",
"UlmiEku5dNmwk1dMktL6K1hr5W6DSuXgzzvBHR3prd+d1vu3dLfPwp2qTbA73W5bOBL0ZVU++zbdLGqB", "XTbs5BWTpLT+1tVauQugUjn4805wR0d663en9f4t3e2zcKdqE+xOt9sWjgR9WZXPvk03i1og37eGr9AC",
"fN8avkILwP/OCNyFP6RkDdbV8/V1g4SKRNBOGW7cHZJeB+L4qNuTaQcNleel9OmKuxvcb9xMm+0D31en", "8L8zAnfHDylZg3X1fH3DIKEiEbRThht3baTXgTg+6vZk2kFD5XkpfbrirgP3GzfTZvvA99Xp1b8CAAD/",
"V/8KAAD//8XqVAxiLwAA", "/6TghGFVLwAA",
} }
// GetSwagger returns the content of the embedded swagger specification file // GetSwagger returns the content of the embedded swagger specification file

View File

@ -82,8 +82,6 @@ const (
TaskStatusPaused TaskStatus = "paused" TaskStatusPaused TaskStatus = "paused"
TaskStatusProcessing TaskStatus = "processing"
TaskStatusQueued TaskStatus = "queued" TaskStatusQueued TaskStatus = "queued"
TaskStatusSoftFailed TaskStatus = "soft-failed" TaskStatusSoftFailed TaskStatus = "soft-failed"