From 12bfa82854088e10f9e8cf9b8936e0dbabceaeab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Wed, 21 Feb 2024 22:20:44 +0100 Subject: [PATCH] Manager: add lifecycle events to the event bus Send events on Manager startup & shutdown. To make this possible, events sent to MQTT are now queued up until an MQTT server can be reached. Otherwise the startup event would be sent before the MQTT connection was established. --- cmd/flamenco-manager/main.go | 29 ++++- internal/manager/eventbus/events_lifecycle.go | 20 ++++ internal/manager/eventbus/mqtt_client.go | 103 +++++++++++++----- internal/manager/eventbus/topics.go | 1 + 4 files changed, 120 insertions(+), 33 deletions(-) create mode 100644 internal/manager/eventbus/events_lifecycle.go diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index e341f3dc..8da0198f 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -38,6 +38,7 @@ import ( "projects.blender.org/studio/flamenco/internal/manager/timeout_checker" "projects.blender.org/studio/flamenco/internal/own_url" "projects.blender.org/studio/flamenco/internal/upnp_ssdp" + "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" "projects.blender.org/studio/flamenco/pkg/sysinfo" ) @@ -56,6 +57,8 @@ const ( webappEntryPoint = "index.html" ) +type shutdownFunc func() + func main() { output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} log.Logger = log.Output(output) @@ -188,7 +191,19 @@ func runFlamencoManager() bool { // once it closes. mainCtx, mainCtxCancel := context.WithCancel(context.Background()) - installSignalHandler(mainCtxCancel) + triggerShutdown := func() { + // Notify that Flamenco is shutting down. + event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerShutdown) + eventBroker.BroadcastLifeCycleEvent(event) + + // Give event bus some time to process the shutdown event. + time.Sleep(500 * time.Millisecond) + + // Cancel the main context, triggering an application-wide shutdown. + mainCtxCancel() + } + + installSignalHandler(triggerShutdown) if mqttClient != nil { mqttClient.Connect(mainCtx) @@ -274,11 +289,17 @@ func runFlamencoManager() bool { go openWebbrowser(mainCtx, urls[0]) } + // Notify that Flamenco has started. + { + event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerStartup) + eventBroker.BroadcastLifeCycleEvent(event) + } + // Allow the Flamenco API itself trigger a shutdown as well. log.Debug().Msg("waiting for a shutdown request from Flamenco") doRestart := flamenco.WaitForShutdown(mainCtx) log.Info().Bool("willRestart", doRestart).Msg("going to shut down the service") - mainCtxCancel() + triggerShutdown() wg.Wait() log.Info().Bool("willRestart", doRestart).Msg("Flamenco Manager service shut down") @@ -362,14 +383,14 @@ func openDB(configService config.Service) *persistence.DB { } // installSignalHandler spawns a goroutine that handles incoming POSIX signals. -func installSignalHandler(cancelFunc context.CancelFunc) { +func installSignalHandler(shutdownFunc shutdownFunc) { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) signal.Notify(signals, syscall.SIGTERM) go func() { for signum := range signals { log.Info().Str("signal", signum.String()).Msg("signal received, shutting down") - cancelFunc() + shutdownFunc() } }() } diff --git a/internal/manager/eventbus/events_lifecycle.go b/internal/manager/eventbus/events_lifecycle.go new file mode 100644 index 00000000..7ab4149f --- /dev/null +++ b/internal/manager/eventbus/events_lifecycle.go @@ -0,0 +1,20 @@ +package eventbus + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/pkg/api" +) + +func NewLifeCycleEvent(lifeCycleType api.LifeCycleEventType) api.EventLifeCycle { + event := api.EventLifeCycle{ + Type: lifeCycleType, + } + return event +} + +func (b *Broker) BroadcastLifeCycleEvent(event api.EventLifeCycle) { + log.Debug().Interface("event", event).Msg("eventbus: broadcasting lifecycle event") + b.broadcast(TopicLifeCycle, event) +} diff --git a/internal/manager/eventbus/mqtt_client.go b/internal/manager/eventbus/mqtt_client.go index d074f7ca..a2a6b51c 100644 --- a/internal/manager/eventbus/mqtt_client.go +++ b/internal/manager/eventbus/mqtt_client.go @@ -20,7 +20,8 @@ const ( keepAlive = 30 // seconds connectRetryDelay = 10 * time.Second - mqttQoS = 1 + mqttQoS = 1 // QoS field for MQTT events. + mqttQueueSize = 10 // How many events are queued when there is no connection to an MQTT broker. ) type MQTTForwarder struct { @@ -30,6 +31,9 @@ type MQTTForwarder struct { // Context to use when publishing messages. ctx context.Context + + queue chan mqttQueuedMessage + queueCancel context.CancelFunc } var _ Forwarder = (*MQTTForwarder)(nil) @@ -44,6 +48,11 @@ type MQTTClientConfig struct { Password string `yaml:"password"` } +type mqttQueuedMessage struct { + topic string + payload []byte +} + func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { config.BrokerURL = strings.TrimSpace(config.BrokerURL) config.ClientID = strings.TrimSpace(config.ClientID) @@ -66,6 +75,7 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { client := MQTTForwarder{ topicPrefix: config.TopicPrefix, + queue: make(chan mqttQueuedMessage, mqttQueueSize), } client.config = autopaho.ClientConfig{ BrokerUrls: []*url.URL{brokerURL}, @@ -93,11 +103,14 @@ func (m *MQTTForwarder) Connect(ctx context.Context) { m.conn = conn m.ctx = ctx - } func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) { m.logger().Info().Msg("mqtt client: connection established") + + queueCtx, queueCtxCancel := context.WithCancel(m.ctx) + m.queueCancel = queueCtxCancel + go m.queueRunner(queueCtx) } func (m *MQTTForwarder) onConnectionError(err error) { @@ -109,6 +122,10 @@ func (m *MQTTForwarder) onClientError(err error) { } func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) { + if m.queueCancel != nil { + m.queueCancel() + } + logEntry := m.logger().Warn() if d.Properties != nil { logEntry = logEntry.Str("reason", d.Properties.ReasonString) @@ -118,42 +135,70 @@ func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) { logEntry.Msg("mqtt client: broker requested disconnect") } +func (m *MQTTForwarder) queueRunner(queueRunnerCtx context.Context) { + m.logger().Debug().Msg("mqtt client: starting queue runner") + defer m.logger().Debug().Msg("mqtt client: stopping queue runner") + + for { + select { + case <-queueRunnerCtx.Done(): + return + case message := <-m.queue: + m.sendEvent(message) + } + } +} + func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) { fullTopic := m.topicPrefix + string(topic) - logger := m.logger().With(). - Str("topic", fullTopic). - // Interface("event", payload). - Logger() - asJSON, err := json.Marshal(payload) if err != nil { - logger.Error().AnErr("cause", err).Interface("event", payload). + m.logger().Error(). + Str("topic", fullTopic). + AnErr("cause", err). + Interface("event", payload). Msg("mqtt client: could not convert event to JSON") return } - // Publish will block so we run it in a GoRoutine. - // TODO: might be a good idea todo this at the event broker level, rather than in this function. - go func(topic string, msg []byte) { - pr, err := m.conn.Publish(m.ctx, &paho.Publish{ - QoS: mqttQoS, - Topic: topic, - Payload: msg, - }) - switch { - case err != nil: - logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event") - return - case pr.ReasonCode == 16: - logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers") - return - case pr.ReasonCode != 0: - logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker") - default: - logger.Debug().Msg("mqtt client: event sent to broker") - } - }(fullTopic, asJSON) + // Queue the message, if we can. + message := mqttQueuedMessage{ + topic: fullTopic, + payload: asJSON, + } + select { + case m.queue <- message: + // All good, message is queued. + default: + m.logger().Error(). + Str("topic", fullTopic). + Msg("mqtt client: could not send event, queue is full") + } +} + +func (m *MQTTForwarder) sendEvent(message mqttQueuedMessage) { + logger := m.logger().With(). + Str("topic", message.topic). + Logger() + + pr, err := m.conn.Publish(m.ctx, &paho.Publish{ + QoS: mqttQoS, + Topic: message.topic, + Payload: message.payload, + }) + switch { + case err != nil: + logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event") + return + case pr.ReasonCode == 16: + logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers") + return + case pr.ReasonCode != 0: + logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker") + default: + logger.Debug().Msg("mqtt client: event sent to broker") + } } func (m *MQTTForwarder) logger() *zerolog.Logger { diff --git a/internal/manager/eventbus/topics.go b/internal/manager/eventbus/topics.go index 8da96716..ea2b6056 100644 --- a/internal/manager/eventbus/topics.go +++ b/internal/manager/eventbus/topics.go @@ -6,6 +6,7 @@ import "fmt" const ( // Topics on which events are published. + TopicLifeCycle EventTopic = "/lifecycle" // sends api.EventLifeCycle TopicJobUpdate EventTopic = "/jobs" // sends api.EventJobUpdate TopicLastRenderedImage EventTopic = "/last-rendered" // sends api.EventLastRenderedUpdate TopicTaskUpdate EventTopic = "/task" // sends api.EventTaskUpdate