Manager: implement worker status change requests
Implement the OpenAPI `RequestWorkerStatusChange` operation, and handle these changes in the web interface.
This commit is contained in:
parent
fdb0b82664
commit
f97f0a34c3
@ -7,6 +7,7 @@ import (
|
||||
"net/http"
|
||||
|
||||
"git.blender.org/flamenco/internal/manager/persistence"
|
||||
"git.blender.org/flamenco/internal/manager/webupdates"
|
||||
"git.blender.org/flamenco/internal/uuid"
|
||||
"git.blender.org/flamenco/pkg/api"
|
||||
"github.com/labstack/echo/v4"
|
||||
@ -54,6 +55,54 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error {
|
||||
return e.JSON(http.StatusOK, apiWorker)
|
||||
}
|
||||
|
||||
func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string) error {
|
||||
logger := requestLogger(e)
|
||||
logger = logger.With().Str("worker", workerUUID).Logger()
|
||||
|
||||
if !uuid.IsValid(workerUUID) {
|
||||
return sendAPIError(e, http.StatusBadRequest, "not a valid UUID")
|
||||
}
|
||||
|
||||
// Decode the request body.
|
||||
var change api.WorkerStatusChangeRequest
|
||||
if err := e.Bind(&change); err != nil {
|
||||
logger.Warn().Err(err).Msg("bad request received")
|
||||
return sendAPIError(e, http.StatusBadRequest, "invalid format")
|
||||
}
|
||||
|
||||
// Fetch the worker.
|
||||
dbWorker, err := f.persist.FetchWorker(e.Request().Context(), workerUUID)
|
||||
if errors.Is(err, persistence.ErrWorkerNotFound) {
|
||||
logger.Debug().Msg("non-existent worker requested")
|
||||
return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID)
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("error fetching worker")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err)
|
||||
}
|
||||
|
||||
// Store the status change.
|
||||
logger = logger.With().
|
||||
Str("status", string(dbWorker.Status)).
|
||||
Str("requested", string(change.StatusRequested)).
|
||||
Bool("lazy", change.IsLazy).
|
||||
Logger()
|
||||
|
||||
logger.Info().Msg("worker status change requested")
|
||||
dbWorker.StatusRequested = change.StatusRequested
|
||||
dbWorker.LazyStatusRequest = change.IsLazy
|
||||
if err := f.persist.SaveWorker(e.Request().Context(), dbWorker); err != nil {
|
||||
logger.Error().Err(err).Msg("error saving worker after status change request")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error saving worker: %v", err)
|
||||
}
|
||||
|
||||
// Broadcast the change.
|
||||
update := webupdates.NewWorkerUpdate(dbWorker)
|
||||
f.broadcaster.BroadcastWorkerUpdate(update)
|
||||
|
||||
return e.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func workerSummary(w persistence.Worker) api.WorkerSummary {
|
||||
summary := api.WorkerSummary{
|
||||
Id: w.UUID,
|
||||
@ -63,6 +112,7 @@ func workerSummary(w persistence.Worker) api.WorkerSummary {
|
||||
}
|
||||
if w.StatusRequested != "" {
|
||||
summary.StatusRequested = &w.StatusRequested
|
||||
summary.LazyStatusRequest = &w.LazyStatusRequest
|
||||
}
|
||||
return summary
|
||||
}
|
||||
@ -82,6 +132,7 @@ func workerDBtoAPI(dbWorker *persistence.Worker) api.Worker {
|
||||
|
||||
if dbWorker.StatusRequested != "" {
|
||||
apiWorker.StatusRequested = &dbWorker.StatusRequested
|
||||
apiWorker.LazyStatusRequest = &dbWorker.LazyStatusRequest
|
||||
}
|
||||
|
||||
return apiWorker
|
||||
|
@ -121,3 +121,41 @@ func TestFetchWorker(t *testing.T) {
|
||||
SupportedTaskTypes: []string{"blender", "ffmpeg", "file-management", "misc"},
|
||||
})
|
||||
}
|
||||
|
||||
func TestRequestWorkerStatusChange(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mf := newMockedFlamenco(mockCtrl)
|
||||
worker := testWorker()
|
||||
workerUUID := worker.UUID
|
||||
prevStatus := worker.Status
|
||||
|
||||
mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil)
|
||||
|
||||
requestStatus := api.WorkerStatusAsleep
|
||||
savedWorker := worker
|
||||
savedWorker.StatusRequested = requestStatus
|
||||
savedWorker.LazyStatusRequest = true
|
||||
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &savedWorker).Return(nil)
|
||||
|
||||
// Expect a broadcast of the change
|
||||
mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
|
||||
Id: worker.UUID,
|
||||
Nickname: worker.Name,
|
||||
PreviousStatus: &prevStatus,
|
||||
Status: prevStatus,
|
||||
StatusRequested: &requestStatus,
|
||||
LazyStatusRequest: ptr(true),
|
||||
Updated: worker.UpdatedAt,
|
||||
Version: worker.Software,
|
||||
})
|
||||
|
||||
echo := mf.prepareMockedJSONRequest(api.WorkerStatusChangeRequest{
|
||||
StatusRequested: requestStatus,
|
||||
IsLazy: true,
|
||||
})
|
||||
err := mf.flamenco.RequestWorkerStatusChange(echo, workerUUID)
|
||||
assert.NoError(t, err)
|
||||
assertResponseEmpty(t, echo)
|
||||
}
|
||||
|
@ -18,11 +18,13 @@ type Worker struct {
|
||||
Secret string `gorm:"type:varchar(255);default:''"`
|
||||
Name string `gorm:"type:varchar(64);default:''"`
|
||||
|
||||
Address string `gorm:"type:varchar(39);default:'';index"` // 39 = max length of IPv6 address.
|
||||
Platform string `gorm:"type:varchar(16);default:''"`
|
||||
Software string `gorm:"type:varchar(32);default:''"`
|
||||
Status api.WorkerStatus `gorm:"type:varchar(16);default:''"`
|
||||
StatusRequested api.WorkerStatus `gorm:"type:varchar(16);default:''"`
|
||||
Address string `gorm:"type:varchar(39);default:'';index"` // 39 = max length of IPv6 address.
|
||||
Platform string `gorm:"type:varchar(16);default:''"`
|
||||
Software string `gorm:"type:varchar(32);default:''"`
|
||||
Status api.WorkerStatus `gorm:"type:varchar(16);default:''"`
|
||||
|
||||
StatusRequested api.WorkerStatus `gorm:"type:varchar(16);default:''"`
|
||||
LazyStatusRequest bool `gorm:"type:smallint;default:0"`
|
||||
|
||||
SupportedTaskTypes string `gorm:"type:varchar(255);default:''"` // comma-separated list of task types.
|
||||
}
|
||||
|
@ -20,6 +20,12 @@ func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate {
|
||||
Version: worker.Software,
|
||||
Updated: worker.UpdatedAt,
|
||||
}
|
||||
|
||||
if worker.StatusRequested != "" {
|
||||
workerUpdate.StatusRequested = &worker.StatusRequested
|
||||
workerUpdate.LazyStatusRequest = &worker.LazyStatusRequest
|
||||
}
|
||||
|
||||
return workerUpdate
|
||||
}
|
||||
|
||||
|
@ -465,4 +465,8 @@ section.footer-popup .tabulator .tabulator-tableholder .tabulator-placeholder .t
|
||||
span.state-transition-arrow {
|
||||
font-weight: bold;
|
||||
font-size: 110%;
|
||||
color: var(--color-accent);
|
||||
}
|
||||
span.state-transition-arrow.lazy {
|
||||
color: var(--color-text-muted);
|
||||
}
|
||||
|
@ -10,11 +10,7 @@
|
||||
<dd>{{ workerData.nickname }}</dd>
|
||||
|
||||
<dt class="field-status">Status</dt>
|
||||
<dd>{{ workerData.status }}
|
||||
<template v-if="workerData.status_requested">
|
||||
<span class='state-transition-arrow'>➜</span> {{ workerData.status_requested }}
|
||||
</template>
|
||||
</dd>
|
||||
<dd v-html="workerStatusHTML"></dd>
|
||||
|
||||
<dt class="field-version">Version</dt>
|
||||
<dd title="Version of Flamenco">{{ workerData.version }}</dd>
|
||||
@ -39,6 +35,7 @@
|
||||
import * as datetime from "@/datetime";
|
||||
import { WorkerMgtApi } from '@/manager-api';
|
||||
import { apiClient } from '@/stores/api-query-count';
|
||||
import { workerStatus } from "../../statusindicator";
|
||||
|
||||
export default {
|
||||
props: [
|
||||
@ -48,12 +45,19 @@ export default {
|
||||
return {
|
||||
datetime: datetime, // So that the template can access it.
|
||||
api: new WorkerMgtApi(apiClient),
|
||||
workerStatusHTML: "",
|
||||
};
|
||||
},
|
||||
mounted() {
|
||||
// Allow testing from the JS console:
|
||||
window.workerDetailsVue = this;
|
||||
},
|
||||
watch: {
|
||||
workerData(newData) {
|
||||
this.workerStatusHTML = workerStatus(newData);
|
||||
console.log("new worker data; status=", this.workerStatusHTML);
|
||||
},
|
||||
},
|
||||
computed: {
|
||||
hasWorkerData() {
|
||||
return !!this.workerData && !!this.workerData.id;
|
||||
|
@ -14,7 +14,7 @@
|
||||
<script lang="js">
|
||||
import { TabulatorFull as Tabulator } from 'tabulator-tables';
|
||||
import { WorkerMgtApi } from '@/manager-api'
|
||||
import { indicator } from '@/statusindicator';
|
||||
import { indicator, workerStatus } from '@/statusindicator';
|
||||
import { apiClient } from '@/stores/api-query-count';
|
||||
|
||||
import StatusFilterBar from '@/components/StatusFilterBar.vue'
|
||||
@ -46,10 +46,8 @@ export default {
|
||||
formatter: (cell) => {
|
||||
const data = cell.getData();
|
||||
const dot = indicator(data.status, 'worker-');
|
||||
if (data.status_requested) {
|
||||
return `${dot} ${data.status} <span class='state-transition-arrow'>➜</span> ${data.status_requested}`;
|
||||
}
|
||||
return `${dot} ${data.status}`;
|
||||
const asString = workerStatus(data);
|
||||
return `${dot} ${asString}`;
|
||||
},
|
||||
},
|
||||
{ title: 'Name', field: 'nickname', sorter: 'string' },
|
||||
@ -108,22 +106,24 @@ export default {
|
||||
this._refreshAvailableStatuses();
|
||||
},
|
||||
processWorkerUpdate(workerUpdate) {
|
||||
// updateData() will only overwrite properties that are actually set on
|
||||
// workerUpdate, and leave the rest as-is.
|
||||
if (this.tabulator.initialized) {
|
||||
this.tabulator.updateData([workerUpdate])
|
||||
.then(this.sortData);
|
||||
if (!this.tabulator.initialized) return;
|
||||
|
||||
// Contrary to tabulator.getRow(), rowManager.findRow() doesn't log a
|
||||
// warning when the row cannot be found,
|
||||
const existingRow = this.tabulator.rowManager.findRow(workerUpdate.id);
|
||||
|
||||
let promise;
|
||||
if (existingRow) {
|
||||
promise = this.tabulator.updateData([workerUpdate]);
|
||||
// Tabulator doesn't know we're using 'status_requested' in the 'status'
|
||||
// column, so it also won't know to redraw when that field changes.
|
||||
promise.then(() => existingRow.reinitialize(true));
|
||||
} else {
|
||||
promise = this.tabulator.addData([workerUpdate]);
|
||||
}
|
||||
this._refreshAvailableStatuses();
|
||||
},
|
||||
processNewWorker(workerUpdate) {
|
||||
if (this.tabulator.initialized) {
|
||||
this.tabulator.updateData([workerUpdate])
|
||||
.then(this.sortData);
|
||||
}
|
||||
this.tabulator.addData([workerUpdate])
|
||||
.then(this.sortData);
|
||||
this._refreshAvailableStatuses();
|
||||
promise
|
||||
.then(this.sortData)
|
||||
.then(this.refreshAvailableStatuses);
|
||||
},
|
||||
|
||||
onRowClick(event, row) {
|
||||
|
@ -16,3 +16,25 @@ export function indicator(status, classNamePrefix) {
|
||||
if (!classNamePrefix) classNamePrefix = ""; // force an empty string for any false value.
|
||||
return `<span title="${label}" class="indicator ${classNamePrefix}status-${status}"></span>`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct HTML for showing a worker's status, including any status change
|
||||
* request.
|
||||
*
|
||||
* @param {API.WorkerSummary} workerInfo
|
||||
* @returns the HTML for the worker status.
|
||||
*/
|
||||
export function workerStatus(worker) {
|
||||
if (!worker.status_requested) {
|
||||
return `${worker.status}`;
|
||||
}
|
||||
|
||||
let arrow;
|
||||
if (worker.lazy_status_request) {
|
||||
arrow = `<span class='state-transition-arrow lazy' title='lazy status transition'>➜</span>`
|
||||
} else {
|
||||
arrow = `<span class='state-transition-arrow forced' title='forced status transition'>➠</span>`
|
||||
}
|
||||
|
||||
return `${worker.status} ${arrow} ${worker.status_requested}`;
|
||||
}
|
||||
|
@ -68,10 +68,7 @@ export default {
|
||||
this.notifs.addWorkerUpdate(workerUpdate);
|
||||
|
||||
if (this.$refs.workersTable) {
|
||||
if (workerUpdate.previous_status)
|
||||
this.$refs.workersTable.processWorkerUpdate(workerUpdate);
|
||||
else
|
||||
this.$refs.workersTable.processNewWorker(workerUpdate);
|
||||
this.$refs.workersTable.processWorkerUpdate(workerUpdate);
|
||||
}
|
||||
if (this.workerID != workerUpdate.id)
|
||||
return;
|
||||
|
Loading…
x
Reference in New Issue
Block a user