Manager: use blocklist to actually block workers

Actually use the blocklist in the task scheduler to block workers from
doing blocked job types.
This commit is contained in:
Sybren A. Stüvel 2022-06-21 17:59:14 +02:00
parent 20a2092ddf
commit 87f1959e26
2 changed files with 40 additions and 8 deletions

View File

@ -111,6 +111,11 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
Where("tasks2.id = tasks.id"). Where("tasks2.id = tasks.id").
Where("dep.status is not NULL and dep.status != ?", api.TaskStatusCompleted) Where("dep.status is not NULL and dep.status != ?", api.TaskStatusCompleted)
blockedTaskTypesQuery := tx.Model(&JobBlock{}).
Select("job_blocks.task_type").
Where("job_blocks.worker_id = ?", w.ID).
Where("job_blocks.job_id = jobs.id")
// Note that this query doesn't check for the assigned worker. Tasks that have // Note that this query doesn't check for the assigned worker. Tasks that have
// a 'schedulable' status might have been assigned to a worker, representing // a 'schedulable' status might have been assigned to a worker, representing
// the last worker to touch it -- it's not meant to indicate "ownership" of // the last worker to touch it -- it's not meant to indicate "ownership" of
@ -119,14 +124,14 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
Model(&task). Model(&task).
Joins("left join jobs on tasks.job_id = jobs.id"). Joins("left join jobs on tasks.job_id = jobs.id").
Joins("left join task_failures TF on tasks.id = TF.task_id and TF.worker_id=?", w.ID). Joins("left join task_failures TF on tasks.id = TF.task_id and TF.worker_id=?", w.ID).
Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses
Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses
Where("tasks.type in ?", w.TaskTypes()). // Supported task types Where("tasks.type in ?", w.TaskTypes()). // Supported task types
Where("tasks.id not in (?)", incompleteDepsQuery). // Dependencies completed Where("tasks.id not in (?)", incompleteDepsQuery). // Dependencies completed
Where("TF.worker_id is NULL"). // Not failed before Where("TF.worker_id is NULL"). // Not failed before
// TODO: Non-blocklisted Where("tasks.type not in (?)", blockedTaskTypesQuery). // Non-blocklisted
Order("jobs.priority desc"). // Highest job priority Order("jobs.priority desc"). // Highest job priority
Order("tasks.priority desc"). // Highest task priority Order("tasks.priority desc"). // Highest task priority
Limit(1). Limit(1).
Preload("Job"). Preload("Job").
Find(&task) Find(&task)

View File

@ -289,6 +289,33 @@ func TestPreviouslyFailed(t *testing.T) {
assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen") assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen")
} }
func TestBlocklisted(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
att1 := authorTestTask("1 blocked task", "blender")
att2 := authorTestTask("2 expected task", "ffmpeg")
atj := authorTestJob(
"1295757b-e668-4c49-8b89-f73db8270e42",
"simple-blender-render",
att1, att2)
job := constructTestJob(ctx, t, db, atj)
// Mimick that this worker was already blocked for 'blender' tasks of this job.
err := db.AddWorkerToJobBlocklist(ctx, job, &w, "blender")
assert.NoError(t, err)
// This should assign the 2nd task.
task, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen")
}
// To test: blocklists // To test: blocklists
// To test: variable replacement // To test: variable replacement