From 736ca103c3d7f37557ed541ca70117bc95bef932 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 26 Jul 2022 10:36:02 +0200 Subject: [PATCH] Manager: show current/last task in worker details The Task details component already linked to the Worker it was assigned to last, and now the Worker links back to the task. There's only one task shown in the Worker details. If the Worker is actively working on a task, that one's shown. Otherwise it's the last-updated task that was assigned to the worker. --- internal/manager/api_impl/interfaces.go | 1 + .../api_impl/mocks/api_impl_mock.gen.go | 15 ++++ internal/manager/api_impl/worker_mgt.go | 18 +++- .../manager/persistence/task_scheduler.go | 19 +++-- internal/manager/persistence/workers.go | 36 ++++++++ internal/manager/persistence/workers_test.go | 85 +++++++++++++++++++ web/app/src/components/WorkerTaskLink.vue | 18 ++++ .../src/components/workers/WorkerDetails.vue | 12 ++- 8 files changed, 195 insertions(+), 9 deletions(-) create mode 100644 web/app/src/components/WorkerTaskLink.vue diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 7ce1861d..0d7103db 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -42,6 +42,7 @@ type PersistenceService interface { CreateWorker(ctx context.Context, w *persistence.Worker) error FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) FetchWorkers(ctx context.Context) ([]*persistence.Worker, error) + FetchWorkerTask(context.Context, *persistence.Worker) (*persistence.Task, error) SaveWorker(ctx context.Context, w *persistence.Worker) error SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error WorkerSeen(ctx context.Context, w *persistence.Worker) error diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 9240895e..8d7ef3f5 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -216,6 +216,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) } +// FetchWorkerTask mocks base method. +func (m *MockPersistenceService) FetchWorkerTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchWorkerTask", arg0, arg1) + ret0, _ := ret[0].(*persistence.Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchWorkerTask indicates an expected call of FetchWorkerTask. +func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTask(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerTask", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerTask), arg0, arg1) +} + // FetchWorkers mocks base method. func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 9c004b1c..933c2437 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -40,7 +40,8 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") } - dbWorker, err := f.persist.FetchWorker(e.Request().Context(), workerUUID) + ctx := e.Request().Context() + dbWorker, err := f.persist.FetchWorker(ctx, workerUUID) if errors.Is(err, persistence.ErrWorkerNotFound) { logger.Debug().Msg("non-existent worker requested") return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID) @@ -50,8 +51,23 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err) } + dbTask, err := f.persist.FetchWorkerTask(ctx, dbWorker) + if err != nil { + logger.Error().Err(err).Msg("error fetching task assigned to worker") + return sendAPIError(e, http.StatusInternalServerError, "error fetching task assigned to worker: %v", err) + } + logger.Debug().Msg("fetched worker") apiWorker := workerDBtoAPI(*dbWorker) + + if dbTask != nil { + apiWorkerTask := api.WorkerTask{ + TaskSummary: taskDBtoSummary(dbTask), + JobId: dbTask.Job.UUID, + } + apiWorker.Task = &apiWorkerTask + } + return e.JSON(http.StatusOK, apiWorker) } diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 15e6af7f..c1d603cc 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -85,13 +85,7 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { // If a task is alreay active & assigned to this worker, return just that. // Note that this task type could be blocklisted or no longer supported by the // Worker, but since it's active that is unlikely. - assignedTaskResult := tx. - Model(&task). - Joins("left join jobs on tasks.job_id = jobs.id"). - Where("tasks.status = ?", api.TaskStatusActive). - Where("jobs.status in ?", schedulableJobStatuses). - Where("tasks.worker_id = ?", w.ID). // assigned to this worker - Limit(1). + assignedTaskResult := taskAssignedAndRunnableQuery(tx.Model(&task), w). Preload("Job"). Find(&task) if assignedTaskResult.Error != nil { @@ -152,3 +146,14 @@ func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error { Select("WorkerID", "LastTouchedAt"). Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error } + +// taskAssignedAndRunnableQuery appends some GORM clauses to query for a task +// that's already assigned to this worker, and is in a runnable state. +func taskAssignedAndRunnableQuery(tx *gorm.DB, w *Worker) *gorm.DB { + return tx. + Joins("left join jobs on tasks.job_id = jobs.id"). + Where("tasks.status = ?", api.TaskStatusActive). + Where("jobs.status in ?", schedulableJobStatuses). + Where("tasks.worker_id = ?", w.ID). // assigned to this worker + Limit(1) +} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index eba990e8..ccfad354 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -80,6 +80,42 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { return workers, nil } +// FetchWorkerTask returns the most recent task assigned to the given Worker. +func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error) { + task := Task{} + + // See if there is a task assigned to this worker in the same way that the + // task scheduler does. + query := db.gormDB.WithContext(ctx) + query = taskAssignedAndRunnableQuery(query, worker) + tx := query. + Order("tasks.updated_at"). + Preload("Job"). + Find(&task) + if tx.Error != nil { + return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID) + } + if task.ID != 0 { + // Found a task! + return &task, nil + } + + // If not found, just find the last-modified task associated with this Worker. + tx = db.gormDB.WithContext(ctx). + Where("worker_id = ?", worker.ID). + Order("tasks.updated_at DESC"). + Preload("Job"). + Find(&task) + if tx.Error != nil { + return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID) + } + if task.ID == 0 { + return nil, nil + } + + return &task, nil +} + func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { err := db.gormDB.WithContext(ctx). Model(w). diff --git a/internal/manager/persistence/workers_test.go b/internal/manager/persistence/workers_test.go index 98e9cdd6..d422b232 100644 --- a/internal/manager/persistence/workers_test.go +++ b/internal/manager/persistence/workers_test.go @@ -51,6 +51,91 @@ func TestCreateFetchWorker(t *testing.T) { assert.EqualValues(t, w.SupportedTaskTypes, fetchedWorker.SupportedTaskTypes) } +func TestFetchWorkerTask(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, 10000*time.Second) + defer cancel() + + // Worker without task. + w := Worker{ + UUID: uuid.New(), + Name: "дрон", + Address: "fe80::5054:ff:fede:2ad7", + Platform: "linux", + Software: "3.0", + Status: api.WorkerStatusAwake, + SupportedTaskTypes: "blender,ffmpeg,file-management", + } + + err := db.CreateWorker(ctx, &w) + if !assert.NoError(t, err) { + t.FailNow() + } + + { // Test without any task assigned. + task, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) { + assert.Nil(t, task) + } + } + + // Create a job with tasks. + authTask1 := authorTestTask("the task", "blender") + authTask2 := authorTestTask("the other task", "blender") + jobUUID := "b6a1d859-122f-4791-8b78-b943329a9989" + atj := authorTestJob(jobUUID, "simple-blender-render", authTask1, authTask2) + constructTestJob(ctx, t, db, atj) + + assignedTask, err := db.ScheduleTask(ctx, &w) + assert.NoError(t, err) + + { // Assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, assignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + + // Set the task to 'completed'. + assignedTask.Status = api.TaskStatusCompleted + assert.NoError(t, db.SaveTaskStatus(ctx, assignedTask)) + + { // Completed-but-last-assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, assignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + + // Assign another task. + newlyAssignedTask, err := db.ScheduleTask(ctx, &w) + if !assert.NoError(t, err) || !assert.NotNil(t, newlyAssignedTask) { + t.FailNow() + } + + { // Newly assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, newlyAssignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + + // Set the new task to 'completed'. + newlyAssignedTask.Status = api.TaskStatusCompleted + assert.NoError(t, db.SaveTaskStatus(ctx, newlyAssignedTask)) + + { // Completed-but-last-assigned task should be returned. + foundTask, err := db.FetchWorkerTask(ctx, &w) + if assert.NoError(t, err) && assert.NotNil(t, foundTask) { + assert.Equal(t, newlyAssignedTask.UUID, foundTask.UUID) + assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well") + } + } + +} + func TestSaveWorker(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) defer cancel() diff --git a/web/app/src/components/WorkerTaskLink.vue b/web/app/src/components/WorkerTaskLink.vue new file mode 100644 index 00000000..229b017f --- /dev/null +++ b/web/app/src/components/WorkerTaskLink.vue @@ -0,0 +1,18 @@ + + + + + diff --git a/web/app/src/components/workers/WorkerDetails.vue b/web/app/src/components/workers/WorkerDetails.vue index 875422af..9bf37f2f 100644 --- a/web/app/src/components/workers/WorkerDetails.vue +++ b/web/app/src/components/workers/WorkerDetails.vue @@ -27,6 +27,11 @@
Task Types
{{ workerData.supported_task_types.join(', ') }}
+ +
Last Task
+
+ +
@@ -40,11 +45,15 @@ import * as datetime from "@/datetime"; import { WorkerMgtApi } from '@/manager-api'; import { apiClient } from '@/stores/api-query-count'; import { workerStatus } from "../../statusindicator"; +import WorkerTaskLink from '@/components/WorkerTaskLink.vue'; export default { props: [ "workerData", // Worker data to show. ], + components: { + WorkerTaskLink, + }, data() { return { datetime: datetime, // So that the template can access it. @@ -58,6 +67,7 @@ export default { }, watch: { workerData(newData) { + console.log("new data:", plain(newData)); if (newData) this.workerStatusHTML = workerStatus(newData); else @@ -74,7 +84,7 @@ export default {