diff --git a/internal/manager/eventbus/socketio.go b/internal/manager/eventbus/socketio.go index 234972e1..2997b73b 100644 --- a/internal/manager/eventbus/socketio.go +++ b/internal/manager/eventbus/socketio.go @@ -3,6 +3,7 @@ package eventbus // SPDX-License-Identifier: GPL-3.0-or-later import ( + "errors" "fmt" "reflect" @@ -122,44 +123,37 @@ func (s *SocketIOForwarder) handleRoomSubscription(c *gosocketio.Channel, subs a return "invalid UUID, ignoring request" } - var sioRoom EventTopic + var err error switch subs.Type { case api.SocketIOSubscriptionTypeAllJobs: - sioRoom = TopicJobUpdate + err = s.subUnsub(c, TopicJobUpdate, subs.Op) case api.SocketIOSubscriptionTypeAllWorkers: - sioRoom = TopicWorkerUpdate + err = s.subUnsub(c, TopicWorkerUpdate, subs.Op) case api.SocketIOSubscriptionTypeAllLastRendered: - sioRoom = TopicLastRenderedImage + err = s.subUnsub(c, TopicLastRenderedImage, subs.Op) case api.SocketIOSubscriptionTypeAllWorkerTags: - sioRoom = TopicWorkerTagUpdate + err = s.subUnsub(c, TopicWorkerTagUpdate, subs.Op) case api.SocketIOSubscriptionTypeJob: if subs.Uuid == nil { logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") return "operation on job requires a UUID" } - sioRoom = topicForJob(*subs.Uuid) + logger.Trace().Msg("socketio: sub subscription, also going to do last-rendered for that job") + err = s.subUnsub(c, topicForJob(*subs.Uuid), subs.Op) + if err == nil { + err = s.subUnsub(c, topicForJobLastRendered(*subs.Uuid), subs.Op) + } case api.SocketIOSubscriptionTypeTasklog: if subs.Uuid == nil { logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID") return "operation on task requires a UUID" } - sioRoom = topicForTaskLog(*subs.Uuid) + err = s.subUnsub(c, topicForJob(*subs.Uuid), subs.Op) default: logger.Warn().Msg("socketIO: unknown subscription type, ignoring") return "unknown subscription type, ignoring request" } - var err error - switch subs.Op { - case api.SocketIOSubscriptionOperationSubscribe: - err = c.Join(string(sioRoom)) - case api.SocketIOSubscriptionOperationUnsubscribe: - err = c.Leave(string(sioRoom)) - default: - logger.Warn().Msg("socketIO: invalid subscription operation, ignoring") - return "invalid subscription operation, ignoring request" - } - if err != nil { logger.Warn().Err(err).Msg("socketIO: performing subscription operation") return fmt.Sprintf("unable to perform subscription operation: %v", err) @@ -168,3 +162,19 @@ func (s *SocketIOForwarder) handleRoomSubscription(c *gosocketio.Channel, subs a logger.Debug().Msg("socketIO: subscription") return "ok" } + +func (s *SocketIOForwarder) subUnsub( + c *gosocketio.Channel, + topic EventTopic, + operation api.SocketIOSubscriptionOperation, +) error { + room := string(topic) + switch operation { + case api.SocketIOSubscriptionOperationSubscribe: + return c.Join(room) + case api.SocketIOSubscriptionOperationUnsubscribe: + return c.Leave(room) + default: + return errors.New("invalid subscription operation, ignoring request") + } +}