From 2b8939920631e17726c047bf56dc78ec45ca9427 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 31 May 2022 15:00:37 +0200 Subject: [PATCH] Manager: handle `allJobs` subscription SocketIO clients no longer automatically subscribe to the jobs updates. This is now done explicitly via the `allJobs` subscription type, and unsubscribing is also possible. --- internal/manager/webupdates/sio_rooms.go | 26 +++++++++++++++++------ internal/manager/webupdates/webupdates.go | 1 - web/app/src/components/UpdateListener.vue | 26 ++++++++++++++++++++++- web/app/src/views/JobsView.vue | 2 +- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index 6dc4610f..b10ba86e 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -45,23 +45,35 @@ func (b *BiDirComms) registerRoomEventHandlers() { func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string { logger := sioLogger(c) - logger = logger.With(). + logCtx := logger.With(). Str("op", string(subs.Op)). - Str("type", string(subs.Type)). - Str("uuid", string(subs.Uuid)). - Logger() + Str("type", string(subs.Type)) + if subs.Uuid != nil { + logCtx = logCtx.Str("uuid", string(*subs.Uuid)) + } + logger = logCtx.Logger() - if !uuid.IsValid(subs.Uuid) { + if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) { logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request") return "invalid UUID, ignoring request" } var sioRoom SocketIORoomName switch subs.Type { + case api.SocketIOSubscriptionTypeAllJobs: + sioRoom = SocketIORoomJobs case api.SocketIOSubscriptionTypeJob: - sioRoom = roomForJob(subs.Uuid) + if subs.Uuid == nil { + logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") + return "operation on job requires a UUID" + } + sioRoom = roomForJob(*subs.Uuid) case api.SocketIOSubscriptionTypeTasklog: - sioRoom = roomForTaskLog(subs.Uuid) + if subs.Uuid == nil { + logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID") + return "operation on task requires a UUID" + } + sioRoom = roomForTaskLog(*subs.Uuid) default: logger.Warn().Msg("socketIO: unknown subscription type, ignoring") return "unknown subscription type, ignoring request" diff --git a/internal/manager/webupdates/webupdates.go b/internal/manager/webupdates/webupdates.go index 2e4a0085..6b31b4d8 100644 --- a/internal/manager/webupdates/webupdates.go +++ b/internal/manager/webupdates/webupdates.go @@ -44,7 +44,6 @@ func (b *BiDirComms) registerSIOEventHandlers() { logger := sioLogger(c) logger.Debug().Msg("socketIO: connected") _ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room. - _ = c.Join(string(SocketIORoomJobs)) // All clients subscribe to job updates. }) // socket disconnection diff --git a/web/app/src/components/UpdateListener.vue b/web/app/src/components/UpdateListener.vue index a183eeb8..b25ae6b5 100644 --- a/web/app/src/components/UpdateListener.vue +++ b/web/app/src/components/UpdateListener.vue @@ -17,7 +17,11 @@ export default { // SocketIO events: "sioReconnected", "sioDisconnected" ], - props: ["subscribedJobID", "subscribedTaskID"], + props: [ + "mainSubscription", // One of the 'allXXX' subscription types, see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`. + "subscribedJobID", + "subscribedTaskID", + ], data() { return { socket: null, @@ -54,6 +58,14 @@ export default { this._updateTaskLogSubscription("subscribe", newTaskID); } }, + mainSubscription(newType, oldType) { + if (oldType) { + this._updateMainSubscription("unsubscribe", oldType); + } + if (newType) { + this._updateMainSubscription("subscribe", newType); + } + }, }, methods: { connectToWebsocket() { @@ -157,6 +169,17 @@ export default { this.socket.emit("/chat", payload); }, + /** + * Send main subscription (un)subscription request. + * @param {string} operation either "subscribe" or "unsubscribe" + * @param {string} type see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`. + */ + _updateMainSubscription(operation, type) { + const payload = new API.SocketIOSubscription(operation, type); + console.log(`sending ${type} ${operation}:`, payload); + this.socket.emit("/subscription", payload); + }, + /** * Send job (un)subscription request. * @param {string} operation either "subscribe" or "unsubscribe" @@ -183,6 +206,7 @@ export default { _resubscribe() { if (this.subscribedJobID) this._updateJobSubscription("subscribe", this.subscribedJobID); if (this.subscribedTaskID) this._updateTaskLogSubscription("subscribe", this.subscribedTaskID); + if (this.mainSubscription) this._updateMainSubscription("subscribe", this.mainSubscription); }, }, }; diff --git a/web/app/src/views/JobsView.vue b/web/app/src/views/JobsView.vue index 0e6503f1..c2f9dc93 100644 --- a/web/app/src/views/JobsView.vue +++ b/web/app/src/views/JobsView.vue @@ -13,7 +13,7 @@ -