Manager: Implement SocketIO worker updates

This commit is contained in:
Sybren A. Stüvel 2022-05-31 15:19:12 +02:00
parent 1f49880b7f
commit 2e11c1c240
10 changed files with 214 additions and 26 deletions

View File

@ -89,6 +89,9 @@ type ChangeBroadcaster interface {
// separate broadcast per task. // separate broadcast per task.
BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate)
BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate)
} }
// ChangeBroadcaster should be a subset of webupdates.BiDirComms. // ChangeBroadcaster should be a subset of webupdates.BiDirComms.

View File

@ -279,6 +279,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewJob(arg0 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0)
} }
// BroadcastNewWorker mocks base method.
func (m *MockChangeBroadcaster) BroadcastNewWorker(arg0 api.SocketIOWorkerUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastNewWorker", arg0)
}
// BroadcastNewWorker indicates an expected call of BroadcastNewWorker.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorker(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0)
}
// BroadcastTaskLogUpdate mocks base method. // BroadcastTaskLogUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) { func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -291,6 +303,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interfa
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0)
} }
// BroadcastWorkerUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastWorkerUpdate", arg0)
}
// BroadcastWorkerUpdate indicates an expected call of BroadcastWorkerUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerUpdate), arg0)
}
// MockJobCompiler is a mock of JobCompiler interface. // MockJobCompiler is a mock of JobCompiler interface.
type MockJobCompiler struct { type MockJobCompiler struct {
ctrl *gomock.Controller ctrl *gomock.Controller

View File

@ -84,25 +84,37 @@ func (f *Flamenco) SignOn(e echo.Context) error {
} }
logger.Info().Msg("worker signing on") logger.Info().Msg("worker signing on")
w, err := f.workerUpdateAfterSignOn(e, req) w, prevStatus, err := f.workerUpdateAfterSignOn(e, req)
if err != nil { if err != nil {
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
} }
// Broadcast the status change.
update := webupdates.NewWorkerUpdate(w)
if prevStatus != "" {
update.PreviousStatus = &prevStatus
}
if w.StatusRequested != "" {
update.StatusRequested = &w.StatusRequested
}
f.broadcaster.BroadcastWorkerUpdate(update)
resp := api.WorkerStateChange{} resp := api.WorkerStateChange{}
if w.StatusRequested != "" { if w.StatusRequested != "" {
resp.StatusRequested = w.StatusRequested resp.StatusRequested = w.StatusRequested
} else { } else {
resp.StatusRequested = api.WorkerStatusAwake resp.StatusRequested = api.WorkerStatusAwake
} }
return e.JSON(http.StatusOK, resp) return e.JSON(http.StatusOK, resp)
} }
func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, error) { func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, api.WorkerStatus, error) {
logger := requestLogger(e) logger := requestLogger(e)
w := requestWorkerOrPanic(e) w := requestWorkerOrPanic(e)
// Update the worker for with the new sign-on info. // Update the worker for with the new sign-on info.
prevStatus := w.Status
w.Status = api.WorkerStatusStarting w.Status = api.WorkerStatusStarting
w.Address = e.RealIP() w.Address = e.RealIP()
w.Name = update.Nickname w.Name = update.Nickname
@ -120,10 +132,10 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON
logger.Warn().Err(err). logger.Warn().Err(err).
Str("newStatus", string(w.Status)). Str("newStatus", string(w.Status)).
Msg("error storing Worker in database") Msg("error storing Worker in database")
return nil, err return nil, "", err
} }
return w, nil return w, prevStatus, nil
} }
func (f *Flamenco) SignOff(e echo.Context) error { func (f *Flamenco) SignOff(e echo.Context) error {
@ -138,6 +150,7 @@ func (f *Flamenco) SignOff(e echo.Context) error {
logger.Info().Msg("worker signing off") logger.Info().Msg("worker signing off")
w := requestWorkerOrPanic(e) w := requestWorkerOrPanic(e)
prevStatus := w.Status
w.Status = api.WorkerStatusOffline w.Status = api.WorkerStatusOffline
if w.StatusRequested == api.WorkerStatusShutdown { if w.StatusRequested == api.WorkerStatusShutdown {
w.StatusRequested = "" w.StatusRequested = ""
@ -163,6 +176,10 @@ func (f *Flamenco) SignOff(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks") return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks")
} }
update := webupdates.NewWorkerUpdate(w)
update.PreviousStatus = &prevStatus
f.broadcaster.BroadcastWorkerUpdate(update)
return e.NoContent(http.StatusNoContent) return e.NoContent(http.StatusNoContent)
} }
@ -234,6 +251,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
Str("newStatus", string(req.Status)). Str("newStatus", string(req.Status)).
Logger() Logger()
prevStatus := w.Status
w.Status = req.Status w.Status = req.Status
if w.StatusRequested != "" && req.Status != w.StatusRequested { if w.StatusRequested != "" && req.Status != w.StatusRequested {
logger.Warn(). logger.Warn().
@ -254,6 +272,10 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
} }
update := webupdates.NewWorkerUpdate(w)
update.PreviousStatus = &prevStatus
f.broadcaster.BroadcastWorkerUpdate(update)
return e.NoContent(http.StatusNoContent) return e.NoContent(http.StatusNoContent)
} }

View File

@ -96,6 +96,17 @@ func TestWorkerSignOn(t *testing.T) {
mf := newMockedFlamenco(mockCtrl) mf := newMockedFlamenco(mockCtrl)
worker := testWorker() worker := testWorker()
worker.Status = api.WorkerStatusOffline worker.Status = api.WorkerStatusOffline
prevStatus := worker.Status
mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
Id: worker.UUID,
Nickname: "Lazy Boi",
PreviousStatus: &prevStatus,
Status: api.WorkerStatusStarting,
StatusRequested: nil,
Updated: worker.UpdatedAt,
Version: "3.0-testing",
})
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &worker).Return(nil) mf.persistence.EXPECT().SaveWorker(gomock.Any(), &worker).Return(nil)
@ -167,6 +178,16 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
return nil return nil
}) })
prevStatus := api.WorkerStatusAwake
mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
Id: worker.UUID,
Nickname: worker.Name,
PreviousStatus: &prevStatus,
Status: api.WorkerStatusOffline,
Updated: worker.UpdatedAt,
Version: worker.Software,
})
err := mf.flamenco.SignOff(echo) err := mf.flamenco.SignOff(echo)
assert.NoError(t, err) assert.NoError(t, err)
@ -174,6 +195,41 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
assert.Equal(t, http.StatusNoContent, resp.StatusCode) assert.Equal(t, http.StatusNoContent, resp.StatusCode)
} }
func TestWorkerStateChanged(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
worker.Status = api.WorkerStatusStarting
prevStatus := worker.Status
// Expect a broadcast of the change
mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
Id: worker.UUID,
Nickname: worker.Name,
PreviousStatus: &prevStatus,
Status: api.WorkerStatusAwake,
StatusRequested: nil,
Updated: worker.UpdatedAt,
Version: worker.Software,
})
// Expect the Worker to be saved with the new status
savedWorker := worker
savedWorker.Status = api.WorkerStatusAwake
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
// Perform the request
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
Status: api.WorkerStatusAwake,
})
requestWorkerStore(echo, &worker)
err := mf.flamenco.WorkerStateChanged(echo)
assert.NoError(t, err)
assertResponseEmpty(t, echo)
}
func TestTaskUpdate(t *testing.T) { func TestTaskUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()

View File

@ -21,8 +21,9 @@ const (
// Predefined SocketIO rooms. There will be others, but those will have a // Predefined SocketIO rooms. There will be others, but those will have a
// dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be // dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be
// listed here as constants. See `roomXXX()` functions for those. // listed here as constants. See `roomXXX()` functions for those.
SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. SocketIORoomChat SocketIORoomName = "Chat" // For chat messages.
SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates.
SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates.
) )
const ( const (
@ -32,6 +33,7 @@ const (
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate
SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate
SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate
SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
) )
@ -62,6 +64,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock
switch subs.Type { switch subs.Type {
case api.SocketIOSubscriptionTypeAllJobs: case api.SocketIOSubscriptionTypeAllJobs:
sioRoom = SocketIORoomJobs sioRoom = SocketIORoomJobs
case api.SocketIOSubscriptionTypeAllWorkers:
sioRoom = SocketIORoomWorkers
case api.SocketIOSubscriptionTypeJob: case api.SocketIOSubscriptionTypeJob:
if subs.Uuid == nil { if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID")

View File

@ -0,0 +1,41 @@
// SPDX-License-Identifier: GPL-3.0-or-later
package webupdates
import (
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
// NewWorkerUpdate returns a partial SocketIOWorkerUpdate struct for the given worker.
// It only fills in the fields that represent the current state of the worker. For
// example, it omits `PreviousStatus`. The ommitted fields can be filled in by
// the caller.
func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate {
workerUpdate := api.SocketIOWorkerUpdate{
Id: worker.UUID,
Nickname: worker.Name,
Status: worker.Status,
Version: worker.Software,
Updated: worker.UpdatedAt,
}
return workerUpdate
}
// BroadcastWorkerUpdate sends the worker update to clients.
func (b *BiDirComms) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) {
log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting worker update")
b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate)
}
// BroadcastNewWorker sends a "new worker" notification to clients.
func (b *BiDirComms) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) {
if workerUpdate.PreviousStatus != nil {
log.Warn().Interface("workerUpdate", workerUpdate).Msg("socketIO: new workers should not have a previous state")
workerUpdate.PreviousStatus = nil
}
log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting new worker")
b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate)
}

View File

@ -13,7 +13,7 @@ const websocketURL = ws();
export default { export default {
emits: [ emits: [
// Data from Flamenco Manager: // Data from Flamenco Manager:
"jobUpdate", "taskUpdate", "taskLogUpdate", "message", "jobUpdate", "taskUpdate", "taskLogUpdate", "message", "workerUpdate",
// SocketIO events: // SocketIO events:
"sioReconnected", "sioDisconnected" "sioReconnected", "sioDisconnected"
], ],
@ -146,6 +146,13 @@ export default {
this.$emit("taskLogUpdate", apiTaskLogUpdate); this.$emit("taskLogUpdate", apiTaskLogUpdate);
}); });
this.socket.on("/workers", (workerUpdate) => {
// Convert to API object, in order to have the same parsing of data as
// when we'd do an API call.
const apiWorkerUpdate = API.SocketIOWorkerUpdate.constructFromObject(workerUpdate)
this.$emit("workerUpdate", apiWorkerUpdate);
});
// Chat system, useful for debugging. // Chat system, useful for debugging.
this.socket.on("/message", (message) => { this.socket.on("/message", (message) => {
this.$emit("message", message); this.$emit("message", message);

View File

@ -99,24 +99,24 @@ export default {
this.tabulator.setData(data.workers); this.tabulator.setData(data.workers);
this._refreshAvailableStatuses(); this._refreshAvailableStatuses();
}, },
// processWorkerUpdate(workerUpdate) { processWorkerUpdate(workerUpdate) {
// // updateData() will only overwrite properties that are actually set on // updateData() will only overwrite properties that are actually set on
// // workerUpdate, and leave the rest as-is. // workerUpdate, and leave the rest as-is.
// if (this.tabulator.initialized) { if (this.tabulator.initialized) {
// this.tabulator.updateData([workerUpdate]) this.tabulator.updateData([workerUpdate])
// .then(this.sortData); .then(this.sortData);
// } }
// this._refreshAvailableStatuses(); this._refreshAvailableStatuses();
// }, },
// processNewWorker(workerUpdate) { processNewWorker(workerUpdate) {
// if (this.tabulator.initialized) { if (this.tabulator.initialized) {
// this.tabulator.updateData([workerUpdate]) this.tabulator.updateData([workerUpdate])
// .then(this.sortData); .then(this.sortData);
// } }
// this.tabulator.addData([workerUpdate]) this.tabulator.addData([workerUpdate])
// .then(this.sortData); .then(this.sortData);
// this._refreshAvailableStatuses(); this._refreshAvailableStatuses();
// }, },
onRowClick(event, row) { onRowClick(event, row) {
// Take a copy of the data, so that it's decoupled from the tabulator data // Take a copy of the data, so that it's decoupled from the tabulator data

View File

@ -66,6 +66,18 @@ export const useNotifs = defineStore('notifications', {
this.add(msg) this.add(msg)
}, },
/**
* @param {API.SocketIOWorkerUpdate} workerUpdate Worker update received via SocketIO.
*/
addWorkerUpdate(workerUpdate) {
console.log('Received worker update:', workerUpdate);
let msg = `Worker ${workerUpdate.name}`;
if (workerUpdate.previous_status && workerUpdate.previous_status != workerUpdate.status) {
msg += ` changed status ${workerUpdate.previous_status}${workerUpdate.status}`;
}
this.add(msg)
},
/* Ensure there is only 1000 items in the history. */ /* Ensure there is only 1000 items in the history. */
_prune() { _prune() {
if (this.history.length <= 1000) return; if (this.history.length <= 1000) return;

View File

@ -7,7 +7,8 @@
</div> </div>
<footer> <footer>
<notification-bar /> <notification-bar />
<update-listener ref="updateListener" <update-listener ref="updateListener" mainSubscription="allWorkers"
@workerUpdate="onSIOWorkerUpdate"
@sioReconnected="onSIOReconnected" @sioDisconnected="onSIODisconnected" /> @sioReconnected="onSIOReconnected" @sioDisconnected="onSIODisconnected" />
</footer> </footer>
</template> </template>
@ -24,6 +25,7 @@
<script> <script>
import { WorkerMgtApi } from '@/manager-api'; import { WorkerMgtApi } from '@/manager-api';
import { useNotifs } from '@/stores/notifications'
import { useWorkers } from '@/stores/workers'; import { useWorkers } from '@/stores/workers';
import { apiClient } from '@/stores/api-query-count'; import { apiClient } from '@/stores/api-query-count';
@ -43,6 +45,7 @@ export default {
}, },
data: () => ({ data: () => ({
workers: useWorkers(), workers: useWorkers(),
notifs: useNotifs(),
api: new WorkerMgtApi(apiClient), api: new WorkerMgtApi(apiClient),
}), }),
mounted() { mounted() {
@ -57,9 +60,25 @@ export default {
methods: { methods: {
// SocketIO connection event handlers: // SocketIO connection event handlers:
onSIOReconnected() { onSIOReconnected() {
this.$refs.workersTable.onReconnected();
this._fetchWorker(this.workerID);
}, },
onSIODisconnected(reason) { onSIODisconnected(reason) {
}, },
onSIOWorkerUpdate(workerUpdate) {
this.notifs.addWorkerUpdate(workerUpdate);
if (this.$refs.workersTable) {
if (workerUpdate.previous_status)
this.$refs.workersTable.processWorkerUpdate(workerUpdate);
else
this.$refs.workersTable.processNewWorker(workerUpdate);
}
if (this.workerID != workerUpdate.id)
return;
this._fetchWorker(this.workerID);
},
onTableWorkerClicked(rowData) { onTableWorkerClicked(rowData) {
if (rowData.id == this.workerID) return; if (rowData.id == this.workerID) return;