diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 4f3a7e49..fa671b6c 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -154,15 +154,13 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") } - if dbTask.Worker == nil { + if dbTask.WorkerID == nil { logger.Warn(). Msg("worker trying to update task that's not assigned to any worker") return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) } - if dbTask.Worker.UUID != worker.UUID { - logger.Warn(). - Str("assignedWorkerID", dbTask.Worker.UUID). - Msg("worker trying to update task that's assigned to another worker") + if *dbTask.WorkerID != worker.ID { + logger.Warn().Msg("worker trying to update task that's assigned to another worker") return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) } @@ -194,7 +192,7 @@ func (f *Flamenco) doTaskUpdate( } if update.Activity != nil { - dbTask.Worker.LastActivity = *update.Activity + dbTask.Activity = *update.Activity } if update.Log != nil { diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index c6294826..eb184292 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -67,6 +67,7 @@ type Task struct { Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"` Commands Commands `gorm:"type:jsonb"` + Activity string `gorm:"type:varchar(255);not null;default:\"\""` } type Commands []Command diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 39c8c391..3971362d 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -21,7 +21,6 @@ package persistence * ***** END GPL LICENSE BLOCK ***** */ import ( - "errors" "fmt" "github.com/rs/zerolog/log" @@ -30,79 +29,91 @@ import ( ) var ( - schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed} + schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusActive} completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted} schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued, api.JobStatusRequeued} ) // ScheduleTask finds a task to execute by the given worker. // If no task is available, (nil, nil) is returned, as this is not an error situation. +// NOTE: this does not also fetch returnedTask.Worker, but returnedTask.WorkerID is set. func (db *DB) ScheduleTask(w *Worker) (*Task, error) { - task, err := db.findTaskForWorker(w) - - // TODO: Mark the task as Active, and push the status change to whatever I think up to handle those changes. - // TODO: Store in the database that this task is assigned to this worker. - - return task, err -} - -func (db *DB) findTaskForWorker(w *Worker) (*Task, error) { - logger := log.With().Str("worker", w.UUID).Logger() logger.Debug().Msg("finding task for worker") - task := Task{} - // Run two queries in one transaction: // 1. find task, and // 2. assign the task to the worker. - err := db.gormDB.Transaction(func(tx *gorm.DB) error { - findTaskResult := tx.Debug(). - Model(&task). - Joins("left join jobs on tasks.job_id = jobs.id"). - Joins("left join task_dependencies on tasks.id = task_dependencies.task_id"). - Joins("left join tasks as tdeps on tdeps.id = task_dependencies.dependency_id"). - Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses - Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed - Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses - Where("tasks.type in ?", w.TaskTypes()). // Supported task types - Where("tasks.worker_id = ? or tasks.worker_id is NULL", w.ID). // assigned to this worker or not assigned at all - // TODO: Non-blacklisted - Order("jobs.priority desc"). // Highest job priority - Order("priority desc"). // Highest task priority - Limit(1). - Preload("Job"). - First(&task) - - if findTaskResult.Error != nil { - return findTaskResult.Error + var task *Task + txErr := db.gormDB.Transaction(func(tx *gorm.DB) error { + var err error + task, err = findTaskForWorker(tx, w) + if err == gorm.ErrRecordNotFound { + // Not finding a task is not an error. + return nil + } + if err != nil { + return fmt.Errorf("error finding task for worker: %w", err) } // Found a task, now assign it to the requesting worker. // Without the Select() call, Gorm will try and also store task.Job in the jobs database, which is not what we want. - if err := tx.Debug().Model(&task).Select("worker_id").Updates(Task{WorkerID: &w.ID}).Error; err != nil { + if err := assignTaskToWorker(tx, w, task); err != nil { logger.Warn(). Str("taskID", task.UUID). Err(err). Msg("error assigning task to worker") - return fmt.Errorf("error assigning task to worker: %v", err) + return fmt.Errorf("error assigning task to worker: %w", err) } return nil }) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - logger.Debug().Msg("no task for worker") - return nil, nil - } - logger.Error().Err(err).Msg("error finding task for worker") - return nil, fmt.Errorf("error finding task for worker: %w", err) + if txErr != nil { + logger.Error().Err(txErr).Msg("error finding task for worker") + return nil, fmt.Errorf("error finding task for worker: %w", txErr) + } + + if task == nil { + logger.Debug().Msg("no task for worker") + return nil, nil } logger.Info(). Str("taskID", task.UUID). Msg("assigned task to worker") + return task, nil +} + +func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { + task := Task{} + findTaskResult := tx.Debug(). + Model(&task). + Joins("left join jobs on tasks.job_id = jobs.id"). + Joins("left join task_dependencies on tasks.id = task_dependencies.task_id"). + Joins("left join tasks as tdeps on tdeps.id = task_dependencies.dependency_id"). + Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses + Where("tdeps.status in ? or tdeps.status is NULL", completedTaskStatuses). // Dependencies completed + Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses + Where("tasks.type in ?", w.TaskTypes()). // Supported task types + Where("tasks.worker_id = ? or tasks.worker_id is NULL", w.ID). // assigned to this worker or not assigned at all + // TODO: Non-blacklisted + Order("jobs.priority desc"). // Highest job priority + Order("priority desc"). // Highest task priority + Limit(1). + Preload("Job"). + First(&task) + + if findTaskResult.Error != nil { + return nil, findTaskResult.Error + } + return &task, nil } + +func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error { + // Without the Select() call, Gorm will try and also store task.Job in the + // jobs database, which is not what we want. + return tx.Debug().Model(t).Select("worker_id").Updates(Task{WorkerID: &w.ID}).Error +} diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 1a5db628..3ce0286e 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -50,12 +50,18 @@ func TestOneJobOneTask(t *testing.T) { task, err := db.ScheduleTask(&w) assert.NoError(t, err) + + // Check the returned task. if task == nil { t.Fatal("task is nil") } assert.Equal(t, job.ID, task.JobID) + if task.WorkerID == nil { + t.Fatal("no worker assigned to task") + } + assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker") - // Test that the task has been assigned to this worker. + // Check the task in the database. dbTask, err := db.FetchTask(context.Background(), authTask.UUID) assert.NoError(t, err) if dbTask == nil { @@ -64,7 +70,7 @@ func TestOneJobOneTask(t *testing.T) { if dbTask.WorkerID == nil { t.Fatal("no worker assigned to task") } - assert.Equal(t, w.ID, *dbTask.WorkerID) + assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker") } func TestOneJobThreeTasksByPrio(t *testing.T) {