From eea219c3e21dcfacc109e60c8a1ab1744d60300d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 14 Feb 2022 16:53:21 +0100 Subject: [PATCH] Move task scheduler into persistence layer --- internal/manager/api_impl/api_impl.go | 4 ++ .../task_scheduler.go | 28 ++++---------- .../task_scheduler_test.go | 37 ++++++++----------- 3 files changed, 26 insertions(+), 43 deletions(-) rename internal/manager/{task_scheduler => persistence}/task_scheduler.go (80%) rename internal/manager/{task_scheduler => persistence}/task_scheduler_test.go (87%) diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 7db3bab5..28f36cf0 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -42,6 +42,10 @@ type PersistenceService interface { CreateWorker(ctx context.Context, w *persistence.Worker) error FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error + + // ScheduleTask finds a task to execute by the given worker, and assigns it to that worker. + // If no task is available, (nil, nil) is returned, as this is not an error situation. + ScheduleTask(w *persistence.Worker) (*persistence.Task, error) } type JobCompiler interface { diff --git a/internal/manager/task_scheduler/task_scheduler.go b/internal/manager/persistence/task_scheduler.go similarity index 80% rename from internal/manager/task_scheduler/task_scheduler.go rename to internal/manager/persistence/task_scheduler.go index f5483bdb..23b453cd 100644 --- a/internal/manager/task_scheduler/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -1,5 +1,4 @@ -// Package task_scheduler can choose which task to assign to a worker. -package task_scheduler +package persistence /* ***** BEGIN GPL LICENSE BLOCK ***** * @@ -25,7 +24,6 @@ import ( "errors" "github.com/rs/zerolog/log" - "gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence" "gitlab.com/blender/flamenco-ng-poc/pkg/api" "gorm.io/gorm" ) @@ -36,22 +34,10 @@ var ( schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued} ) -type TaskScheduler struct { - db PersistenceService -} - -type PersistenceService interface { - GormDB() *gorm.DB -} - -func NewTaskScheduler(db PersistenceService) *TaskScheduler { - return &TaskScheduler{db} -} - // ScheduleTask finds a task to execute by the given worker. // If no task is available, (nil, nil) is returned, as this is not an error situation. -func (ts *TaskScheduler) ScheduleTask(w *persistence.Worker) (*persistence.Task, error) { - task, err := ts.findTaskForWorker(w) +func (db *DB) ScheduleTask(w *Worker) (*Task, error) { + task, err := db.findTaskForWorker(w) // TODO: Mark the task as Active, and push the status change to whatever I think up to handle those changes. // TODO: Store in the database that this task is assigned to this worker. @@ -59,14 +45,14 @@ func (ts *TaskScheduler) ScheduleTask(w *persistence.Worker) (*persistence.Task, return task, err } -func (ts *TaskScheduler) findTaskForWorker(w *persistence.Worker) (*persistence.Task, error) { +func (db *DB) findTaskForWorker(w *Worker) (*Task, error) { logger := log.With().Str("worker", w.UUID).Logger() logger.Debug().Msg("finding task for worker") - task := persistence.Task{} - db := ts.db.GormDB() - tx := db.Debug(). + task := Task{} + gormDB := db.GormDB() + tx := gormDB.Debug(). Model(&task). Joins("left join jobs on tasks.job_id = jobs.id"). Joins("left join task_dependencies on tasks.id = task_dependencies.task_id"). diff --git a/internal/manager/task_scheduler/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go similarity index 87% rename from internal/manager/task_scheduler/task_scheduler_test.go rename to internal/manager/persistence/task_scheduler_test.go index b0eaf8ad..a853b58e 100644 --- a/internal/manager/task_scheduler/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -1,5 +1,4 @@ -// Package task_scheduler can choose which task to assign to a worker. -package task_scheduler +package persistence /* ***** BEGIN GPL LICENSE BLOCK ***** * @@ -29,30 +28,27 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "gitlab.com/blender/flamenco-ng-poc/internal/manager/job_compilers" - "gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence" "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) func TestNoTasks(t *testing.T) { - db := persistence.CreateTestDB(t) - ts := NewTaskScheduler(db) + db := CreateTestDB(t) w := linuxWorker() - task, err := ts.ScheduleTask(&w) + task, err := db.ScheduleTask(&w) assert.Nil(t, task) assert.NoError(t, err) } func TestOneJobOneTask(t *testing.T) { - db := persistence.CreateTestDB(t) - ts := NewTaskScheduler(db) + db := CreateTestDB(t) w := linuxWorker() authTask := authorTestTask("the task", "blender-render") atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) job := constructTestJob(t, db, atj) - task, err := ts.ScheduleTask(&w) + task, err := db.ScheduleTask(&w) assert.NoError(t, err) if task == nil { t.Fatal("task is nil") @@ -61,8 +57,7 @@ func TestOneJobOneTask(t *testing.T) { } func TestOneJobThreeTasksByPrio(t *testing.T) { - db := persistence.CreateTestDB(t) - ts := NewTaskScheduler(db) + db := CreateTestDB(t) w := linuxWorker() att1 := authorTestTask("1 low-prio task", "blender-render") @@ -76,7 +71,7 @@ func TestOneJobThreeTasksByPrio(t *testing.T) { job := constructTestJob(t, db, atj) - task, err := ts.ScheduleTask(&w) + task, err := db.ScheduleTask(&w) assert.NoError(t, err) if task == nil { t.Fatal("task is nil") @@ -91,8 +86,7 @@ func TestOneJobThreeTasksByPrio(t *testing.T) { } func TestOneJobThreeTasksByDependencies(t *testing.T) { - db := persistence.CreateTestDB(t) - ts := NewTaskScheduler(db) + db := CreateTestDB(t) w := linuxWorker() att1 := authorTestTask("1 low-prio task", "blender-render") @@ -106,7 +100,7 @@ func TestOneJobThreeTasksByDependencies(t *testing.T) { att1, att2, att3) job := constructTestJob(t, db, atj) - task, err := ts.ScheduleTask(&w) + task, err := db.ScheduleTask(&w) assert.NoError(t, err) if task == nil { t.Fatal("task is nil") @@ -116,8 +110,7 @@ func TestOneJobThreeTasksByDependencies(t *testing.T) { } func TestTwoJobsThreeTasks(t *testing.T) { - db := persistence.CreateTestDB(t) - ts := NewTaskScheduler(db) + db := CreateTestDB(t) w := linuxWorker() att1_1 := authorTestTask("1.1 low-prio task", "blender-render") @@ -145,7 +138,7 @@ func TestTwoJobsThreeTasks(t *testing.T) { constructTestJob(t, db, atj1) job2 := constructTestJob(t, db, atj2) - task, err := ts.ScheduleTask(&w) + task, err := db.ScheduleTask(&w) assert.NoError(t, err) if task == nil { t.Fatal("task is nil") @@ -164,8 +157,8 @@ func TestTwoJobsThreeTasks(t *testing.T) { // To test: variable replacement func constructTestJob( - t *testing.T, db *persistence.DB, authoredJob job_compilers.AuthoredJob, -) *persistence.Job { + t *testing.T, db *DB, authoredJob job_compilers.AuthoredJob, +) *Job { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -208,8 +201,8 @@ func authorTestTask(name, taskType string, dependencies ...*job_compilers.Author return task } -func linuxWorker() persistence.Worker { - w := persistence.Worker{ +func linuxWorker() Worker { + w := Worker{ UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8", Name: "Linux", Platform: "linux",