diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/webupdates/job_updates.go index 64c1874b..04df7620 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/webupdates/job_updates.go @@ -40,3 +40,13 @@ func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) { log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job") b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) } + +// roomForJob will return the SocketIO room name for the given job. Clients in +// this room will receive info scoped to this job, so for example updates to all +// tasks of this job. +// +// Note that `api.JobUpdate`s themselves are sent to all SocketIO clients, and +// not to this room. +func roomForJob(jobUUID string) SocketIORoomName { + return SocketIORoomName("job-" + jobUUID) +} diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index 57806ba8..c49f9abb 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -1,6 +1,14 @@ // SPDX-License-Identifier: GPL-3.0-or-later package webupdates +import ( + "fmt" + + "git.blender.org/flamenco/pkg/api" + "github.com/google/uuid" + gosocketio "github.com/graarh/golang-socketio" +) + type SocketIORoomName string const ( @@ -13,11 +21,59 @@ type SocketIOEventType string const ( // Predefined SocketIO event types. - SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send messages here - SIOEventChatMessageSend SocketIOEventType = "/message" // messages are broadcasted here - SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate + SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here + SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here + SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate + SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription ) func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) { b.sockserv.BroadcastTo(string(room), string(eventType), payload) } + +func (b *BiDirComms) registerRoomEventHandlers() { + _ = b.sockserv.On(string(SIOEventSubscription), b.handleRoomSubscription) +} + +func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string { + logger := sioLogger(c) + logger = logger.With(). + Str("op", string(subs.Op)). + Str("type", string(subs.Type)). + Str("uuid", string(subs.Uuid)). + Logger() + + // Make sure the UUID is actually a valid one. + uuid, err := uuid.Parse(subs.Uuid) + if err != nil { + logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request") + return "invalid UUID, ignoring request" + } + + var sioRoom SocketIORoomName + switch subs.Type { + case api.SocketIOSubscriptionTypeJob: + sioRoom = roomForJob(uuid.String()) + default: + logger.Warn().Msg("socketIO: invalid subscription type, ignoring") + return "invalid subscription type, ignoring request" + } + + switch subs.Op { + case api.SocketIOSubscriptionOperationSubscribe: + err = c.Join(string(sioRoom)) + case api.SocketIOSubscriptionOperationUnsubscribe: + err = c.Leave(string(sioRoom)) + default: + logger.Warn().Msg("socketIO: invalid subscription operation, ignoring") + return "invalid subscription operation, ignoring request" + } + + if err != nil { + logger.Warn().Err(err).Msg("socketIO: performing subscription operation") + return fmt.Sprintf("unable to perform subscription operation: %v", err) + } + + logger.Info().Msg("socketIO: subscription") + return "ok" +} diff --git a/internal/manager/webupdates/webupdates.go b/internal/manager/webupdates/webupdates.go index 8fe45316..2e4a0085 100644 --- a/internal/manager/webupdates/webupdates.go +++ b/internal/manager/webupdates/webupdates.go @@ -59,6 +59,7 @@ func (b *BiDirComms) registerSIOEventHandlers() { }) b.registerChatEventHandlers() + b.registerRoomEventHandlers() } func sioLogger(c *gosocketio.Channel) zerolog.Logger { diff --git a/web/app/src/App.vue b/web/app/src/App.vue index c877d1e6..3ec26aa8 100644 --- a/web/app/src/App.vue +++ b/web/app/src/App.vue @@ -16,8 +16,9 @@ diff --git a/web/app/src/components/UpdateListener.vue b/web/app/src/components/UpdateListener.vue index 4291a9a2..081ce7cc 100644 --- a/web/app/src/components/UpdateListener.vue +++ b/web/app/src/components/UpdateListener.vue @@ -13,7 +13,7 @@ export default { // SocketIO events: "sioReconnected", "sioDisconnected" ], - props: ["websocketURL"], + props: ["websocketURL", "subscribedJob"], data() { return { socket: null, @@ -29,14 +29,31 @@ export default { beforeDestroy: function () { this.disconnectWebsocket(); }, + watch: { + subscribedJob(newJob, oldJob) { + if (oldJob) { + this._updateJobSubscription("unsubscribe", oldJob); + } + if (newJob) { + this._updateJobSubscription("subscribe", newJob); + } + }, + }, methods: { connectToWebsocket() { // The SocketIO client API docs are available at: // https://github.com/socketio/socket.io-client/blob/2.4.x/docs/API.md console.log("connecting JobsListener to WS", this.websocketURL); - this.socket = io(this.websocketURL, { + const ws = io(this.websocketURL, { transports: ["websocket"], }); + this.socket = ws; + + // For easy debugging. This assigns `ws` and not `this.socket`, as the + // latter is Vue-reactive, which gets in the way of using in the browser + // console. + window.ws = ws; + this.socket.on('connect_error', (error) => { // Don't log the error here, it's too long and noisy for regular logs. console.log("socketIO connection error"); @@ -64,6 +81,10 @@ export default { }); this.socket.on("reconnect", (attemptNumber) => { console.log("socketIO reconnected after", attemptNumber, "attempts"); + + // Resubscribe to whatever we want to be subscribed to: + if (this.subscribedJob) this._updateJobSubscription("subscribe", newJob); + this.$emit("sioReconnected", attemptNumber); }); @@ -74,6 +95,13 @@ export default { this.$emit("jobUpdate", apiJobUpdate); }); + this.socket.on("/task", (taskUpdate) => { + // Convert to API object, in order to have the same parsing of data as + // when we'd do an API call. + const apiTaskUpdate = API.SocketIOTaskUpdate.constructFromObject(taskUpdate) + this.$emit("taskUpdate", apiTaskUpdate); + }); + // Chat system, useful for debugging. this.socket.on("/message", (message) => { this.$emit("message", message); @@ -96,6 +124,12 @@ export default { console.log("sending broadcast message:", payload); this.socket.emit("/chat", payload); }, + + _updateJobSubscription(operation, jobID) { + const payload = new API.SocketIOSubscription(operation, "job", jobID); + console.log("sending job subscription:", payload); + this.socket.emit("/subscription", payload); + }, }, };