diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index add9eaca..02da5eab 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -64,3 +64,10 @@ func openDB(ctx context.Context, uri string) (*DB, error) { } return &db, nil } + +// GormDB returns the GORM interface. +// This should only be used for the one Task Scheduler Monster Query. Other +// operations should just be implemented as a function on DB. +func (db *DB) GormDB() *gorm.DB { + return db.gormDB +} diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index e9be15f3..daa794d9 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -126,6 +126,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au return fmt.Errorf("error storing job: %v", err) } + uuidToTask := make(map[string]*Task) for _, authoredTask := range authoredJob.Tasks { var commands []Command for _, authoredCommand := range authoredTask.Commands { @@ -141,13 +142,41 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au UUID: authoredTask.UUID, Job: &dbJob, Priority: authoredTask.Priority, - Status: string(api.TaskStatusProcessing), // TODO: is this the right place to set the default status? - // TODO: store dependencies + Status: string(api.TaskStatusQueued), Commands: commands, + // dependencies are stored below. } if err := db.gormDB.Create(&dbTask).Error; err != nil { return fmt.Errorf("error storing task: %v", err) } + + uuidToTask[authoredTask.UUID] = &dbTask + } + + // Store the dependencies between tasks. + for _, authoredTask := range authoredJob.Tasks { + if len(authoredTask.Dependencies) == 0 { + continue + } + + dbTask, ok := uuidToTask[authoredTask.UUID] + if !ok { + return fmt.Errorf("unable to find task %q in the database, even though it was just authored", authoredTask.UUID) + } + + deps := make([]*Task, len(authoredTask.Dependencies)) + for i, t := range authoredTask.Dependencies { + depTask, ok := uuidToTask[t.UUID] + if !ok { + return fmt.Errorf("error finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID) + } + deps[i] = depTask + } + + dbTask.Dependencies = deps + if err := db.gormDB.Save(dbTask).Error; err != nil { + return fmt.Errorf("unable to store dependencies of task %q: %w", authoredTask.UUID, err) + } } return nil @@ -163,3 +192,10 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { return &dbJob, nil } + +func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { + if err := db.gormDB.Model(j).Updates(Job{Status: j.Status}).Error; err != nil { + return fmt.Errorf("error saving job status: %v", err) + } + return nil +} diff --git a/internal/manager/persistence/test_support.go b/internal/manager/persistence/test_support.go index ae36c01c..afbd7e9e 100644 --- a/internal/manager/persistence/test_support.go +++ b/internal/manager/persistence/test_support.go @@ -42,7 +42,10 @@ func CreateTestDB(t *testing.T) *DB { // Erase everything in the database. var tx *gorm.DB - tx = db.gormDB.Exec("DROP SCHEMA public CASCADE") + tx = db.gormDB.Exec("DROP SCHEMA IF EXISTS public CASCADE") + if tx.Error != nil { + t.Fatalf("error dropping database schema: %v", tx.Error) + } assert.NoError(t, tx.Error) tx = db.gormDB.Exec("CREATE SCHEMA public") assert.NoError(t, tx.Error) diff --git a/internal/manager/task_scheduler/task_scheduler.go b/internal/manager/task_scheduler/task_scheduler.go new file mode 100644 index 00000000..29b0c0f2 --- /dev/null +++ b/internal/manager/task_scheduler/task_scheduler.go @@ -0,0 +1,95 @@ +// Package task_scheduler can choose which task to assign to a worker. +package task_scheduler + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "errors" + + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" + "gorm.io/gorm" +) + +var ( + schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed} + completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted} + schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued} +) + +type TaskScheduler struct { + db PersistenceService +} + +type PersistenceService interface { + GormDB() *gorm.DB +} + +func NewTaskScheduler(db PersistenceService) *TaskScheduler { + return &TaskScheduler{db} +} + +// ScheduleTask finds a task to execute by the given worker. +// If no task is available, (nil, nil) is returned, as this is not an error situation. +func (ts *TaskScheduler) ScheduleTask(w *persistence.Worker) (*persistence.Task, error) { + task, err := ts.findTaskForWorker(w) + + // TODO: Mark the task as Active, and push the status change to whatever I think up to handle those changes. + // TODO: Store in the database that this task is assigned to this worker. + + return task, err +} + +func (ts *TaskScheduler) findTaskForWorker(w *persistence.Worker) (*persistence.Task, error) { + + logger := log.With().Str("worker", w.UUID).Logger() + logger.Debug().Msg("finding task for worker") + + task := persistence.Task{} + db := ts.db.GormDB() + tx := db.Debug(). + Model(&task). + Joins("left join jobs on tasks.job_id = jobs.id"). + Joins("left join task_dependencies on tasks.id = task_dependencies.task_id"). + Joins("left join tasks as tdeps on tdeps.id = task_dependencies.dependency_id"). + Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses + Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed + Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses + // TODO: Supported task types + // TODO: Non-blacklisted + // TODO: Highest job priority + Order("priority desc"). // Highest task priority + Limit(1). + Preload("Job"). + First(&task) + + if tx.Error != nil { + if errors.Is(tx.Error, gorm.ErrRecordNotFound) { + logger.Debug().Msg("no task for worker") + return nil, nil + } + logger.Error().Err(tx.Error).Msg("error finding task for worker") + return nil, tx.Error + } + + return &task, nil +} diff --git a/internal/manager/task_scheduler/task_scheduler_test.go b/internal/manager/task_scheduler/task_scheduler_test.go new file mode 100644 index 00000000..8547a59e --- /dev/null +++ b/internal/manager/task_scheduler/task_scheduler_test.go @@ -0,0 +1,181 @@ +// Package task_scheduler can choose which task to assign to a worker. +package task_scheduler + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "gitlab.com/blender/flamenco-ng-poc/internal/manager/job_compilers" + "gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" +) + +func TestNoTasks(t *testing.T) { + db := persistence.CreateTestDB(t) + ts := NewTaskScheduler(db) + w := linuxWorker() + + task, err := ts.ScheduleTask(&w) + assert.Nil(t, task) + assert.NoError(t, err) +} + +func TestOneJobOneTask(t *testing.T) { + db := persistence.CreateTestDB(t) + ts := NewTaskScheduler(db) + w := linuxWorker() + + authTask := authorTestTask("the task", "blender-render") + atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) + job := constructTestJob(t, db, atj) + + task, err := ts.ScheduleTask(&w) + assert.NoError(t, err) + if task == nil { + t.Fatal("task is nil") + } + assert.Equal(t, job.ID, task.JobID) +} + +func TestOneJobThreeTasksByPrio(t *testing.T) { + db := persistence.CreateTestDB(t) + ts := NewTaskScheduler(db) + w := linuxWorker() + + att1 := authorTestTask("1 low-prio task", "blender-render") + att2 := authorTestTask("2 high-prio task", "render-preview") + att2.Priority = 100 + att3 := authorTestTask("3 low-prio task", "blender-render") + atj := authorTestJob( + "1295757b-e668-4c49-8b89-f73db8270e42", + "simple-blender-render", + att1, att2, att3) + + job := constructTestJob(t, db, atj) + + task, err := ts.ScheduleTask(&w) + assert.NoError(t, err) + if task == nil { + t.Fatal("task is nil") + } + + assert.Equal(t, job.ID, task.JobID) + if task.Job == nil { + t.Fatal("task.Job is nil") + } + + assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen") +} + +func TestOneJobThreeTasksByDependencies(t *testing.T) { + db := persistence.CreateTestDB(t) + ts := NewTaskScheduler(db) + w := linuxWorker() + + att1 := authorTestTask("1 low-prio task", "blender-render") + att2 := authorTestTask("2 high-prio task", "render-preview") + att2.Priority = 100 + att2.Dependencies = []*job_compilers.AuthoredTask{&att1} + att3 := authorTestTask("3 low-prio task", "blender-render") + atj := authorTestJob( + "1295757b-e668-4c49-8b89-f73db8270e42", + "simple-blender-render", + att1, att2, att3) + job := constructTestJob(t, db, atj) + + task, err := ts.ScheduleTask(&w) + assert.NoError(t, err) + if task == nil { + t.Fatal("task is nil") + } + assert.Equal(t, job.ID, task.JobID) + assert.Equal(t, att1.Name, task.Name, "the first task should have been chosen") +} + +// To test: worker with non-active state. +// Unlike Flamenco v2, this Manager shouldn't change a worker's status +// simply because it requests a task. New tasks for non-awake workers +// should be rejected. + +// To test: blacklists + +// To test: variable replacement + +func constructTestJob( + t *testing.T, db *persistence.DB, authoredJob job_compilers.AuthoredJob, +) *persistence.Job { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := db.StoreAuthoredJob(ctx, authoredJob) + assert.NoError(t, err) + + dbJob, err := db.FetchJob(ctx, authoredJob.JobID) + assert.NoError(t, err) + + // Queue the job. + dbJob.Status = string(api.JobStatusQueued) + err = db.SaveJobStatus(ctx, dbJob) + assert.NoError(t, err) + + return dbJob +} + +func authorTestJob(jobUUID, jobType string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob { + job := job_compilers.AuthoredJob{ + JobID: jobUUID, + Name: "test job", + JobType: jobType, + Priority: 50, + Status: api.JobStatusQueued, + Created: time.Now(), + Tasks: tasks, + } + return job +} + +func authorTestTask(name, taskType string, dependencies ...*job_compilers.AuthoredTask) job_compilers.AuthoredTask { + task := job_compilers.AuthoredTask{ + Name: name, + Type: taskType, + UUID: uuid.NewString(), + Priority: 50, + Commands: make([]job_compilers.AuthoredCommand, 0), + Dependencies: dependencies, + } + return task +} + +func linuxWorker() persistence.Worker { + w := persistence.Worker{ + UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8", + Name: "Linux", + Platform: "linux", + Status: api.WorkerStatusAwake, + SupportedTaskTypes: "blender,ffmpeg,file-management", + } + return w +} diff --git a/pkg/api/flamenco-manager.yaml b/pkg/api/flamenco-manager.yaml index 2366d460..0c39e0a4 100644 --- a/pkg/api/flamenco-manager.yaml +++ b/pkg/api/flamenco-manager.yaml @@ -296,7 +296,7 @@ components: TaskStatus: type: string - enum: [active, canceled, completed, failed, queued, soft-failed, cancel-requested, fail-requested, paused, processing] + enum: [active, canceled, completed, failed, queued, soft-failed, cancel-requested, fail-requested, paused] Command: type: object diff --git a/pkg/api/openapi_spec.gen.go b/pkg/api/openapi_spec.gen.go index 4d1ccc8b..717b26ea 100644 --- a/pkg/api/openapi_spec.gen.go +++ b/pkg/api/openapi_spec.gen.go @@ -47,28 +47,28 @@ var swaggerSpec = []string{ "kN9+vv7l+tfrv13/8tvP13+//vX6r+12y/wPB93AH05ZpjmL5tGH8PMKNZiV8mJp+J8hmh8iT1bT1C5p", "ybiqIAed0uX082im3cqZWc3OVYIGDBIePT6cui3boeT4zUv8WZho/vhJHK0wjTXRPHo0eXSA6XRO12CW", "Si8vOQOFOYJ7E8WRKm1RWl9KwHsL0ni9TAsHOZ6CpZ/VJckfUhPV8gvDUVWTwPjEL/FduK51NXrcE2vr", - "uHbbHl9dC6NyRhp+LXXtC/PV1FatfrMzBGcOXbaaqjHfaLUU7xBP6shRQz36fhNZbhMn6qBTaJWCwXA9", - "Ggk8WPp4oKl32j5MfASaQ6rBjg99JCr3lBJO6gDq6BEtQB7TWAgefC3f3lUS98xRK27cGu+b0AfPMirX", - "MGTBR55lYyt3iqZ9qfc3uxVRbBdV90DLHgq6jmgs1dbnXnRDL1yINgIA03hwITOOTFZapjaucQUmzFar", - "leASRjzKG70Luguk2rO3cQQsaYm4PyhiDGhUNOHGJc9+Mjl6HpOCGrNRmlVD3sp9K51QW03VLffFYOrk", - "5Xps1PC0SVQya4voCmnkcqV8rSktTW1T9EZVUCYnQNGJSi3CSjOfzVZVyOZqNqwtvvMdxBdU5yT3TQTy", - "9PgIkxmegjTQOufl8beXh4P9N5vNdC1LjOCzsMbM1oWYHE4PpiCnmc190s+t6FAbjovi6BJ0iHGPpgfT", - "A5ytCpC04Bju3SuER5s5zcxowV30dTapjBMFWqYT5hHzXcScW19eBkv/SrFtJT6Qbg0tCsFTt2p2bjyM", - "ervdZ9XdWvpqIFXX4VIhdYraRo8ZhfMCUyiUFJ70+ODgk1K2oYaYMk3BrEohtsR/XwBGuLSKcMn4JWcl", - "Ff6TxLT3PeZeyPTZ7Qh9boBUyavzzTLPqd7WWiWUSNi4bhjWrrU5hRZYq2fkPmBQTClckwqL8vZ2r6qe", - "ukHjIyBZobi0jt/axmZ1TFjDiKG9BFs37h5Qq8Mu4Yjo6klNp7AnwJdgiRh0E12jLQOue83WG0TXHFWL", - "/7z5yNiR34dzlSw5u9opwhdg08y7anO+62Zx5Cr02gME+c0GHhW35Liv4jt9QD3d4HQOvrvqcJy7AUIT", - "/7HL6e4WdusXSRZANEfKK7H7CDPToWEw2TT9glGwrDoLoa/wMIg5krSOCMrPQheuqP+k4DnosYyQKNG8", - "BKlo+KTgWEp4X0CKtTiEOW3DqMgPCLmp9FnZUnhxOrLIqwRxoVlp+hZl+FpO1Gp1Q9zFJHy1GkLhk2EO", - "9fsTZEgCHfZ00r8fTxE1Gpm9pvqinfdRQ6r0co+0n1ER2rHewtwtCgHe9asIdiHd12DYfq6BrJW/x+G2", - "n46rRO7RiHxQpw5H7HbnuqvwKX15WFf9VzjzrW3waWkzkBaJAvKuPDh4/EeC1lDdHNjUH8ru2SA1ULbF", - "Wbif/1BrFfEfQEIeUSl8aK6ojp2ZQEtl0X/aMhylJHXjpCmWr+JdYEZ2r/h9m9TdzSPNIL0gm+r6SgYa", - "/BWT7Q4hjNvBJG21FkbBa6QN8aBA1j5oRLxv6tDo+bwFnv1vxb2A50FvXghTcpJxQ1J3vy1x11JoioAh", - "gPnE1N9gCljSdDw7thITpRG5KqlU+AJ6IlRKhYM2Ksx949kldLgpzcBUbbjAuyO8phmwUsCJ/wD0cAVg", - "+zrxiGLdReJ25bsLqN6ocNGwe3nKXSKp7lZcxdGTg8P7a0l0vmiNEH8MuirCn4PkHjSfPD78tIhfGTeV", - "UlmiEku5dNmwk1dMktL6K1hr5W6DSuXgzzvBHR3prd+d1vu3dLfPwp2qTbA73W5bOBL0ZVU++zbdLGqB", - "fN8avkILwP/OCNyFP6RkDdbV8/V1g4SKRNBOGW7cHZJeB+L4qNuTaQcNleel9OmKuxvcb9xMm+0D31en", - "V/8KAAD//8XqVAxiLwAA", + "uHbbHl9dC6NyRhp+LXXtC/PV1FatfrMzBGcOXbaaqjHfaLUU7xBP6shRQz36fhNZbhMnQtAZA3+Pjz4E", + "aOr9tI8MHwHgkGqw40MfCcQ9PYSTOhg6ekQLg8eUFOIFX8u3d5XEPXPUChW3hvgm2sGzjMo1DFnwwWbZ", + "mMedAmhf6v3NbkUU20XVPdCyh4Ku7xlLtfXpFt3QCxeVjQDAzB1clIwjk5WWqY3rVYEJs9VqJbiEEY/y", + "Ru/i7AKp9uxtHAFLWiLUD+oWAxoVTbhx+bKfTI6ex6SgxmyUZtWQt3LfPSfUVlN1y30xfjp5ubYaNTxt", + "cpPM2iK6Qhq5XClfXkpLU9vUuVEVh8kJUHSiUouw0sxns1UVpbmaDcuJ73zT8AXVOcl934A8PT7C/IWn", + "IA20znl5/O3l4WD/zWYzXcsSg/YsrDGzdSEmh9ODKchpZnOf53MrOtSG46I4ugQdwtqj6cH0AGerAiQt", + "OEZ49woR0WZOMzNacBdwnU0q40SBlumEecR84zDn1leUwdK/UmxbiQ+kW0OLQvDUrZqdGw+j3m73WXW3", + "fL4aSNU1tVTIlqK20WMS4bzAFAolhSc9Pjj4pJRtqCGmTFMwq1KILfGfFIARLq0iXDJ+yVlJhf8KMe19", + "grkXMn1CO0KfGyBVvup8s8xzqre1VgklEjauAYblam1OoevVahO5bxYUswjXl8I6vL3dq6qNbtD4CEhW", + "KC6t47e2sVkdE9YwYmgvwda9ugfU6rAxOCK6elLTHOwJ8CVYIgYNRNdby4DrXn/1BtE1R9XiP2++K3bk", + "9+FcJUvOrnaK8AXYNPOu2pzvGlgcuQrt9QBBfrOBR8UtOe4r8k4fUE83OJ2D7646HOdugNDEf99yuruF", + "3fpFkgUQzZHySuw+wsx06BFMNk2LYBQsq2ZCaCU8DGKOJK0jgvKz0IUr6j8peA7aKiMkSjQvQSoaPik4", + "lhLeF5Bi+Q1hTtswKvIDQm4qfVa2FF6cjizyKkFcaFaavkUZvpYTtVrdEHcxCV+thlD4ZJhD/f4EGZJA", + "hz2d9O/HU0SNRmavqb5o533UkCq93CPtZ1SEDqy3MHdxQoB3/SqCXUj3ARi2n2sga+Wvbrjtp+MqkXs0", + "Ih/UqcMRu925biR8Sl8e1lX/Fc58axt8WtoMpEWigLwrDw4e/5GgNVSXBTb1t7F7NkgNlG1xFu7nv81a", + "Rfw3j5BHVAofmiuqY2cm0FJZ9J+2DEcpSd04aYrlq3gXmJHdK37fJnV380gzSC/IprqxkoEGf6tku0MI", + "43YwSVuthVHwGmlDPCiQtQ8aEe+bOjR6Pm+BZ/9bcS/gedCbF8KUnGTckNRdaUvcTRSaImAIYD4x9ZeW", + "ApY0Tc6OrcREaUSuSioVvoCeCJVS4aCNCnPfeHYJHW5KMzBVG+7s7givaQasFHDiv/k8XAHYvkE8olh3", + "d7hd+e4Cqjcq3C3s3pdy90aq6xRXcfTk4PD+WhKdj1gjxB+Drorw5yC5B80njw8/LeJXxk2lVJaoxFIu", + "XTbs5BWTpLT+1tVauQugUjn4805wR0d663en9f4t3e2zcKdqE+xOt9sWjgR9WZXPvk03i1og37eGr9AC", + "8L8zAnfHDylZg3X1fH3DIKEiEbRThht3baTXgTg+6vZk2kFD5XkpfbrirgP3GzfTZvvA99Xp1b8CAAD/", + "/6TghGFVLwAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/pkg/api/openapi_types.gen.go b/pkg/api/openapi_types.gen.go index d0ea72e4..208d13d3 100644 --- a/pkg/api/openapi_types.gen.go +++ b/pkg/api/openapi_types.gen.go @@ -82,8 +82,6 @@ const ( TaskStatusPaused TaskStatus = "paused" - TaskStatusProcessing TaskStatus = "processing" - TaskStatusQueued TaskStatus = "queued" TaskStatusSoftFailed TaskStatus = "soft-failed"