diff --git a/internal/manager/eventbus/mqtt_client.go b/internal/manager/eventbus/mqtt_client.go index ad7d64ee..d074f7ca 100644 --- a/internal/manager/eventbus/mqtt_client.go +++ b/internal/manager/eventbus/mqtt_client.go @@ -11,6 +11,7 @@ import ( "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -54,12 +55,12 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { config.ClientID = defaultClientID } - serverURL, err := url.Parse(config.BrokerURL) + brokerURL, err := url.Parse(config.BrokerURL) if err != nil { log.Error(). Err(err). - Str("mqttServerURL", config.BrokerURL). - Msg("could not parse MQTT server URL, skipping creation of MQTT client") + Str("brokerURL", config.BrokerURL). + Msg("mqtt client: could not parse MQTT broker URL, skipping creation of MQTT client") return nil } @@ -67,7 +68,7 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { topicPrefix: config.TopicPrefix, } client.config = autopaho.ClientConfig{ - BrokerUrls: []*url.URL{serverURL}, + BrokerUrls: []*url.URL{brokerURL}, KeepAlive: keepAlive, ConnectRetryDelay: connectRetryDelay, OnConnectionUp: client.onConnectionUp, @@ -84,7 +85,7 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { } func (m *MQTTForwarder) Connect(ctx context.Context) { - log.Debug().Msg("mqtt client: connecting") + m.logger().Debug().Msg("mqtt client: connecting to broker") conn, err := autopaho.NewConnection(ctx, m.config) if err != nil { panic(err) @@ -92,34 +93,35 @@ func (m *MQTTForwarder) Connect(ctx context.Context) { m.conn = conn m.ctx = ctx + } func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) { - log.Info().Msg("mqtt client: connection established") + m.logger().Info().Msg("mqtt client: connection established") } func (m *MQTTForwarder) onConnectionError(err error) { - log.Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT server") + m.logger().Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT broker") } func (m *MQTTForwarder) onClientError(err error) { - log.Warn().AnErr("cause", err).Msg("mqtt client: server requested disconnect") + m.logger().Warn().AnErr("cause", err).Msg("mqtt client: broker requested disconnect") } func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) { - logEntry := log.Warn() + logEntry := m.logger().Warn() if d.Properties != nil { logEntry = logEntry.Str("reason", d.Properties.ReasonString) } else { logEntry = logEntry.Int("reasonCode", int(d.ReasonCode)) } - logEntry.Msg("mqtt client: server requested disconnect") + logEntry.Msg("mqtt client: broker requested disconnect") } func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) { fullTopic := m.topicPrefix + string(topic) - logger := log.With(). + logger := m.logger().With(). Str("topic", fullTopic). // Interface("event", payload). Logger() @@ -144,12 +146,24 @@ func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) { logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event") return case pr.ReasonCode == 16: - logger.Debug().Msg("mqtt client: event sent to server, but there were no subscribers") + logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers") return case pr.ReasonCode != 0: - logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt server") + logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker") default: - logger.Debug().Msg("mqtt client: event sent to server") + logger.Debug().Msg("mqtt client: event sent to broker") } }(fullTopic, asJSON) } + +func (m *MQTTForwarder) logger() *zerolog.Logger { + logCtx := log.With() + + if len(m.config.BrokerUrls) > 0 { + // Assumption: there's no more than one broker URL. + logCtx = logCtx.Stringer("broker", m.config.BrokerUrls[0]) + } + + logger := logCtx.Logger() + return &logger +}