diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 900738b1..cc5c714c 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -111,6 +111,11 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { Where("tasks2.id = tasks.id"). Where("dep.status is not NULL and dep.status != ?", api.TaskStatusCompleted) + blockedTaskTypesQuery := tx.Model(&JobBlock{}). + Select("job_blocks.task_type"). + Where("job_blocks.worker_id = ?", w.ID). + Where("job_blocks.job_id = jobs.id") + // Note that this query doesn't check for the assigned worker. Tasks that have // a 'schedulable' status might have been assigned to a worker, representing // the last worker to touch it -- it's not meant to indicate "ownership" of @@ -119,14 +124,14 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { Model(&task). Joins("left join jobs on tasks.job_id = jobs.id"). Joins("left join task_failures TF on tasks.id = TF.task_id and TF.worker_id=?", w.ID). - Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses - Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses - Where("tasks.type in ?", w.TaskTypes()). // Supported task types - Where("tasks.id not in (?)", incompleteDepsQuery). // Dependencies completed - Where("TF.worker_id is NULL"). // Not failed before - // TODO: Non-blocklisted - Order("jobs.priority desc"). // Highest job priority - Order("tasks.priority desc"). // Highest task priority + Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses + Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses + Where("tasks.type in ?", w.TaskTypes()). // Supported task types + Where("tasks.id not in (?)", incompleteDepsQuery). // Dependencies completed + Where("TF.worker_id is NULL"). // Not failed before + Where("tasks.type not in (?)", blockedTaskTypesQuery). // Non-blocklisted + Order("jobs.priority desc"). // Highest job priority + Order("tasks.priority desc"). // Highest task priority Limit(1). Preload("Job"). Find(&task) diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 5732a495..c9fc1de9 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -289,6 +289,33 @@ func TestPreviouslyFailed(t *testing.T) { assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen") } +func TestBlocklisted(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) + defer cancel() + + w := linuxWorker(t, db) + + att1 := authorTestTask("1 blocked task", "blender") + att2 := authorTestTask("2 expected task", "ffmpeg") + atj := authorTestJob( + "1295757b-e668-4c49-8b89-f73db8270e42", + "simple-blender-render", + att1, att2) + job := constructTestJob(ctx, t, db, atj) + + // Mimick that this worker was already blocked for 'blender' tasks of this job. + err := db.AddWorkerToJobBlocklist(ctx, job, &w, "blender") + assert.NoError(t, err) + + // This should assign the 2nd task. + task, err := db.ScheduleTask(ctx, &w) + assert.NoError(t, err) + if task == nil { + t.Fatal("task is nil") + } + assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen") +} + // To test: blocklists // To test: variable replacement