Manager: show current/last task in worker details

The Task details component already linked to the Worker it was assigned
to last, and now the Worker links back to the task.

There's only one task shown in the Worker details. If the Worker is
actively working on a task, that one's shown. Otherwise it's the
last-updated task that was assigned to the worker.
This commit is contained in:
Sybren A. Stüvel 2022-07-26 10:36:02 +02:00
parent 2a4e557bd9
commit 736ca103c3
8 changed files with 195 additions and 9 deletions

View File

@ -42,6 +42,7 @@ type PersistenceService interface {
CreateWorker(ctx context.Context, w *persistence.Worker) error CreateWorker(ctx context.Context, w *persistence.Worker) error
FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error)
FetchWorkers(ctx context.Context) ([]*persistence.Worker, error) FetchWorkers(ctx context.Context) ([]*persistence.Worker, error)
FetchWorkerTask(context.Context, *persistence.Worker) (*persistence.Task, error)
SaveWorker(ctx context.Context, w *persistence.Worker) error SaveWorker(ctx context.Context, w *persistence.Worker) error
SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error
WorkerSeen(ctx context.Context, w *persistence.Worker) error WorkerSeen(ctx context.Context, w *persistence.Worker) error

View File

@ -216,6 +216,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1)
} }
// FetchWorkerTask mocks base method.
func (m *MockPersistenceService) FetchWorkerTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchWorkerTask", arg0, arg1)
ret0, _ := ret[0].(*persistence.Task)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchWorkerTask indicates an expected call of FetchWorkerTask.
func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTask(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerTask", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerTask), arg0, arg1)
}
// FetchWorkers mocks base method. // FetchWorkers mocks base method.
func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -40,7 +40,8 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error {
return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") return sendAPIError(e, http.StatusBadRequest, "not a valid UUID")
} }
dbWorker, err := f.persist.FetchWorker(e.Request().Context(), workerUUID) ctx := e.Request().Context()
dbWorker, err := f.persist.FetchWorker(ctx, workerUUID)
if errors.Is(err, persistence.ErrWorkerNotFound) { if errors.Is(err, persistence.ErrWorkerNotFound) {
logger.Debug().Msg("non-existent worker requested") logger.Debug().Msg("non-existent worker requested")
return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID) return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID)
@ -50,8 +51,23 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error {
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err) return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err)
} }
dbTask, err := f.persist.FetchWorkerTask(ctx, dbWorker)
if err != nil {
logger.Error().Err(err).Msg("error fetching task assigned to worker")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task assigned to worker: %v", err)
}
logger.Debug().Msg("fetched worker") logger.Debug().Msg("fetched worker")
apiWorker := workerDBtoAPI(*dbWorker) apiWorker := workerDBtoAPI(*dbWorker)
if dbTask != nil {
apiWorkerTask := api.WorkerTask{
TaskSummary: taskDBtoSummary(dbTask),
JobId: dbTask.Job.UUID,
}
apiWorker.Task = &apiWorkerTask
}
return e.JSON(http.StatusOK, apiWorker) return e.JSON(http.StatusOK, apiWorker)
} }

View File

@ -85,13 +85,7 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
// If a task is alreay active & assigned to this worker, return just that. // If a task is alreay active & assigned to this worker, return just that.
// Note that this task type could be blocklisted or no longer supported by the // Note that this task type could be blocklisted or no longer supported by the
// Worker, but since it's active that is unlikely. // Worker, but since it's active that is unlikely.
assignedTaskResult := tx. assignedTaskResult := taskAssignedAndRunnableQuery(tx.Model(&task), w).
Model(&task).
Joins("left join jobs on tasks.job_id = jobs.id").
Where("tasks.status = ?", api.TaskStatusActive).
Where("jobs.status in ?", schedulableJobStatuses).
Where("tasks.worker_id = ?", w.ID). // assigned to this worker
Limit(1).
Preload("Job"). Preload("Job").
Find(&task) Find(&task)
if assignedTaskResult.Error != nil { if assignedTaskResult.Error != nil {
@ -152,3 +146,14 @@ func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error {
Select("WorkerID", "LastTouchedAt"). Select("WorkerID", "LastTouchedAt").
Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error
} }
// taskAssignedAndRunnableQuery appends some GORM clauses to query for a task
// that's already assigned to this worker, and is in a runnable state.
func taskAssignedAndRunnableQuery(tx *gorm.DB, w *Worker) *gorm.DB {
return tx.
Joins("left join jobs on tasks.job_id = jobs.id").
Where("tasks.status = ?", api.TaskStatusActive).
Where("jobs.status in ?", schedulableJobStatuses).
Where("tasks.worker_id = ?", w.ID). // assigned to this worker
Limit(1)
}

View File

@ -80,6 +80,42 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) {
return workers, nil return workers, nil
} }
// FetchWorkerTask returns the most recent task assigned to the given Worker.
func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error) {
task := Task{}
// See if there is a task assigned to this worker in the same way that the
// task scheduler does.
query := db.gormDB.WithContext(ctx)
query = taskAssignedAndRunnableQuery(query, worker)
tx := query.
Order("tasks.updated_at").
Preload("Job").
Find(&task)
if tx.Error != nil {
return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID)
}
if task.ID != 0 {
// Found a task!
return &task, nil
}
// If not found, just find the last-modified task associated with this Worker.
tx = db.gormDB.WithContext(ctx).
Where("worker_id = ?", worker.ID).
Order("tasks.updated_at DESC").
Preload("Job").
Find(&task)
if tx.Error != nil {
return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID)
}
if task.ID == 0 {
return nil, nil
}
return &task, nil
}
func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error {
err := db.gormDB.WithContext(ctx). err := db.gormDB.WithContext(ctx).
Model(w). Model(w).

View File

@ -51,6 +51,91 @@ func TestCreateFetchWorker(t *testing.T) {
assert.EqualValues(t, w.SupportedTaskTypes, fetchedWorker.SupportedTaskTypes) assert.EqualValues(t, w.SupportedTaskTypes, fetchedWorker.SupportedTaskTypes)
} }
func TestFetchWorkerTask(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, 10000*time.Second)
defer cancel()
// Worker without task.
w := Worker{
UUID: uuid.New(),
Name: "дрон",
Address: "fe80::5054:ff:fede:2ad7",
Platform: "linux",
Software: "3.0",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management",
}
err := db.CreateWorker(ctx, &w)
if !assert.NoError(t, err) {
t.FailNow()
}
{ // Test without any task assigned.
task, err := db.FetchWorkerTask(ctx, &w)
if assert.NoError(t, err) {
assert.Nil(t, task)
}
}
// Create a job with tasks.
authTask1 := authorTestTask("the task", "blender")
authTask2 := authorTestTask("the other task", "blender")
jobUUID := "b6a1d859-122f-4791-8b78-b943329a9989"
atj := authorTestJob(jobUUID, "simple-blender-render", authTask1, authTask2)
constructTestJob(ctx, t, db, atj)
assignedTask, err := db.ScheduleTask(ctx, &w)
assert.NoError(t, err)
{ // Assigned task should be returned.
foundTask, err := db.FetchWorkerTask(ctx, &w)
if assert.NoError(t, err) && assert.NotNil(t, foundTask) {
assert.Equal(t, assignedTask.UUID, foundTask.UUID)
assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well")
}
}
// Set the task to 'completed'.
assignedTask.Status = api.TaskStatusCompleted
assert.NoError(t, db.SaveTaskStatus(ctx, assignedTask))
{ // Completed-but-last-assigned task should be returned.
foundTask, err := db.FetchWorkerTask(ctx, &w)
if assert.NoError(t, err) && assert.NotNil(t, foundTask) {
assert.Equal(t, assignedTask.UUID, foundTask.UUID)
assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well")
}
}
// Assign another task.
newlyAssignedTask, err := db.ScheduleTask(ctx, &w)
if !assert.NoError(t, err) || !assert.NotNil(t, newlyAssignedTask) {
t.FailNow()
}
{ // Newly assigned task should be returned.
foundTask, err := db.FetchWorkerTask(ctx, &w)
if assert.NoError(t, err) && assert.NotNil(t, foundTask) {
assert.Equal(t, newlyAssignedTask.UUID, foundTask.UUID)
assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well")
}
}
// Set the new task to 'completed'.
newlyAssignedTask.Status = api.TaskStatusCompleted
assert.NoError(t, db.SaveTaskStatus(ctx, newlyAssignedTask))
{ // Completed-but-last-assigned task should be returned.
foundTask, err := db.FetchWorkerTask(ctx, &w)
if assert.NoError(t, err) && assert.NotNil(t, foundTask) {
assert.Equal(t, newlyAssignedTask.UUID, foundTask.UUID)
assert.Equal(t, jobUUID, foundTask.Job.UUID, "the job UUID should be returned as well")
}
}
}
func TestSaveWorker(t *testing.T) { func TestSaveWorker(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second)
defer cancel() defer cancel()

View File

@ -0,0 +1,18 @@
<template>
<router-link :to="{ name: 'jobs', params: { jobID: workerTask.job_id, taskID: workerTask.id } }">
{{ workerTask.name }}
(<span class="status-label" :class="'status-' + workerTask.status">{{ workerTask.status }}</span>)
</router-link>
</template>
<script setup>
// 'workerTask' should be a WorkerTask object (see schema defined in `flamenco-openapi.yaml`).
const props = defineProps(['workerTask']);
</script>
<style scoped>
.status-label {
color: var(--indicator-color);
font-weight: bold;
}
</style>

View File

@ -27,6 +27,11 @@
<dt class="field-supported_task_types">Task Types</dt> <dt class="field-supported_task_types">Task Types</dt>
<dd>{{ workerData.supported_task_types.join(', ') }}</dd> <dd>{{ workerData.supported_task_types.join(', ') }}</dd>
<dt class="field-task">Last Task</dt>
<dd>
<worker-task-link :workerTask="workerData.task" />
</dd>
</dl> </dl>
</template> </template>
@ -40,11 +45,15 @@ import * as datetime from "@/datetime";
import { WorkerMgtApi } from '@/manager-api'; import { WorkerMgtApi } from '@/manager-api';
import { apiClient } from '@/stores/api-query-count'; import { apiClient } from '@/stores/api-query-count';
import { workerStatus } from "../../statusindicator"; import { workerStatus } from "../../statusindicator";
import WorkerTaskLink from '@/components/WorkerTaskLink.vue';
export default { export default {
props: [ props: [
"workerData", // Worker data to show. "workerData", // Worker data to show.
], ],
components: {
WorkerTaskLink,
},
data() { data() {
return { return {
datetime: datetime, // So that the template can access it. datetime: datetime, // So that the template can access it.
@ -58,6 +67,7 @@ export default {
}, },
watch: { watch: {
workerData(newData) { workerData(newData) {
console.log("new data:", plain(newData));
if (newData) if (newData)
this.workerStatusHTML = workerStatus(newData); this.workerStatusHTML = workerStatus(newData);
else else