Correctly handle workers assigned to tasks + simple task updates

This commit is contained in:
Sybren A. Stüvel 2022-02-17 17:30:52 +01:00
parent 31e39e2137
commit 399c8af750
4 changed files with 67 additions and 51 deletions

View File

@ -154,15 +154,13 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
logger.Warn().Err(err).Msg("bad request received") logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format") return sendAPIError(e, http.StatusBadRequest, "invalid format")
} }
if dbTask.Worker == nil { if dbTask.WorkerID == nil {
logger.Warn(). logger.Warn().
Msg("worker trying to update task that's not assigned to any worker") Msg("worker trying to update task that's not assigned to any worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
} }
if dbTask.Worker.UUID != worker.UUID { if *dbTask.WorkerID != worker.ID {
logger.Warn(). logger.Warn().Msg("worker trying to update task that's assigned to another worker")
Str("assignedWorkerID", dbTask.Worker.UUID).
Msg("worker trying to update task that's assigned to another worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
} }
@ -194,7 +192,7 @@ func (f *Flamenco) doTaskUpdate(
} }
if update.Activity != nil { if update.Activity != nil {
dbTask.Worker.LastActivity = *update.Activity dbTask.Activity = *update.Activity
} }
if update.Log != nil { if update.Log != nil {

View File

@ -67,6 +67,7 @@ type Task struct {
Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"` Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"`
Commands Commands `gorm:"type:jsonb"` Commands Commands `gorm:"type:jsonb"`
Activity string `gorm:"type:varchar(255);not null;default:\"\""`
} }
type Commands []Command type Commands []Command

View File

@ -21,7 +21,6 @@ package persistence
* ***** END GPL LICENSE BLOCK ***** */ * ***** END GPL LICENSE BLOCK ***** */
import ( import (
"errors"
"fmt" "fmt"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -30,79 +29,91 @@ import (
) )
var ( var (
schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed} schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusActive}
completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted} completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted}
schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued} schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued}
) )
// ScheduleTask finds a task to execute by the given worker. // ScheduleTask finds a task to execute by the given worker.
// If no task is available, (nil, nil) is returned, as this is not an error situation. // If no task is available, (nil, nil) is returned, as this is not an error situation.
// NOTE: this does not also fetch returnedTask.Worker, but returnedTask.WorkerID is set.
func (db *DB) ScheduleTask(w *Worker) (*Task, error) { func (db *DB) ScheduleTask(w *Worker) (*Task, error) {
task, err := db.findTaskForWorker(w)
// TODO: Mark the task as Active, and push the status change to whatever I think up to handle those changes.
// TODO: Store in the database that this task is assigned to this worker.
return task, err
}
func (db *DB) findTaskForWorker(w *Worker) (*Task, error) {
logger := log.With().Str("worker", w.UUID).Logger() logger := log.With().Str("worker", w.UUID).Logger()
logger.Debug().Msg("finding task for worker") logger.Debug().Msg("finding task for worker")
task := Task{}
// Run two queries in one transaction: // Run two queries in one transaction:
// 1. find task, and // 1. find task, and
// 2. assign the task to the worker. // 2. assign the task to the worker.
err := db.gormDB.Transaction(func(tx *gorm.DB) error { var task *Task
findTaskResult := tx.Debug(). txErr := db.gormDB.Transaction(func(tx *gorm.DB) error {
Model(&task). var err error
Joins("left join jobs on tasks.job_id = jobs.id"). task, err = findTaskForWorker(tx, w)
Joins("left join task_dependencies on tasks.id = task_dependencies.task_id"). if err == gorm.ErrRecordNotFound {
Joins("left join tasks as tdeps on tdeps.id = task_dependencies.dependency_id"). // Not finding a task is not an error.
Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses return nil
Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed }
Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses if err != nil {
Where("tasks.type in ?", w.TaskTypes()). // Supported task types return fmt.Errorf("error finding task for worker: %w", err)
Where("tasks.worker_id = ? or tasks.worker_id is NULL", w.ID). // assigned to this worker or not assigned at all
// TODO: Non-blacklisted
Order("jobs.priority desc"). // Highest job priority
Order("priority desc"). // Highest task priority
Limit(1).
Preload("Job").
First(&task)
if findTaskResult.Error != nil {
return findTaskResult.Error
} }
// Found a task, now assign it to the requesting worker. // Found a task, now assign it to the requesting worker.
// Without the Select() call, Gorm will try and also store task.Job in the jobs database, which is not what we want. // Without the Select() call, Gorm will try and also store task.Job in the jobs database, which is not what we want.
if err := tx.Debug().Model(&task).Select("worker_id").Updates(Task{WorkerID: &w.ID}).Error; err != nil { if err := assignTaskToWorker(tx, w, task); err != nil {
logger.Warn(). logger.Warn().
Str("taskID", task.UUID). Str("taskID", task.UUID).
Err(err). Err(err).
Msg("error assigning task to worker") Msg("error assigning task to worker")
return fmt.Errorf("error assigning task to worker: %v", err) return fmt.Errorf("error assigning task to worker: %w", err)
} }
return nil return nil
}) })
if err != nil { if txErr != nil {
if errors.Is(err, gorm.ErrRecordNotFound) { logger.Error().Err(txErr).Msg("error finding task for worker")
logger.Debug().Msg("no task for worker") return nil, fmt.Errorf("error finding task for worker: %w", txErr)
return nil, nil }
}
logger.Error().Err(err).Msg("error finding task for worker") if task == nil {
return nil, fmt.Errorf("error finding task for worker: %w", err) logger.Debug().Msg("no task for worker")
return nil, nil
} }
logger.Info(). logger.Info().
Str("taskID", task.UUID). Str("taskID", task.UUID).
Msg("assigned task to worker") Msg("assigned task to worker")
return task, nil
}
func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
task := Task{}
findTaskResult := tx.Debug().
Model(&task).
Joins("left join jobs on tasks.job_id = jobs.id").
Joins("left join task_dependencies on tasks.id = task_dependencies.task_id").
Joins("left join tasks as tdeps on tdeps.id = task_dependencies.dependency_id").
Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses
Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed
Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses
Where("tasks.type in ?", w.TaskTypes()). // Supported task types
Where("tasks.worker_id = ? or tasks.worker_id is NULL", w.ID). // assigned to this worker or not assigned at all
// TODO: Non-blacklisted
Order("jobs.priority desc"). // Highest job priority
Order("priority desc"). // Highest task priority
Limit(1).
Preload("Job").
First(&task)
if findTaskResult.Error != nil {
return nil, findTaskResult.Error
}
return &task, nil return &task, nil
} }
func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error {
// Without the Select() call, Gorm will try and also store task.Job in the
// jobs database, which is not what we want.
return tx.Debug().Model(t).Select("worker_id").Updates(Task{WorkerID: &w.ID}).Error
}

View File

@ -50,12 +50,18 @@ func TestOneJobOneTask(t *testing.T) {
task, err := db.ScheduleTask(&w) task, err := db.ScheduleTask(&w)
assert.NoError(t, err) assert.NoError(t, err)
// Check the returned task.
if task == nil { if task == nil {
t.Fatal("task is nil") t.Fatal("task is nil")
} }
assert.Equal(t, job.ID, task.JobID) assert.Equal(t, job.ID, task.JobID)
if task.WorkerID == nil {
t.Fatal("no worker assigned to task")
}
assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker")
// Test that the task has been assigned to this worker. // Check the task in the database.
dbTask, err := db.FetchTask(context.Background(), authTask.UUID) dbTask, err := db.FetchTask(context.Background(), authTask.UUID)
assert.NoError(t, err) assert.NoError(t, err)
if dbTask == nil { if dbTask == nil {
@ -64,7 +70,7 @@ func TestOneJobOneTask(t *testing.T) {
if dbTask.WorkerID == nil { if dbTask.WorkerID == nil {
t.Fatal("no worker assigned to task") t.Fatal("no worker assigned to task")
} }
assert.Equal(t, w.ID, *dbTask.WorkerID) assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker")
} }
func TestOneJobThreeTasksByPrio(t *testing.T) { func TestOneJobThreeTasksByPrio(t *testing.T) {