Add SocketIO subscription system for job-related updates
SocketIO clients can now send a message with `/subscription` event type in order to subscribe to or unsubscribe from job-related updates. These job-related updates themselves aren't sent yet, so this is a change that's impossible to really test. The socketIO code for joining/leaving rooms is called, though.
This commit is contained in:
parent
824425f466
commit
9b330280b7
@ -40,3 +40,13 @@ func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) {
|
|||||||
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job")
|
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job")
|
||||||
b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate)
|
b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// roomForJob will return the SocketIO room name for the given job. Clients in
|
||||||
|
// this room will receive info scoped to this job, so for example updates to all
|
||||||
|
// tasks of this job.
|
||||||
|
//
|
||||||
|
// Note that `api.JobUpdate`s themselves are sent to all SocketIO clients, and
|
||||||
|
// not to this room.
|
||||||
|
func roomForJob(jobUUID string) SocketIORoomName {
|
||||||
|
return SocketIORoomName("job-" + jobUUID)
|
||||||
|
}
|
||||||
|
@ -1,6 +1,14 @@
|
|||||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
package webupdates
|
package webupdates
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.blender.org/flamenco/pkg/api"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
gosocketio "github.com/graarh/golang-socketio"
|
||||||
|
)
|
||||||
|
|
||||||
type SocketIORoomName string
|
type SocketIORoomName string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -13,11 +21,59 @@ type SocketIOEventType string
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
// Predefined SocketIO event types.
|
// Predefined SocketIO event types.
|
||||||
SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send messages here
|
SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here
|
||||||
SIOEventChatMessageSend SocketIOEventType = "/message" // messages are broadcasted here
|
SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here
|
||||||
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate
|
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate
|
||||||
|
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
|
||||||
)
|
)
|
||||||
|
|
||||||
func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) {
|
func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) {
|
||||||
b.sockserv.BroadcastTo(string(room), string(eventType), payload)
|
b.sockserv.BroadcastTo(string(room), string(eventType), payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *BiDirComms) registerRoomEventHandlers() {
|
||||||
|
_ = b.sockserv.On(string(SIOEventSubscription), b.handleRoomSubscription)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string {
|
||||||
|
logger := sioLogger(c)
|
||||||
|
logger = logger.With().
|
||||||
|
Str("op", string(subs.Op)).
|
||||||
|
Str("type", string(subs.Type)).
|
||||||
|
Str("uuid", string(subs.Uuid)).
|
||||||
|
Logger()
|
||||||
|
|
||||||
|
// Make sure the UUID is actually a valid one.
|
||||||
|
uuid, err := uuid.Parse(subs.Uuid)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request")
|
||||||
|
return "invalid UUID, ignoring request"
|
||||||
|
}
|
||||||
|
|
||||||
|
var sioRoom SocketIORoomName
|
||||||
|
switch subs.Type {
|
||||||
|
case api.SocketIOSubscriptionTypeJob:
|
||||||
|
sioRoom = roomForJob(uuid.String())
|
||||||
|
default:
|
||||||
|
logger.Warn().Msg("socketIO: invalid subscription type, ignoring")
|
||||||
|
return "invalid subscription type, ignoring request"
|
||||||
|
}
|
||||||
|
|
||||||
|
switch subs.Op {
|
||||||
|
case api.SocketIOSubscriptionOperationSubscribe:
|
||||||
|
err = c.Join(string(sioRoom))
|
||||||
|
case api.SocketIOSubscriptionOperationUnsubscribe:
|
||||||
|
err = c.Leave(string(sioRoom))
|
||||||
|
default:
|
||||||
|
logger.Warn().Msg("socketIO: invalid subscription operation, ignoring")
|
||||||
|
return "invalid subscription operation, ignoring request"
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn().Err(err).Msg("socketIO: performing subscription operation")
|
||||||
|
return fmt.Sprintf("unable to perform subscription operation: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info().Msg("socketIO: subscription")
|
||||||
|
return "ok"
|
||||||
|
}
|
||||||
|
@ -59,6 +59,7 @@ func (b *BiDirComms) registerSIOEventHandlers() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
b.registerChatEventHandlers()
|
b.registerChatEventHandlers()
|
||||||
|
b.registerRoomEventHandlers()
|
||||||
}
|
}
|
||||||
|
|
||||||
func sioLogger(c *gosocketio.Channel) zerolog.Logger {
|
func sioLogger(c *gosocketio.Channel) zerolog.Logger {
|
||||||
|
@ -16,8 +16,9 @@
|
|||||||
</div>
|
</div>
|
||||||
<footer>
|
<footer>
|
||||||
<span class='notifications' v-if="notifs.last">{{ notifs.last.msg }}</span>
|
<span class='notifications' v-if="notifs.last">{{ notifs.last.msg }}</span>
|
||||||
<update-listener ref="updateListener" :websocketURL="websocketURL" @jobUpdate="onSioJobUpdate"
|
<update-listener ref="updateListener" :websocketURL="websocketURL" :subscribedJob="jobs.activeJobID"
|
||||||
@message="onChatMessage" @sioReconnected="onSIOReconnected" @sioDisconnected="onSIODisconnected" />
|
@jobUpdate="onSioJobUpdate" @message="onChatMessage" @sioReconnected="onSIOReconnected"
|
||||||
|
@sioDisconnected="onSIODisconnected" />
|
||||||
</footer>
|
</footer>
|
||||||
</template>
|
</template>
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ export default {
|
|||||||
// SocketIO events:
|
// SocketIO events:
|
||||||
"sioReconnected", "sioDisconnected"
|
"sioReconnected", "sioDisconnected"
|
||||||
],
|
],
|
||||||
props: ["websocketURL"],
|
props: ["websocketURL", "subscribedJob"],
|
||||||
data() {
|
data() {
|
||||||
return {
|
return {
|
||||||
socket: null,
|
socket: null,
|
||||||
@ -29,14 +29,31 @@ export default {
|
|||||||
beforeDestroy: function () {
|
beforeDestroy: function () {
|
||||||
this.disconnectWebsocket();
|
this.disconnectWebsocket();
|
||||||
},
|
},
|
||||||
|
watch: {
|
||||||
|
subscribedJob(newJob, oldJob) {
|
||||||
|
if (oldJob) {
|
||||||
|
this._updateJobSubscription("unsubscribe", oldJob);
|
||||||
|
}
|
||||||
|
if (newJob) {
|
||||||
|
this._updateJobSubscription("subscribe", newJob);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
methods: {
|
methods: {
|
||||||
connectToWebsocket() {
|
connectToWebsocket() {
|
||||||
// The SocketIO client API docs are available at:
|
// The SocketIO client API docs are available at:
|
||||||
// https://github.com/socketio/socket.io-client/blob/2.4.x/docs/API.md
|
// https://github.com/socketio/socket.io-client/blob/2.4.x/docs/API.md
|
||||||
console.log("connecting JobsListener to WS", this.websocketURL);
|
console.log("connecting JobsListener to WS", this.websocketURL);
|
||||||
this.socket = io(this.websocketURL, {
|
const ws = io(this.websocketURL, {
|
||||||
transports: ["websocket"],
|
transports: ["websocket"],
|
||||||
});
|
});
|
||||||
|
this.socket = ws;
|
||||||
|
|
||||||
|
// For easy debugging. This assigns `ws` and not `this.socket`, as the
|
||||||
|
// latter is Vue-reactive, which gets in the way of using in the browser
|
||||||
|
// console.
|
||||||
|
window.ws = ws;
|
||||||
|
|
||||||
this.socket.on('connect_error', (error) => {
|
this.socket.on('connect_error', (error) => {
|
||||||
// Don't log the error here, it's too long and noisy for regular logs.
|
// Don't log the error here, it's too long and noisy for regular logs.
|
||||||
console.log("socketIO connection error");
|
console.log("socketIO connection error");
|
||||||
@ -64,6 +81,10 @@ export default {
|
|||||||
});
|
});
|
||||||
this.socket.on("reconnect", (attemptNumber) => {
|
this.socket.on("reconnect", (attemptNumber) => {
|
||||||
console.log("socketIO reconnected after", attemptNumber, "attempts");
|
console.log("socketIO reconnected after", attemptNumber, "attempts");
|
||||||
|
|
||||||
|
// Resubscribe to whatever we want to be subscribed to:
|
||||||
|
if (this.subscribedJob) this._updateJobSubscription("subscribe", newJob);
|
||||||
|
|
||||||
this.$emit("sioReconnected", attemptNumber);
|
this.$emit("sioReconnected", attemptNumber);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -74,6 +95,13 @@ export default {
|
|||||||
this.$emit("jobUpdate", apiJobUpdate);
|
this.$emit("jobUpdate", apiJobUpdate);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.socket.on("/task", (taskUpdate) => {
|
||||||
|
// Convert to API object, in order to have the same parsing of data as
|
||||||
|
// when we'd do an API call.
|
||||||
|
const apiTaskUpdate = API.SocketIOTaskUpdate.constructFromObject(taskUpdate)
|
||||||
|
this.$emit("taskUpdate", apiTaskUpdate);
|
||||||
|
});
|
||||||
|
|
||||||
// Chat system, useful for debugging.
|
// Chat system, useful for debugging.
|
||||||
this.socket.on("/message", (message) => {
|
this.socket.on("/message", (message) => {
|
||||||
this.$emit("message", message);
|
this.$emit("message", message);
|
||||||
@ -96,6 +124,12 @@ export default {
|
|||||||
console.log("sending broadcast message:", payload);
|
console.log("sending broadcast message:", payload);
|
||||||
this.socket.emit("/chat", payload);
|
this.socket.emit("/chat", payload);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
_updateJobSubscription(operation, jobID) {
|
||||||
|
const payload = new API.SocketIOSubscription(operation, "job", jobID);
|
||||||
|
console.log("sending job subscription:", payload);
|
||||||
|
this.socket.emit("/subscription", payload);
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
</script>
|
</script>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user