diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 7ce1861d..0d7103db 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -42,6 +42,7 @@ type PersistenceService interface { CreateWorker(ctx context.Context, w *persistence.Worker) error FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) FetchWorkers(ctx context.Context) ([]*persistence.Worker, error) + FetchWorkerTask(context.Context, *persistence.Worker) (*persistence.Task, error) SaveWorker(ctx context.Context, w *persistence.Worker) error SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error WorkerSeen(ctx context.Context, w *persistence.Worker) error diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 9240895e..8d7ef3f5 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -216,6 +216,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) } +// FetchWorkerTask mocks base method. +func (m *MockPersistenceService) FetchWorkerTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchWorkerTask", arg0, arg1) + ret0, _ := ret[0].(*persistence.Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchWorkerTask indicates an expected call of FetchWorkerTask. +func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTask(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerTask", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerTask), arg0, arg1) +} + // FetchWorkers mocks base method. func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 9c004b1c..933c2437 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -40,7 +40,8 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") } - dbWorker, err := f.persist.FetchWorker(e.Request().Context(), workerUUID) + ctx := e.Request().Context() + dbWorker, err := f.persist.FetchWorker(ctx, workerUUID) if errors.Is(err, persistence.ErrWorkerNotFound) { logger.Debug().Msg("non-existent worker requested") return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID) @@ -50,8 +51,23 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err) } + dbTask, err := f.persist.FetchWorkerTask(ctx, dbWorker) + if err != nil { + logger.Error().Err(err).Msg("error fetching task assigned to worker") + return sendAPIError(e, http.StatusInternalServerError, "error fetching task assigned to worker: %v", err) + } + logger.Debug().Msg("fetched worker") apiWorker := workerDBtoAPI(*dbWorker) + + if dbTask != nil { + apiWorkerTask := api.WorkerTask{ + TaskSummary: taskDBtoSummary(dbTask), + JobId: dbTask.Job.UUID, + } + apiWorker.Task = &apiWorkerTask + } + return e.JSON(http.StatusOK, apiWorker) } diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 15e6af7f..c1d603cc 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -85,13 +85,7 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { // If a task is alreay active & assigned to this worker, return just that. // Note that this task type could be blocklisted or no longer supported by the // Worker, but since it's active that is unlikely. - assignedTaskResult := tx. - Model(&task). - Joins("left join jobs on tasks.job_id = jobs.id"). - Where("tasks.status = ?", api.TaskStatusActive). - Where("jobs.status in ?", schedulableJobStatuses). - Where("tasks.worker_id = ?", w.ID). // assigned to this worker - Limit(1). + assignedTaskResult := taskAssignedAndRunnableQuery(tx.Model(&task), w). Preload("Job"). Find(&task) if assignedTaskResult.Error != nil { @@ -152,3 +146,14 @@ func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error { Select("WorkerID", "LastTouchedAt"). Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error } + +// taskAssignedAndRunnableQuery appends some GORM clauses to query for a task +// that's already assigned to this worker, and is in a runnable state. +func taskAssignedAndRunnableQuery(tx *gorm.DB, w *Worker) *gorm.DB { + return tx. + Joins("left join jobs on tasks.job_id = jobs.id"). + Where("tasks.status = ?", api.TaskStatusActive). + Where("jobs.status in ?", schedulableJobStatuses). + Where("tasks.worker_id = ?", w.ID). // assigned to this worker + Limit(1) +} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index eba990e8..ccfad354 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -80,6 +80,42 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { return workers, nil } +// FetchWorkerTask returns the most recent task assigned to the given Worker. +func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error) { + task := Task{} + + // See if there is a task assigned to this worker in the same way that the + // task scheduler does. + query := db.gormDB.WithContext(ctx) + query = taskAssignedAndRunnableQuery(query, worker) + tx := query. + Order("tasks.updated_at"). + Preload("Job"). + Find(&task) + if tx.Error != nil { + return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID) + } + if task.ID != 0 { + // Found a task! + return &task, nil + } + + // If not found, just find the last-modified task associated with this Worker. + tx = db.gormDB.WithContext(ctx). + Where("worker_id = ?", worker.ID). + Order("tasks.updated_at DESC"). + Preload("Job"). + Find(&task) + if tx.Error != nil { + return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID) + } + if task.ID == 0 { + return nil, nil + } + + return &task, nil +} + func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { err := db.gormDB.WithContext(ctx). Model(w). diff --git a/internal/manager/persistence/workers_test.go b/internal/manager/persistence/workers_test.go index 98e9cdd6..d422b232 100644 --- a/internal/manager/persistence/workers_test.go +++ b/internal/manager/persistence/workers_test.go @@ -51,6 +51,91 @@ func TestCreateFetchWorker(t *testing.T) { assert.EqualValues(t, w.SupportedTaskTypes, fetchedWorker.SupportedTaskTypes) } +func TestFetchWorkerTask(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, 10000*time.Second) + defer cancel() + + // Worker without task. + w := Worker{ + UUID: uuid.New(), + Name: "дрон", + Address: "fe80::5054:ff:fede:2ad7", + Platform: "linux", + Software: "3.0", + Status: api.WorkerStatusAwake, + SupportedTaskTypes: "blender,ffmpeg,file-management", + } + + err := db.CreateWorker(ctx, &w) + if !assert.NoError(t, err) { + t.FailNow() + } + + { // Test without any task assigned. + task, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) { + assert.Nil(t, task) + } + } + + // Create a job with tasks. + authTask1 := authorTestTask("the task", "blender") + authTask2 := authorTestTask("the other task", "blender") + jobUUID := "b6a1d859-122f-4791-8b78-b943329a9989" + atj := authorTestJob(jobUUID, "simple-blender-render", authTask1, authTask2) + constructTestJob(ctx, t, db, atj) + + assignedTask, err := db.ScheduleTask(ctx, &w) + assert.NoError(t, err) + + { // Assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, assignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + + // Set the task to 'completed'. + assignedTask.Status = api.TaskStatusCompleted + assert.NoError(t, db.SaveTaskStatus(ctx, assignedTask)) + + { // Completed-but-last-assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, assignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + + // Assign another task. + newlyAssignedTask, err := db.ScheduleTask(ctx, &w) + if !assert.NoError(t, err) || !assert.NotNil(t, newlyAssignedTask) { + t.FailNow() + } + + { // Newly assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, newlyAssignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + + // Set the new task to 'completed'. + newlyAssignedTask.Status = api.TaskStatusCompleted + assert.NoError(t, db.SaveTaskStatus(ctx, newlyAssignedTask)) + + { // Completed-but-last-assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, newlyAssignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + +} + func TestSaveWorker(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) defer cancel() diff --git a/web/app/src/components/WorkerTaskLink.vue b/web/app/src/components/WorkerTaskLink.vue new file mode 100644 index 00000000..229b017f --- /dev/null +++ b/web/app/src/components/WorkerTaskLink.vue @@ -0,0 +1,18 @@ + + + + + diff --git a/web/app/src/components/workers/WorkerDetails.vue b/web/app/src/components/workers/WorkerDetails.vue index 875422af..9bf37f2f 100644 --- a/web/app/src/components/workers/WorkerDetails.vue +++ b/web/app/src/components/workers/WorkerDetails.vue @@ -27,6 +27,11 @@
Task Types
{{ workerData.supported_task_types.join(', ') }}
+ +
Last Task
+
+ +
@@ -40,11 +45,15 @@ import * as datetime from "@/datetime"; import { WorkerMgtApi } from '@/manager-api'; import { apiClient } from '@/stores/api-query-count'; import { workerStatus } from "../../statusindicator"; +import WorkerTaskLink from '@/components/WorkerTaskLink.vue'; export default { props: [ "workerData", // Worker data to show. ], + components: { + WorkerTaskLink, + }, data() { return { datetime: datetime, // So that the template can access it. @@ -58,6 +67,7 @@ export default { }, watch: { workerData(newData) { + console.log("new data:", plain(newData)); if (newData) this.workerStatusHTML = workerStatus(newData); else @@ -74,7 +84,7 @@ export default {