diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index e341f3dc..8da0198f 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -38,6 +38,7 @@ import ( "projects.blender.org/studio/flamenco/internal/manager/timeout_checker" "projects.blender.org/studio/flamenco/internal/own_url" "projects.blender.org/studio/flamenco/internal/upnp_ssdp" + "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" "projects.blender.org/studio/flamenco/pkg/sysinfo" ) @@ -56,6 +57,8 @@ const ( webappEntryPoint = "index.html" ) +type shutdownFunc func() + func main() { output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} log.Logger = log.Output(output) @@ -188,7 +191,19 @@ func runFlamencoManager() bool { // once it closes. mainCtx, mainCtxCancel := context.WithCancel(context.Background()) - installSignalHandler(mainCtxCancel) + triggerShutdown := func() { + // Notify that Flamenco is shutting down. + event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerShutdown) + eventBroker.BroadcastLifeCycleEvent(event) + + // Give event bus some time to process the shutdown event. + time.Sleep(500 * time.Millisecond) + + // Cancel the main context, triggering an application-wide shutdown. + mainCtxCancel() + } + + installSignalHandler(triggerShutdown) if mqttClient != nil { mqttClient.Connect(mainCtx) @@ -274,11 +289,17 @@ func runFlamencoManager() bool { go openWebbrowser(mainCtx, urls[0]) } + // Notify that Flamenco has started. + { + event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerStartup) + eventBroker.BroadcastLifeCycleEvent(event) + } + // Allow the Flamenco API itself trigger a shutdown as well. log.Debug().Msg("waiting for a shutdown request from Flamenco") doRestart := flamenco.WaitForShutdown(mainCtx) log.Info().Bool("willRestart", doRestart).Msg("going to shut down the service") - mainCtxCancel() + triggerShutdown() wg.Wait() log.Info().Bool("willRestart", doRestart).Msg("Flamenco Manager service shut down") @@ -362,14 +383,14 @@ func openDB(configService config.Service) *persistence.DB { } // installSignalHandler spawns a goroutine that handles incoming POSIX signals. -func installSignalHandler(cancelFunc context.CancelFunc) { +func installSignalHandler(shutdownFunc shutdownFunc) { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) signal.Notify(signals, syscall.SIGTERM) go func() { for signum := range signals { log.Info().Str("signal", signum.String()).Msg("signal received, shutting down") - cancelFunc() + shutdownFunc() } }() } diff --git a/internal/manager/eventbus/events_lifecycle.go b/internal/manager/eventbus/events_lifecycle.go new file mode 100644 index 00000000..7ab4149f --- /dev/null +++ b/internal/manager/eventbus/events_lifecycle.go @@ -0,0 +1,20 @@ +package eventbus + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/pkg/api" +) + +func NewLifeCycleEvent(lifeCycleType api.LifeCycleEventType) api.EventLifeCycle { + event := api.EventLifeCycle{ + Type: lifeCycleType, + } + return event +} + +func (b *Broker) BroadcastLifeCycleEvent(event api.EventLifeCycle) { + log.Debug().Interface("event", event).Msg("eventbus: broadcasting lifecycle event") + b.broadcast(TopicLifeCycle, event) +} diff --git a/internal/manager/eventbus/mqtt_client.go b/internal/manager/eventbus/mqtt_client.go index d074f7ca..a2a6b51c 100644 --- a/internal/manager/eventbus/mqtt_client.go +++ b/internal/manager/eventbus/mqtt_client.go @@ -20,7 +20,8 @@ const ( keepAlive = 30 // seconds connectRetryDelay = 10 * time.Second - mqttQoS = 1 + mqttQoS = 1 // QoS field for MQTT events. + mqttQueueSize = 10 // How many events are queued when there is no connection to an MQTT broker. ) type MQTTForwarder struct { @@ -30,6 +31,9 @@ type MQTTForwarder struct { // Context to use when publishing messages. ctx context.Context + + queue chan mqttQueuedMessage + queueCancel context.CancelFunc } var _ Forwarder = (*MQTTForwarder)(nil) @@ -44,6 +48,11 @@ type MQTTClientConfig struct { Password string `yaml:"password"` } +type mqttQueuedMessage struct { + topic string + payload []byte +} + func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { config.BrokerURL = strings.TrimSpace(config.BrokerURL) config.ClientID = strings.TrimSpace(config.ClientID) @@ -66,6 +75,7 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { client := MQTTForwarder{ topicPrefix: config.TopicPrefix, + queue: make(chan mqttQueuedMessage, mqttQueueSize), } client.config = autopaho.ClientConfig{ BrokerUrls: []*url.URL{brokerURL}, @@ -93,11 +103,14 @@ func (m *MQTTForwarder) Connect(ctx context.Context) { m.conn = conn m.ctx = ctx - } func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) { m.logger().Info().Msg("mqtt client: connection established") + + queueCtx, queueCtxCancel := context.WithCancel(m.ctx) + m.queueCancel = queueCtxCancel + go m.queueRunner(queueCtx) } func (m *MQTTForwarder) onConnectionError(err error) { @@ -109,6 +122,10 @@ func (m *MQTTForwarder) onClientError(err error) { } func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) { + if m.queueCancel != nil { + m.queueCancel() + } + logEntry := m.logger().Warn() if d.Properties != nil { logEntry = logEntry.Str("reason", d.Properties.ReasonString) @@ -118,42 +135,70 @@ func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) { logEntry.Msg("mqtt client: broker requested disconnect") } +func (m *MQTTForwarder) queueRunner(queueRunnerCtx context.Context) { + m.logger().Debug().Msg("mqtt client: starting queue runner") + defer m.logger().Debug().Msg("mqtt client: stopping queue runner") + + for { + select { + case <-queueRunnerCtx.Done(): + return + case message := <-m.queue: + m.sendEvent(message) + } + } +} + func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) { fullTopic := m.topicPrefix + string(topic) - logger := m.logger().With(). - Str("topic", fullTopic). - // Interface("event", payload). - Logger() - asJSON, err := json.Marshal(payload) if err != nil { - logger.Error().AnErr("cause", err).Interface("event", payload). + m.logger().Error(). + Str("topic", fullTopic). + AnErr("cause", err). + Interface("event", payload). Msg("mqtt client: could not convert event to JSON") return } - // Publish will block so we run it in a GoRoutine. - // TODO: might be a good idea todo this at the event broker level, rather than in this function. - go func(topic string, msg []byte) { - pr, err := m.conn.Publish(m.ctx, &paho.Publish{ - QoS: mqttQoS, - Topic: topic, - Payload: msg, - }) - switch { - case err != nil: - logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event") - return - case pr.ReasonCode == 16: - logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers") - return - case pr.ReasonCode != 0: - logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker") - default: - logger.Debug().Msg("mqtt client: event sent to broker") - } - }(fullTopic, asJSON) + // Queue the message, if we can. + message := mqttQueuedMessage{ + topic: fullTopic, + payload: asJSON, + } + select { + case m.queue <- message: + // All good, message is queued. + default: + m.logger().Error(). + Str("topic", fullTopic). + Msg("mqtt client: could not send event, queue is full") + } +} + +func (m *MQTTForwarder) sendEvent(message mqttQueuedMessage) { + logger := m.logger().With(). + Str("topic", message.topic). + Logger() + + pr, err := m.conn.Publish(m.ctx, &paho.Publish{ + QoS: mqttQoS, + Topic: message.topic, + Payload: message.payload, + }) + switch { + case err != nil: + logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event") + return + case pr.ReasonCode == 16: + logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers") + return + case pr.ReasonCode != 0: + logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker") + default: + logger.Debug().Msg("mqtt client: event sent to broker") + } } func (m *MQTTForwarder) logger() *zerolog.Logger { diff --git a/internal/manager/eventbus/topics.go b/internal/manager/eventbus/topics.go index 8da96716..ea2b6056 100644 --- a/internal/manager/eventbus/topics.go +++ b/internal/manager/eventbus/topics.go @@ -6,6 +6,7 @@ import "fmt" const ( // Topics on which events are published. + TopicLifeCycle EventTopic = "/lifecycle" // sends api.EventLifeCycle TopicJobUpdate EventTopic = "/jobs" // sends api.EventJobUpdate TopicLastRenderedImage EventTopic = "/last-rendered" // sends api.EventLastRenderedUpdate TopicTaskUpdate EventTopic = "/task" // sends api.EventTaskUpdate