
Send events on Manager startup & shutdown. To make this possible, events sent to MQTT are now queued up until an MQTT server can be reached. Otherwise the startup event would be sent before the MQTT connection was established.
215 lines
5.4 KiB
Go
215 lines
5.4 KiB
Go
package eventbus
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/eclipse/paho.golang/autopaho"
|
|
"github.com/eclipse/paho.golang/paho"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
const (
|
|
defaultClientID = "flamenco"
|
|
keepAlive = 30 // seconds
|
|
connectRetryDelay = 10 * time.Second
|
|
|
|
mqttQoS = 1 // QoS field for MQTT events.
|
|
mqttQueueSize = 10 // How many events are queued when there is no connection to an MQTT broker.
|
|
)
|
|
|
|
type MQTTForwarder struct {
|
|
config autopaho.ClientConfig
|
|
conn *autopaho.ConnectionManager
|
|
topicPrefix string
|
|
|
|
// Context to use when publishing messages.
|
|
ctx context.Context
|
|
|
|
queue chan mqttQueuedMessage
|
|
queueCancel context.CancelFunc
|
|
}
|
|
|
|
var _ Forwarder = (*MQTTForwarder)(nil)
|
|
|
|
// MQTTClientConfig contains the MQTT client configuration.
|
|
type MQTTClientConfig struct {
|
|
BrokerURL string `yaml:"broker"`
|
|
ClientID string `yaml:"clientID"`
|
|
TopicPrefix string `yaml:"topic_prefix"`
|
|
|
|
Username string `yaml:"username"`
|
|
Password string `yaml:"password"`
|
|
}
|
|
|
|
type mqttQueuedMessage struct {
|
|
topic string
|
|
payload []byte
|
|
}
|
|
|
|
func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
|
|
config.BrokerURL = strings.TrimSpace(config.BrokerURL)
|
|
config.ClientID = strings.TrimSpace(config.ClientID)
|
|
|
|
if config.BrokerURL == "" {
|
|
return nil
|
|
}
|
|
if config.ClientID == "" {
|
|
config.ClientID = defaultClientID
|
|
}
|
|
|
|
brokerURL, err := url.Parse(config.BrokerURL)
|
|
if err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Str("brokerURL", config.BrokerURL).
|
|
Msg("mqtt client: could not parse MQTT broker URL, skipping creation of MQTT client")
|
|
return nil
|
|
}
|
|
|
|
client := MQTTForwarder{
|
|
topicPrefix: config.TopicPrefix,
|
|
queue: make(chan mqttQueuedMessage, mqttQueueSize),
|
|
}
|
|
client.config = autopaho.ClientConfig{
|
|
BrokerUrls: []*url.URL{brokerURL},
|
|
KeepAlive: keepAlive,
|
|
ConnectRetryDelay: connectRetryDelay,
|
|
OnConnectionUp: client.onConnectionUp,
|
|
OnConnectError: client.onConnectionError,
|
|
Debug: paho.NOOPLogger{},
|
|
ClientConfig: paho.ClientConfig{
|
|
ClientID: config.ClientID,
|
|
OnClientError: client.onClientError,
|
|
OnServerDisconnect: client.onServerDisconnect,
|
|
},
|
|
}
|
|
client.config.SetUsernamePassword(config.Username, []byte(config.Password))
|
|
return &client
|
|
}
|
|
|
|
func (m *MQTTForwarder) Connect(ctx context.Context) {
|
|
m.logger().Debug().Msg("mqtt client: connecting to broker")
|
|
conn, err := autopaho.NewConnection(ctx, m.config)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
m.conn = conn
|
|
m.ctx = ctx
|
|
}
|
|
|
|
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
|
|
m.logger().Info().Msg("mqtt client: connection established")
|
|
|
|
queueCtx, queueCtxCancel := context.WithCancel(m.ctx)
|
|
m.queueCancel = queueCtxCancel
|
|
go m.queueRunner(queueCtx)
|
|
}
|
|
|
|
func (m *MQTTForwarder) onConnectionError(err error) {
|
|
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT broker")
|
|
}
|
|
|
|
func (m *MQTTForwarder) onClientError(err error) {
|
|
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: broker requested disconnect")
|
|
}
|
|
|
|
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
|
|
if m.queueCancel != nil {
|
|
m.queueCancel()
|
|
}
|
|
|
|
logEntry := m.logger().Warn()
|
|
if d.Properties != nil {
|
|
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
|
|
} else {
|
|
logEntry = logEntry.Int("reasonCode", int(d.ReasonCode))
|
|
}
|
|
logEntry.Msg("mqtt client: broker requested disconnect")
|
|
}
|
|
|
|
func (m *MQTTForwarder) queueRunner(queueRunnerCtx context.Context) {
|
|
m.logger().Debug().Msg("mqtt client: starting queue runner")
|
|
defer m.logger().Debug().Msg("mqtt client: stopping queue runner")
|
|
|
|
for {
|
|
select {
|
|
case <-queueRunnerCtx.Done():
|
|
return
|
|
case message := <-m.queue:
|
|
m.sendEvent(message)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
|
|
fullTopic := m.topicPrefix + string(topic)
|
|
|
|
asJSON, err := json.Marshal(payload)
|
|
if err != nil {
|
|
m.logger().Error().
|
|
Str("topic", fullTopic).
|
|
AnErr("cause", err).
|
|
Interface("event", payload).
|
|
Msg("mqtt client: could not convert event to JSON")
|
|
return
|
|
}
|
|
|
|
// Queue the message, if we can.
|
|
message := mqttQueuedMessage{
|
|
topic: fullTopic,
|
|
payload: asJSON,
|
|
}
|
|
select {
|
|
case m.queue <- message:
|
|
// All good, message is queued.
|
|
default:
|
|
m.logger().Error().
|
|
Str("topic", fullTopic).
|
|
Msg("mqtt client: could not send event, queue is full")
|
|
}
|
|
}
|
|
|
|
func (m *MQTTForwarder) sendEvent(message mqttQueuedMessage) {
|
|
logger := m.logger().With().
|
|
Str("topic", message.topic).
|
|
Logger()
|
|
|
|
pr, err := m.conn.Publish(m.ctx, &paho.Publish{
|
|
QoS: mqttQoS,
|
|
Topic: message.topic,
|
|
Payload: message.payload,
|
|
})
|
|
switch {
|
|
case err != nil:
|
|
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
|
|
return
|
|
case pr.ReasonCode == 16:
|
|
logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers")
|
|
return
|
|
case pr.ReasonCode != 0:
|
|
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker")
|
|
default:
|
|
logger.Debug().Msg("mqtt client: event sent to broker")
|
|
}
|
|
}
|
|
|
|
func (m *MQTTForwarder) logger() *zerolog.Logger {
|
|
logCtx := log.With()
|
|
|
|
if len(m.config.BrokerUrls) > 0 {
|
|
// Assumption: there's no more than one broker URL.
|
|
logCtx = logCtx.Stringer("broker", m.config.BrokerUrls[0])
|
|
}
|
|
|
|
logger := logCtx.Logger()
|
|
return &logger
|
|
}
|