
Add a PUT method for `/api/v3/configuration/file`, which entirely replaces `flamenco-manager.yaml` with the received JSON payload. This will be used in the future to store configuration edited in the web frontend. Ref: #99426 Reviewed-on: https://projects.blender.org/studio/flamenco/pulls/104406
224 lines
5.7 KiB
Go
224 lines
5.7 KiB
Go
package eventbus
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/eclipse/paho.golang/autopaho"
|
|
"github.com/eclipse/paho.golang/paho"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
const (
|
|
MQTTDefaultTopicPrefix = "flamenco"
|
|
MQTTDefaultClientID = "flamenco"
|
|
|
|
keepAlive = 30 // seconds
|
|
connectRetryDelay = 10 * time.Second
|
|
|
|
mqttQoS = 1 // QoS field for MQTT events.
|
|
mqttQueueSize = 10 // How many events are queued when there is no connection to an MQTT broker.
|
|
)
|
|
|
|
type MQTTForwarder struct {
|
|
config autopaho.ClientConfig
|
|
conn *autopaho.ConnectionManager
|
|
topicPrefix string
|
|
|
|
// Context to use when publishing messages.
|
|
ctx context.Context
|
|
|
|
queue chan mqttQueuedMessage
|
|
queueCancel context.CancelFunc
|
|
}
|
|
|
|
var _ Forwarder = (*MQTTForwarder)(nil)
|
|
|
|
// MQTTClientConfig contains the MQTT client configuration.
|
|
type MQTTClientConfig struct {
|
|
BrokerURL string `json:"broker" yaml:"broker"`
|
|
ClientID string `json:"clientID" yaml:"clientID"`
|
|
TopicPrefix string `json:"topic_prefix" yaml:"topic_prefix"`
|
|
|
|
Username string `json:"username" yaml:"username"`
|
|
Password string `json:"password" yaml:"password"`
|
|
}
|
|
|
|
type mqttQueuedMessage struct {
|
|
topic string
|
|
payload []byte
|
|
}
|
|
|
|
func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
|
|
config.BrokerURL = strings.TrimSpace(config.BrokerURL)
|
|
config.ClientID = strings.TrimSpace(config.ClientID)
|
|
|
|
if config.BrokerURL == "" {
|
|
return nil
|
|
}
|
|
if config.ClientID == "" {
|
|
config.ClientID = MQTTDefaultClientID
|
|
}
|
|
|
|
brokerURL, err := url.Parse(config.BrokerURL)
|
|
if err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Str("brokerURL", config.BrokerURL).
|
|
Msg("mqtt client: could not parse MQTT broker URL, skipping creation of MQTT client")
|
|
return nil
|
|
}
|
|
|
|
client := MQTTForwarder{
|
|
topicPrefix: config.TopicPrefix,
|
|
queue: make(chan mqttQueuedMessage, mqttQueueSize),
|
|
}
|
|
client.config = autopaho.ClientConfig{
|
|
BrokerUrls: []*url.URL{brokerURL},
|
|
KeepAlive: keepAlive,
|
|
ConnectRetryDelay: connectRetryDelay,
|
|
OnConnectionUp: client.onConnectionUp,
|
|
OnConnectError: client.onConnectionError,
|
|
Debug: paho.NOOPLogger{},
|
|
ClientConfig: paho.ClientConfig{
|
|
ClientID: config.ClientID,
|
|
OnClientError: client.onClientError,
|
|
OnServerDisconnect: client.onServerDisconnect,
|
|
},
|
|
}
|
|
client.config.SetUsernamePassword(config.Username, []byte(config.Password))
|
|
return &client
|
|
}
|
|
|
|
func (m *MQTTForwarder) Connect(ctx context.Context) {
|
|
m.logger().Debug().Msg("mqtt client: connecting to broker")
|
|
conn, err := autopaho.NewConnection(ctx, m.config)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
m.conn = conn
|
|
m.ctx = ctx
|
|
}
|
|
|
|
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
|
|
m.logger().Info().Msg("mqtt client: connection established")
|
|
|
|
queueCtx, queueCtxCancel := context.WithCancel(m.ctx)
|
|
m.queueCancel = queueCtxCancel
|
|
go m.queueRunner(queueCtx)
|
|
}
|
|
|
|
func (m *MQTTForwarder) onConnectionError(err error) {
|
|
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT broker")
|
|
}
|
|
|
|
func (m *MQTTForwarder) onClientError(err error) {
|
|
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: broker requested disconnect")
|
|
}
|
|
|
|
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
|
|
if m.queueCancel != nil {
|
|
m.queueCancel()
|
|
}
|
|
|
|
logEntry := m.logger().Warn()
|
|
if d.Properties != nil {
|
|
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
|
|
} else {
|
|
logEntry = logEntry.Int("reasonCode", int(d.ReasonCode))
|
|
}
|
|
logEntry.Msg("mqtt client: broker requested disconnect")
|
|
}
|
|
|
|
func (m *MQTTForwarder) queueRunner(queueRunnerCtx context.Context) {
|
|
m.logger().Debug().Msg("mqtt client: starting queue runner")
|
|
defer m.logger().Debug().Msg("mqtt client: stopping queue runner")
|
|
|
|
for {
|
|
select {
|
|
case <-queueRunnerCtx.Done():
|
|
return
|
|
case message := <-m.queue:
|
|
m.sendEvent(message)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
|
|
if _, ok := payload.(api.EventTaskLogUpdate); ok {
|
|
// Task log updates aren't sent through MQTT, as that can generate a lot of traffic.
|
|
return
|
|
}
|
|
|
|
fullTopic := m.topicPrefix + string(topic)
|
|
|
|
asJSON, err := json.Marshal(payload)
|
|
if err != nil {
|
|
m.logger().Error().
|
|
Str("topic", fullTopic).
|
|
AnErr("cause", err).
|
|
Interface("event", payload).
|
|
Msg("mqtt client: could not convert event to JSON")
|
|
return
|
|
}
|
|
|
|
// Queue the message, if we can.
|
|
message := mqttQueuedMessage{
|
|
topic: fullTopic,
|
|
payload: asJSON,
|
|
}
|
|
select {
|
|
case m.queue <- message:
|
|
// All good, message is queued.
|
|
default:
|
|
m.logger().Error().
|
|
Str("topic", fullTopic).
|
|
Msg("mqtt client: could not send event, queue is full")
|
|
}
|
|
}
|
|
|
|
func (m *MQTTForwarder) sendEvent(message mqttQueuedMessage) {
|
|
logger := m.logger().With().
|
|
Str("topic", message.topic).
|
|
Logger()
|
|
|
|
pr, err := m.conn.Publish(m.ctx, &paho.Publish{
|
|
QoS: mqttQoS,
|
|
Topic: message.topic,
|
|
Payload: message.payload,
|
|
})
|
|
switch {
|
|
case err != nil:
|
|
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
|
|
return
|
|
case pr.ReasonCode == 16:
|
|
logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers")
|
|
return
|
|
case pr.ReasonCode != 0:
|
|
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker")
|
|
default:
|
|
logger.Debug().Msg("mqtt client: event sent to broker")
|
|
}
|
|
}
|
|
|
|
func (m *MQTTForwarder) logger() *zerolog.Logger {
|
|
logCtx := log.With()
|
|
|
|
if len(m.config.BrokerUrls) > 0 {
|
|
// Assumption: there's no more than one broker URL.
|
|
logCtx = logCtx.Stringer("broker", m.config.BrokerUrls[0])
|
|
}
|
|
|
|
logger := logCtx.Logger()
|
|
return &logger
|
|
}
|