Sybren A. Stüvel ed71042073 Add MQTT Server for debugging purposes
Add an MQTT server (aka 'broker') that logs all published messages. This
is purely for debugging Flamenco Manager's events. Although it should
work fine as MQTT broker, it's not meant for production use.

Run the server with `go run ./cmd/mqtt-server`
2025-09-02 09:41:42 +02:00

223 lines
5.7 KiB
Go

package eventbus
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"encoding/json"
"net/url"
"strings"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/pkg/api"
)
const (
MQTTDefaultTopicPrefix = "flamenco"
MQTTDefaultClientID = "flamenco"
keepAlive = 30 // seconds
connectRetryDelay = 10 * time.Second
mqttQoS = 1 // QoS field for MQTT events.
mqttQueueSize = 10 // How many events are queued when there is no connection to an MQTT broker.
)
type MQTTForwarder struct {
config autopaho.ClientConfig
conn *autopaho.ConnectionManager
topicPrefix string
// Context to use when publishing messages.
ctx context.Context
queue chan mqttQueuedMessage
queueCancel context.CancelFunc
}
var _ Forwarder = (*MQTTForwarder)(nil)
// MQTTClientConfig contains the MQTT client configuration.
type MQTTClientConfig struct {
BrokerURL string `json:"broker" yaml:"broker"`
ClientID string `json:"clientID" yaml:"clientID"`
TopicPrefix string `json:"topic_prefix" yaml:"topic_prefix"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
}
type mqttQueuedMessage struct {
topic string
payload []byte
}
func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
config.BrokerURL = strings.TrimSpace(config.BrokerURL)
config.ClientID = strings.TrimSpace(config.ClientID)
if config.BrokerURL == "" {
return nil
}
if config.ClientID == "" {
config.ClientID = MQTTDefaultClientID
}
brokerURL, err := url.Parse(config.BrokerURL)
if err != nil {
log.Error().
Err(err).
Str("brokerURL", config.BrokerURL).
Msg("mqtt client: could not parse MQTT broker URL, skipping creation of MQTT client")
return nil
}
client := MQTTForwarder{
topicPrefix: config.TopicPrefix,
queue: make(chan mqttQueuedMessage, mqttQueueSize),
}
client.config = autopaho.ClientConfig{
BrokerUrls: []*url.URL{brokerURL},
KeepAlive: keepAlive,
ConnectRetryDelay: connectRetryDelay,
OnConnectionUp: client.onConnectionUp,
OnConnectError: client.onConnectionError,
ClientConfig: paho.ClientConfig{
ClientID: config.ClientID,
OnClientError: client.onClientError,
OnServerDisconnect: client.onServerDisconnect,
},
}
client.config.SetUsernamePassword(config.Username, []byte(config.Password))
return &client
}
func (m *MQTTForwarder) Connect(ctx context.Context) {
m.logger().Debug().Msg("mqtt client: connecting to broker")
conn, err := autopaho.NewConnection(ctx, m.config)
if err != nil {
panic(err)
}
m.conn = conn
m.ctx = ctx
}
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
m.logger().Info().Msg("mqtt client: connection established")
queueCtx, queueCtxCancel := context.WithCancel(m.ctx)
m.queueCancel = queueCtxCancel
go m.queueRunner(queueCtx)
}
func (m *MQTTForwarder) onConnectionError(err error) {
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT broker")
}
func (m *MQTTForwarder) onClientError(err error) {
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: broker requested disconnect")
}
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
if m.queueCancel != nil {
m.queueCancel()
}
logEntry := m.logger().Warn()
if d.Properties != nil {
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
} else {
logEntry = logEntry.Int("reasonCode", int(d.ReasonCode))
}
logEntry.Msg("mqtt client: broker requested disconnect")
}
func (m *MQTTForwarder) queueRunner(queueRunnerCtx context.Context) {
m.logger().Debug().Msg("mqtt client: starting queue runner")
defer m.logger().Debug().Msg("mqtt client: stopping queue runner")
for {
select {
case <-queueRunnerCtx.Done():
return
case message := <-m.queue:
m.sendEvent(message)
}
}
}
func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
if _, ok := payload.(api.EventTaskLogUpdate); ok {
// Task log updates aren't sent through MQTT, as that can generate a lot of traffic.
return
}
fullTopic := m.topicPrefix + string(topic)
asJSON, err := json.Marshal(payload)
if err != nil {
m.logger().Error().
Str("topic", fullTopic).
AnErr("cause", err).
Interface("event", payload).
Msg("mqtt client: could not convert event to JSON")
return
}
// Queue the message, if we can.
message := mqttQueuedMessage{
topic: fullTopic,
payload: asJSON,
}
select {
case m.queue <- message:
// All good, message is queued.
default:
m.logger().Error().
Str("topic", fullTopic).
Msg("mqtt client: could not send event, queue is full")
}
}
func (m *MQTTForwarder) sendEvent(message mqttQueuedMessage) {
logger := m.logger().With().
Str("topic", message.topic).
Logger()
pr, err := m.conn.Publish(m.ctx, &paho.Publish{
QoS: mqttQoS,
Topic: message.topic,
Payload: message.payload,
})
switch {
case err != nil:
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
return
case pr.ReasonCode == 16:
logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers")
return
case pr.ReasonCode != 0:
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker")
default:
logger.Debug().Msg("mqtt client: event sent to broker")
}
}
func (m *MQTTForwarder) logger() *zerolog.Logger {
logCtx := log.With()
if len(m.config.BrokerUrls) > 0 {
// Assumption: there's no more than one broker URL.
logCtx = logCtx.Stringer("broker", m.config.BrokerUrls[0])
}
logger := logCtx.Logger()
return &logger
}