Manager: handle allJobs subscription

SocketIO clients no longer automatically subscribe to the jobs updates.
This is now done explicitly via the `allJobs` subscription type, and
unsubscribing is also possible.
This commit is contained in:
Sybren A. Stüvel 2022-05-31 15:00:37 +02:00
parent 0fc0d1d0e0
commit 2b89399206
4 changed files with 45 additions and 10 deletions

View File

@ -45,23 +45,35 @@ func (b *BiDirComms) registerRoomEventHandlers() {
func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string { func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string {
logger := sioLogger(c) logger := sioLogger(c)
logger = logger.With(). logCtx := logger.With().
Str("op", string(subs.Op)). Str("op", string(subs.Op)).
Str("type", string(subs.Type)). Str("type", string(subs.Type))
Str("uuid", string(subs.Uuid)). if subs.Uuid != nil {
Logger() logCtx = logCtx.Str("uuid", string(*subs.Uuid))
}
logger = logCtx.Logger()
if !uuid.IsValid(subs.Uuid) { if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) {
logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request") logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request")
return "invalid UUID, ignoring request" return "invalid UUID, ignoring request"
} }
var sioRoom SocketIORoomName var sioRoom SocketIORoomName
switch subs.Type { switch subs.Type {
case api.SocketIOSubscriptionTypeAllJobs:
sioRoom = SocketIORoomJobs
case api.SocketIOSubscriptionTypeJob: case api.SocketIOSubscriptionTypeJob:
sioRoom = roomForJob(subs.Uuid) if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID")
return "operation on job requires a UUID"
}
sioRoom = roomForJob(*subs.Uuid)
case api.SocketIOSubscriptionTypeTasklog: case api.SocketIOSubscriptionTypeTasklog:
sioRoom = roomForTaskLog(subs.Uuid) if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID")
return "operation on task requires a UUID"
}
sioRoom = roomForTaskLog(*subs.Uuid)
default: default:
logger.Warn().Msg("socketIO: unknown subscription type, ignoring") logger.Warn().Msg("socketIO: unknown subscription type, ignoring")
return "unknown subscription type, ignoring request" return "unknown subscription type, ignoring request"

View File

@ -44,7 +44,6 @@ func (b *BiDirComms) registerSIOEventHandlers() {
logger := sioLogger(c) logger := sioLogger(c)
logger.Debug().Msg("socketIO: connected") logger.Debug().Msg("socketIO: connected")
_ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room. _ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room.
_ = c.Join(string(SocketIORoomJobs)) // All clients subscribe to job updates.
}) })
// socket disconnection // socket disconnection

View File

@ -17,7 +17,11 @@ export default {
// SocketIO events: // SocketIO events:
"sioReconnected", "sioDisconnected" "sioReconnected", "sioDisconnected"
], ],
props: ["subscribedJobID", "subscribedTaskID"], props: [
"mainSubscription", // One of the 'allXXX' subscription types, see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`.
"subscribedJobID",
"subscribedTaskID",
],
data() { data() {
return { return {
socket: null, socket: null,
@ -54,6 +58,14 @@ export default {
this._updateTaskLogSubscription("subscribe", newTaskID); this._updateTaskLogSubscription("subscribe", newTaskID);
} }
}, },
mainSubscription(newType, oldType) {
if (oldType) {
this._updateMainSubscription("unsubscribe", oldType);
}
if (newType) {
this._updateMainSubscription("subscribe", newType);
}
},
}, },
methods: { methods: {
connectToWebsocket() { connectToWebsocket() {
@ -157,6 +169,17 @@ export default {
this.socket.emit("/chat", payload); this.socket.emit("/chat", payload);
}, },
/**
* Send main subscription (un)subscription request.
* @param {string} operation either "subscribe" or "unsubscribe"
* @param {string} type see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`.
*/
_updateMainSubscription(operation, type) {
const payload = new API.SocketIOSubscription(operation, type);
console.log(`sending ${type} ${operation}:`, payload);
this.socket.emit("/subscription", payload);
},
/** /**
* Send job (un)subscription request. * Send job (un)subscription request.
* @param {string} operation either "subscribe" or "unsubscribe" * @param {string} operation either "subscribe" or "unsubscribe"
@ -183,6 +206,7 @@ export default {
_resubscribe() { _resubscribe() {
if (this.subscribedJobID) this._updateJobSubscription("subscribe", this.subscribedJobID); if (this.subscribedJobID) this._updateJobSubscription("subscribe", this.subscribedJobID);
if (this.subscribedTaskID) this._updateTaskLogSubscription("subscribe", this.subscribedTaskID); if (this.subscribedTaskID) this._updateTaskLogSubscription("subscribe", this.subscribedTaskID);
if (this.mainSubscription) this._updateMainSubscription("subscribe", this.mainSubscription);
}, },
}, },
}; };

View File

@ -13,7 +13,7 @@
<footer class="window-footer" v-if="!showFooterPopup" @click="showFooterPopup = true"><notification-bar /></footer> <footer class="window-footer" v-if="!showFooterPopup" @click="showFooterPopup = true"><notification-bar /></footer>
<footer-popup v-if="showFooterPopup" ref="footerPopup" @clickClose="showFooterPopup = false" /> <footer-popup v-if="showFooterPopup" ref="footerPopup" @clickClose="showFooterPopup = false" />
<update-listener ref="updateListener" <update-listener ref="updateListener" mainSubscription="allJobs"
:subscribedJobID="jobID" :subscribedTaskID="taskID" :subscribedJobID="jobID" :subscribedTaskID="taskID"
@jobUpdate="onSioJobUpdate" @taskUpdate="onSioTaskUpdate" @taskLogUpdate="onSioTaskLogUpdate" @jobUpdate="onSioJobUpdate" @taskUpdate="onSioTaskUpdate" @taskLogUpdate="onSioTaskLogUpdate"
@message="onChatMessage" @message="onChatMessage"