From 2e11c1c2408eb5c8b5245271e312323ebd2af004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 31 May 2022 15:19:12 +0200 Subject: [PATCH] Manager: Implement SocketIO worker updates --- internal/manager/api_impl/api_impl.go | 3 + .../api_impl/mocks/api_impl_mock.gen.go | 24 ++++++++ internal/manager/api_impl/workers.go | 30 ++++++++-- internal/manager/api_impl/workers_test.go | 56 +++++++++++++++++++ internal/manager/webupdates/sio_rooms.go | 8 ++- internal/manager/webupdates/worker_updates.go | 41 ++++++++++++++ web/app/src/components/UpdateListener.vue | 9 ++- .../src/components/workers/WorkersTable.vue | 36 ++++++------ web/app/src/stores/notifications.js | 12 ++++ web/app/src/views/WorkersView.vue | 21 ++++++- 10 files changed, 214 insertions(+), 26 deletions(-) create mode 100644 internal/manager/webupdates/worker_updates.go diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 824ce919..3c186e1a 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -89,6 +89,9 @@ type ChangeBroadcaster interface { // separate broadcast per task. BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) + + BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) + BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) } // ChangeBroadcaster should be a subset of webupdates.BiDirComms. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 9460b85c..a5043bd2 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -279,6 +279,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewJob(arg0 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0) } +// BroadcastNewWorker mocks base method. +func (m *MockChangeBroadcaster) BroadcastNewWorker(arg0 api.SocketIOWorkerUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastNewWorker", arg0) +} + +// BroadcastNewWorker indicates an expected call of BroadcastNewWorker. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorker(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0) +} + // BroadcastTaskLogUpdate mocks base method. func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) { m.ctrl.T.Helper() @@ -291,6 +303,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0) } +// BroadcastWorkerUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastWorkerUpdate", arg0) +} + +// BroadcastWorkerUpdate indicates an expected call of BroadcastWorkerUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerUpdate), arg0) +} + // MockJobCompiler is a mock of JobCompiler interface. type MockJobCompiler struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index b369d63a..e5ee4ecb 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -84,25 +84,37 @@ func (f *Flamenco) SignOn(e echo.Context) error { } logger.Info().Msg("worker signing on") - w, err := f.workerUpdateAfterSignOn(e, req) + w, prevStatus, err := f.workerUpdateAfterSignOn(e, req) if err != nil { return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") } + // Broadcast the status change. + update := webupdates.NewWorkerUpdate(w) + if prevStatus != "" { + update.PreviousStatus = &prevStatus + } + if w.StatusRequested != "" { + update.StatusRequested = &w.StatusRequested + } + f.broadcaster.BroadcastWorkerUpdate(update) + resp := api.WorkerStateChange{} if w.StatusRequested != "" { resp.StatusRequested = w.StatusRequested } else { resp.StatusRequested = api.WorkerStatusAwake } + return e.JSON(http.StatusOK, resp) } -func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, error) { +func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, api.WorkerStatus, error) { logger := requestLogger(e) w := requestWorkerOrPanic(e) // Update the worker for with the new sign-on info. + prevStatus := w.Status w.Status = api.WorkerStatusStarting w.Address = e.RealIP() w.Name = update.Nickname @@ -120,10 +132,10 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON logger.Warn().Err(err). Str("newStatus", string(w.Status)). Msg("error storing Worker in database") - return nil, err + return nil, "", err } - return w, nil + return w, prevStatus, nil } func (f *Flamenco) SignOff(e echo.Context) error { @@ -138,6 +150,7 @@ func (f *Flamenco) SignOff(e echo.Context) error { logger.Info().Msg("worker signing off") w := requestWorkerOrPanic(e) + prevStatus := w.Status w.Status = api.WorkerStatusOffline if w.StatusRequested == api.WorkerStatusShutdown { w.StatusRequested = "" @@ -163,6 +176,10 @@ func (f *Flamenco) SignOff(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks") } + update := webupdates.NewWorkerUpdate(w) + update.PreviousStatus = &prevStatus + f.broadcaster.BroadcastWorkerUpdate(update) + return e.NoContent(http.StatusNoContent) } @@ -234,6 +251,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { Str("newStatus", string(req.Status)). Logger() + prevStatus := w.Status w.Status = req.Status if w.StatusRequested != "" && req.Status != w.StatusRequested { logger.Warn(). @@ -254,6 +272,10 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") } + update := webupdates.NewWorkerUpdate(w) + update.PreviousStatus = &prevStatus + f.broadcaster.BroadcastWorkerUpdate(update) + return e.NoContent(http.StatusNoContent) } diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 427003bd..f0e18b81 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -96,6 +96,17 @@ func TestWorkerSignOn(t *testing.T) { mf := newMockedFlamenco(mockCtrl) worker := testWorker() worker.Status = api.WorkerStatusOffline + prevStatus := worker.Status + + mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{ + Id: worker.UUID, + Nickname: "Lazy Boi", + PreviousStatus: &prevStatus, + Status: api.WorkerStatusStarting, + StatusRequested: nil, + Updated: worker.UpdatedAt, + Version: "3.0-testing", + }) mf.persistence.EXPECT().SaveWorker(gomock.Any(), &worker).Return(nil) @@ -167,6 +178,16 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { return nil }) + prevStatus := api.WorkerStatusAwake + mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{ + Id: worker.UUID, + Nickname: worker.Name, + PreviousStatus: &prevStatus, + Status: api.WorkerStatusOffline, + Updated: worker.UpdatedAt, + Version: worker.Software, + }) + err := mf.flamenco.SignOff(echo) assert.NoError(t, err) @@ -174,6 +195,41 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { assert.Equal(t, http.StatusNoContent, resp.StatusCode) } +func TestWorkerStateChanged(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + worker.Status = api.WorkerStatusStarting + prevStatus := worker.Status + + // Expect a broadcast of the change + mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{ + Id: worker.UUID, + Nickname: worker.Name, + PreviousStatus: &prevStatus, + Status: api.WorkerStatusAwake, + StatusRequested: nil, + Updated: worker.UpdatedAt, + Version: worker.Software, + }) + + // Expect the Worker to be saved with the new status + savedWorker := worker + savedWorker.Status = api.WorkerStatusAwake + mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil) + + // Perform the request + echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{ + Status: api.WorkerStatusAwake, + }) + requestWorkerStore(echo, &worker) + err := mf.flamenco.WorkerStateChanged(echo) + assert.NoError(t, err) + assertResponseEmpty(t, echo) +} + func TestTaskUpdate(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index b10ba86e..9aef9c92 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -21,8 +21,9 @@ const ( // Predefined SocketIO rooms. There will be others, but those will have a // dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be // listed here as constants. See `roomXXX()` functions for those. - SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. - SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. + SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. + SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. + SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates. ) const ( @@ -32,6 +33,7 @@ const ( SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate + SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription ) @@ -62,6 +64,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock switch subs.Type { case api.SocketIOSubscriptionTypeAllJobs: sioRoom = SocketIORoomJobs + case api.SocketIOSubscriptionTypeAllWorkers: + sioRoom = SocketIORoomWorkers case api.SocketIOSubscriptionTypeJob: if subs.Uuid == nil { logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") diff --git a/internal/manager/webupdates/worker_updates.go b/internal/manager/webupdates/worker_updates.go new file mode 100644 index 00000000..03928e88 --- /dev/null +++ b/internal/manager/webupdates/worker_updates.go @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +package webupdates + +import ( + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +// NewWorkerUpdate returns a partial SocketIOWorkerUpdate struct for the given worker. +// It only fills in the fields that represent the current state of the worker. For +// example, it omits `PreviousStatus`. The ommitted fields can be filled in by +// the caller. +func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate { + workerUpdate := api.SocketIOWorkerUpdate{ + Id: worker.UUID, + Nickname: worker.Name, + Status: worker.Status, + Version: worker.Software, + Updated: worker.UpdatedAt, + } + return workerUpdate +} + +// BroadcastWorkerUpdate sends the worker update to clients. +func (b *BiDirComms) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) { + log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting worker update") + b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate) +} + +// BroadcastNewWorker sends a "new worker" notification to clients. +func (b *BiDirComms) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) { + if workerUpdate.PreviousStatus != nil { + log.Warn().Interface("workerUpdate", workerUpdate).Msg("socketIO: new workers should not have a previous state") + workerUpdate.PreviousStatus = nil + } + + log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting new worker") + b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate) +} diff --git a/web/app/src/components/UpdateListener.vue b/web/app/src/components/UpdateListener.vue index b25ae6b5..e8bfe7e4 100644 --- a/web/app/src/components/UpdateListener.vue +++ b/web/app/src/components/UpdateListener.vue @@ -13,7 +13,7 @@ const websocketURL = ws(); export default { emits: [ // Data from Flamenco Manager: - "jobUpdate", "taskUpdate", "taskLogUpdate", "message", + "jobUpdate", "taskUpdate", "taskLogUpdate", "message", "workerUpdate", // SocketIO events: "sioReconnected", "sioDisconnected" ], @@ -146,6 +146,13 @@ export default { this.$emit("taskLogUpdate", apiTaskLogUpdate); }); + this.socket.on("/workers", (workerUpdate) => { + // Convert to API object, in order to have the same parsing of data as + // when we'd do an API call. + const apiWorkerUpdate = API.SocketIOWorkerUpdate.constructFromObject(workerUpdate) + this.$emit("workerUpdate", apiWorkerUpdate); + }); + // Chat system, useful for debugging. this.socket.on("/message", (message) => { this.$emit("message", message); diff --git a/web/app/src/components/workers/WorkersTable.vue b/web/app/src/components/workers/WorkersTable.vue index 8f027080..054fec99 100644 --- a/web/app/src/components/workers/WorkersTable.vue +++ b/web/app/src/components/workers/WorkersTable.vue @@ -99,24 +99,24 @@ export default { this.tabulator.setData(data.workers); this._refreshAvailableStatuses(); }, - // processWorkerUpdate(workerUpdate) { - // // updateData() will only overwrite properties that are actually set on - // // workerUpdate, and leave the rest as-is. - // if (this.tabulator.initialized) { - // this.tabulator.updateData([workerUpdate]) - // .then(this.sortData); - // } - // this._refreshAvailableStatuses(); - // }, - // processNewWorker(workerUpdate) { - // if (this.tabulator.initialized) { - // this.tabulator.updateData([workerUpdate]) - // .then(this.sortData); - // } - // this.tabulator.addData([workerUpdate]) - // .then(this.sortData); - // this._refreshAvailableStatuses(); - // }, + processWorkerUpdate(workerUpdate) { + // updateData() will only overwrite properties that are actually set on + // workerUpdate, and leave the rest as-is. + if (this.tabulator.initialized) { + this.tabulator.updateData([workerUpdate]) + .then(this.sortData); + } + this._refreshAvailableStatuses(); + }, + processNewWorker(workerUpdate) { + if (this.tabulator.initialized) { + this.tabulator.updateData([workerUpdate]) + .then(this.sortData); + } + this.tabulator.addData([workerUpdate]) + .then(this.sortData); + this._refreshAvailableStatuses(); + }, onRowClick(event, row) { // Take a copy of the data, so that it's decoupled from the tabulator data diff --git a/web/app/src/stores/notifications.js b/web/app/src/stores/notifications.js index 2974555e..405dbf87 100644 --- a/web/app/src/stores/notifications.js +++ b/web/app/src/stores/notifications.js @@ -66,6 +66,18 @@ export const useNotifs = defineStore('notifications', { this.add(msg) }, + /** + * @param {API.SocketIOWorkerUpdate} workerUpdate Worker update received via SocketIO. + */ + addWorkerUpdate(workerUpdate) { + console.log('Received worker update:', workerUpdate); + let msg = `Worker ${workerUpdate.name}`; + if (workerUpdate.previous_status && workerUpdate.previous_status != workerUpdate.status) { + msg += ` changed status ${workerUpdate.previous_status} ➜ ${workerUpdate.status}`; + } + this.add(msg) + }, + /* Ensure there is only 1000 items in the history. */ _prune() { if (this.history.length <= 1000) return; diff --git a/web/app/src/views/WorkersView.vue b/web/app/src/views/WorkersView.vue index 284a9cc6..aaa6a077 100644 --- a/web/app/src/views/WorkersView.vue +++ b/web/app/src/views/WorkersView.vue @@ -7,7 +7,8 @@ @@ -24,6 +25,7 @@