Sybren A. Stüvel cbafacdff6 Manager: don't forward task log updates to MQTT
Task log updates are big and frequent, and should not be sent via MQTT.
At least not until we have a practical reason to do so.
2024-03-07 15:22:44 +01:00

222 lines
5.6 KiB
Go

package eventbus
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"encoding/json"
"net/url"
"strings"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/pkg/api"
)
const (
defaultClientID = "flamenco"
keepAlive = 30 // seconds
connectRetryDelay = 10 * time.Second
mqttQoS = 1 // QoS field for MQTT events.
mqttQueueSize = 10 // How many events are queued when there is no connection to an MQTT broker.
)
type MQTTForwarder struct {
config autopaho.ClientConfig
conn *autopaho.ConnectionManager
topicPrefix string
// Context to use when publishing messages.
ctx context.Context
queue chan mqttQueuedMessage
queueCancel context.CancelFunc
}
var _ Forwarder = (*MQTTForwarder)(nil)
// MQTTClientConfig contains the MQTT client configuration.
type MQTTClientConfig struct {
BrokerURL string `yaml:"broker"`
ClientID string `yaml:"clientID"`
TopicPrefix string `yaml:"topic_prefix"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
type mqttQueuedMessage struct {
topic string
payload []byte
}
func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
config.BrokerURL = strings.TrimSpace(config.BrokerURL)
config.ClientID = strings.TrimSpace(config.ClientID)
if config.BrokerURL == "" {
return nil
}
if config.ClientID == "" {
config.ClientID = defaultClientID
}
brokerURL, err := url.Parse(config.BrokerURL)
if err != nil {
log.Error().
Err(err).
Str("brokerURL", config.BrokerURL).
Msg("mqtt client: could not parse MQTT broker URL, skipping creation of MQTT client")
return nil
}
client := MQTTForwarder{
topicPrefix: config.TopicPrefix,
queue: make(chan mqttQueuedMessage, mqttQueueSize),
}
client.config = autopaho.ClientConfig{
BrokerUrls: []*url.URL{brokerURL},
KeepAlive: keepAlive,
ConnectRetryDelay: connectRetryDelay,
OnConnectionUp: client.onConnectionUp,
OnConnectError: client.onConnectionError,
Debug: paho.NOOPLogger{},
ClientConfig: paho.ClientConfig{
ClientID: config.ClientID,
OnClientError: client.onClientError,
OnServerDisconnect: client.onServerDisconnect,
},
}
client.config.SetUsernamePassword(config.Username, []byte(config.Password))
return &client
}
func (m *MQTTForwarder) Connect(ctx context.Context) {
m.logger().Debug().Msg("mqtt client: connecting to broker")
conn, err := autopaho.NewConnection(ctx, m.config)
if err != nil {
panic(err)
}
m.conn = conn
m.ctx = ctx
}
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
m.logger().Info().Msg("mqtt client: connection established")
queueCtx, queueCtxCancel := context.WithCancel(m.ctx)
m.queueCancel = queueCtxCancel
go m.queueRunner(queueCtx)
}
func (m *MQTTForwarder) onConnectionError(err error) {
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT broker")
}
func (m *MQTTForwarder) onClientError(err error) {
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: broker requested disconnect")
}
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
if m.queueCancel != nil {
m.queueCancel()
}
logEntry := m.logger().Warn()
if d.Properties != nil {
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
} else {
logEntry = logEntry.Int("reasonCode", int(d.ReasonCode))
}
logEntry.Msg("mqtt client: broker requested disconnect")
}
func (m *MQTTForwarder) queueRunner(queueRunnerCtx context.Context) {
m.logger().Debug().Msg("mqtt client: starting queue runner")
defer m.logger().Debug().Msg("mqtt client: stopping queue runner")
for {
select {
case <-queueRunnerCtx.Done():
return
case message := <-m.queue:
m.sendEvent(message)
}
}
}
func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
if _, ok := payload.(api.EventTaskLogUpdate); ok {
// Task log updates aren't sent through MQTT, as that can generate a lot of traffic.
return
}
fullTopic := m.topicPrefix + string(topic)
asJSON, err := json.Marshal(payload)
if err != nil {
m.logger().Error().
Str("topic", fullTopic).
AnErr("cause", err).
Interface("event", payload).
Msg("mqtt client: could not convert event to JSON")
return
}
// Queue the message, if we can.
message := mqttQueuedMessage{
topic: fullTopic,
payload: asJSON,
}
select {
case m.queue <- message:
// All good, message is queued.
default:
m.logger().Error().
Str("topic", fullTopic).
Msg("mqtt client: could not send event, queue is full")
}
}
func (m *MQTTForwarder) sendEvent(message mqttQueuedMessage) {
logger := m.logger().With().
Str("topic", message.topic).
Logger()
pr, err := m.conn.Publish(m.ctx, &paho.Publish{
QoS: mqttQoS,
Topic: message.topic,
Payload: message.payload,
})
switch {
case err != nil:
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
return
case pr.ReasonCode == 16:
logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers")
return
case pr.ReasonCode != 0:
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker")
default:
logger.Debug().Msg("mqtt client: event sent to broker")
}
}
func (m *MQTTForwarder) logger() *zerolog.Logger {
logCtx := log.With()
if len(m.config.BrokerUrls) > 0 {
// Assumption: there's no more than one broker URL.
logCtx = logCtx.Stringer("broker", m.config.BrokerUrls[0])
}
logger := logCtx.Logger()
return &logger
}