Move task scheduler into persistence layer

This commit is contained in:
Sybren A. Stüvel 2022-02-14 16:53:21 +01:00
parent edfcbb1e40
commit eea219c3e2
3 changed files with 26 additions and 43 deletions

View File

@ -42,6 +42,10 @@ type PersistenceService interface {
CreateWorker(ctx context.Context, w *persistence.Worker) error CreateWorker(ctx context.Context, w *persistence.Worker) error
FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error)
SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error
// ScheduleTask finds a task to execute by the given worker, and assigns it to that worker.
// If no task is available, (nil, nil) is returned, as this is not an error situation.
ScheduleTask(w *persistence.Worker) (*persistence.Task, error)
} }
type JobCompiler interface { type JobCompiler interface {

View File

@ -1,5 +1,4 @@
// Package task_scheduler can choose which task to assign to a worker. package persistence
package task_scheduler
/* ***** BEGIN GPL LICENSE BLOCK ***** /* ***** BEGIN GPL LICENSE BLOCK *****
* *
@ -25,7 +24,6 @@ import (
"errors" "errors"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/pkg/api" "gitlab.com/blender/flamenco-ng-poc/pkg/api"
"gorm.io/gorm" "gorm.io/gorm"
) )
@ -36,22 +34,10 @@ var (
schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued} schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued}
) )
type TaskScheduler struct {
db PersistenceService
}
type PersistenceService interface {
GormDB() *gorm.DB
}
func NewTaskScheduler(db PersistenceService) *TaskScheduler {
return &TaskScheduler{db}
}
// ScheduleTask finds a task to execute by the given worker. // ScheduleTask finds a task to execute by the given worker.
// If no task is available, (nil, nil) is returned, as this is not an error situation. // If no task is available, (nil, nil) is returned, as this is not an error situation.
func (ts *TaskScheduler) ScheduleTask(w *persistence.Worker) (*persistence.Task, error) { func (db *DB) ScheduleTask(w *Worker) (*Task, error) {
task, err := ts.findTaskForWorker(w) task, err := db.findTaskForWorker(w)
// TODO: Mark the task as Active, and push the status change to whatever I think up to handle those changes. // TODO: Mark the task as Active, and push the status change to whatever I think up to handle those changes.
// TODO: Store in the database that this task is assigned to this worker. // TODO: Store in the database that this task is assigned to this worker.
@ -59,14 +45,14 @@ func (ts *TaskScheduler) ScheduleTask(w *persistence.Worker) (*persistence.Task,
return task, err return task, err
} }
func (ts *TaskScheduler) findTaskForWorker(w *persistence.Worker) (*persistence.Task, error) { func (db *DB) findTaskForWorker(w *Worker) (*Task, error) {
logger := log.With().Str("worker", w.UUID).Logger() logger := log.With().Str("worker", w.UUID).Logger()
logger.Debug().Msg("finding task for worker") logger.Debug().Msg("finding task for worker")
task := persistence.Task{} task := Task{}
db := ts.db.GormDB() gormDB := db.GormDB()
tx := db.Debug(). tx := gormDB.Debug().
Model(&task). Model(&task).
Joins("left join jobs on tasks.job_id = jobs.id"). Joins("left join jobs on tasks.job_id = jobs.id").
Joins("left join task_dependencies on tasks.id = task_dependencies.task_id"). Joins("left join task_dependencies on tasks.id = task_dependencies.task_id").

View File

@ -1,5 +1,4 @@
// Package task_scheduler can choose which task to assign to a worker. package persistence
package task_scheduler
/* ***** BEGIN GPL LICENSE BLOCK ***** /* ***** BEGIN GPL LICENSE BLOCK *****
* *
@ -29,30 +28,27 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/job_compilers" "gitlab.com/blender/flamenco-ng-poc/internal/manager/job_compilers"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/pkg/api" "gitlab.com/blender/flamenco-ng-poc/pkg/api"
) )
func TestNoTasks(t *testing.T) { func TestNoTasks(t *testing.T) {
db := persistence.CreateTestDB(t) db := CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker() w := linuxWorker()
task, err := ts.ScheduleTask(&w) task, err := db.ScheduleTask(&w)
assert.Nil(t, task) assert.Nil(t, task)
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestOneJobOneTask(t *testing.T) { func TestOneJobOneTask(t *testing.T) {
db := persistence.CreateTestDB(t) db := CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker() w := linuxWorker()
authTask := authorTestTask("the task", "blender-render") authTask := authorTestTask("the task", "blender-render")
atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) atj := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
job := constructTestJob(t, db, atj) job := constructTestJob(t, db, atj)
task, err := ts.ScheduleTask(&w) task, err := db.ScheduleTask(&w)
assert.NoError(t, err) assert.NoError(t, err)
if task == nil { if task == nil {
t.Fatal("task is nil") t.Fatal("task is nil")
@ -61,8 +57,7 @@ func TestOneJobOneTask(t *testing.T) {
} }
func TestOneJobThreeTasksByPrio(t *testing.T) { func TestOneJobThreeTasksByPrio(t *testing.T) {
db := persistence.CreateTestDB(t) db := CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker() w := linuxWorker()
att1 := authorTestTask("1 low-prio task", "blender-render") att1 := authorTestTask("1 low-prio task", "blender-render")
@ -76,7 +71,7 @@ func TestOneJobThreeTasksByPrio(t *testing.T) {
job := constructTestJob(t, db, atj) job := constructTestJob(t, db, atj)
task, err := ts.ScheduleTask(&w) task, err := db.ScheduleTask(&w)
assert.NoError(t, err) assert.NoError(t, err)
if task == nil { if task == nil {
t.Fatal("task is nil") t.Fatal("task is nil")
@ -91,8 +86,7 @@ func TestOneJobThreeTasksByPrio(t *testing.T) {
} }
func TestOneJobThreeTasksByDependencies(t *testing.T) { func TestOneJobThreeTasksByDependencies(t *testing.T) {
db := persistence.CreateTestDB(t) db := CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker() w := linuxWorker()
att1 := authorTestTask("1 low-prio task", "blender-render") att1 := authorTestTask("1 low-prio task", "blender-render")
@ -106,7 +100,7 @@ func TestOneJobThreeTasksByDependencies(t *testing.T) {
att1, att2, att3) att1, att2, att3)
job := constructTestJob(t, db, atj) job := constructTestJob(t, db, atj)
task, err := ts.ScheduleTask(&w) task, err := db.ScheduleTask(&w)
assert.NoError(t, err) assert.NoError(t, err)
if task == nil { if task == nil {
t.Fatal("task is nil") t.Fatal("task is nil")
@ -116,8 +110,7 @@ func TestOneJobThreeTasksByDependencies(t *testing.T) {
} }
func TestTwoJobsThreeTasks(t *testing.T) { func TestTwoJobsThreeTasks(t *testing.T) {
db := persistence.CreateTestDB(t) db := CreateTestDB(t)
ts := NewTaskScheduler(db)
w := linuxWorker() w := linuxWorker()
att1_1 := authorTestTask("1.1 low-prio task", "blender-render") att1_1 := authorTestTask("1.1 low-prio task", "blender-render")
@ -145,7 +138,7 @@ func TestTwoJobsThreeTasks(t *testing.T) {
constructTestJob(t, db, atj1) constructTestJob(t, db, atj1)
job2 := constructTestJob(t, db, atj2) job2 := constructTestJob(t, db, atj2)
task, err := ts.ScheduleTask(&w) task, err := db.ScheduleTask(&w)
assert.NoError(t, err) assert.NoError(t, err)
if task == nil { if task == nil {
t.Fatal("task is nil") t.Fatal("task is nil")
@ -164,8 +157,8 @@ func TestTwoJobsThreeTasks(t *testing.T) {
// To test: variable replacement // To test: variable replacement
func constructTestJob( func constructTestJob(
t *testing.T, db *persistence.DB, authoredJob job_compilers.AuthoredJob, t *testing.T, db *DB, authoredJob job_compilers.AuthoredJob,
) *persistence.Job { ) *Job {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() defer cancel()
@ -208,8 +201,8 @@ func authorTestTask(name, taskType string, dependencies ...*job_compilers.Author
return task return task
} }
func linuxWorker() persistence.Worker { func linuxWorker() Worker {
w := persistence.Worker{ w := Worker{
UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8", UUID: "b13b8322-3e96-41c3-940a-3d581008a5f8",
Name: "Linux", Name: "Linux",
Platform: "linux", Platform: "linux",