Manager: consistent MQTT server/broker naming
Consistently log about the MQTT "broker", not the "server". The former is common MQTT terminology.
This commit is contained in:
parent
91d15df765
commit
3326f683da
@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
"github.com/eclipse/paho.golang/autopaho"
|
"github.com/eclipse/paho.golang/autopaho"
|
||||||
"github.com/eclipse/paho.golang/paho"
|
"github.com/eclipse/paho.golang/paho"
|
||||||
|
"github.com/rs/zerolog"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -54,12 +55,12 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
|
|||||||
config.ClientID = defaultClientID
|
config.ClientID = defaultClientID
|
||||||
}
|
}
|
||||||
|
|
||||||
serverURL, err := url.Parse(config.BrokerURL)
|
brokerURL, err := url.Parse(config.BrokerURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().
|
log.Error().
|
||||||
Err(err).
|
Err(err).
|
||||||
Str("mqttServerURL", config.BrokerURL).
|
Str("brokerURL", config.BrokerURL).
|
||||||
Msg("could not parse MQTT server URL, skipping creation of MQTT client")
|
Msg("mqtt client: could not parse MQTT broker URL, skipping creation of MQTT client")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,7 +68,7 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
|
|||||||
topicPrefix: config.TopicPrefix,
|
topicPrefix: config.TopicPrefix,
|
||||||
}
|
}
|
||||||
client.config = autopaho.ClientConfig{
|
client.config = autopaho.ClientConfig{
|
||||||
BrokerUrls: []*url.URL{serverURL},
|
BrokerUrls: []*url.URL{brokerURL},
|
||||||
KeepAlive: keepAlive,
|
KeepAlive: keepAlive,
|
||||||
ConnectRetryDelay: connectRetryDelay,
|
ConnectRetryDelay: connectRetryDelay,
|
||||||
OnConnectionUp: client.onConnectionUp,
|
OnConnectionUp: client.onConnectionUp,
|
||||||
@ -84,7 +85,7 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTForwarder) Connect(ctx context.Context) {
|
func (m *MQTTForwarder) Connect(ctx context.Context) {
|
||||||
log.Debug().Msg("mqtt client: connecting")
|
m.logger().Debug().Msg("mqtt client: connecting to broker")
|
||||||
conn, err := autopaho.NewConnection(ctx, m.config)
|
conn, err := autopaho.NewConnection(ctx, m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -92,34 +93,35 @@ func (m *MQTTForwarder) Connect(ctx context.Context) {
|
|||||||
|
|
||||||
m.conn = conn
|
m.conn = conn
|
||||||
m.ctx = ctx
|
m.ctx = ctx
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
|
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
|
||||||
log.Info().Msg("mqtt client: connection established")
|
m.logger().Info().Msg("mqtt client: connection established")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTForwarder) onConnectionError(err error) {
|
func (m *MQTTForwarder) onConnectionError(err error) {
|
||||||
log.Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT server")
|
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: could not connect to MQTT broker")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTForwarder) onClientError(err error) {
|
func (m *MQTTForwarder) onClientError(err error) {
|
||||||
log.Warn().AnErr("cause", err).Msg("mqtt client: server requested disconnect")
|
m.logger().Warn().AnErr("cause", err).Msg("mqtt client: broker requested disconnect")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
|
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
|
||||||
logEntry := log.Warn()
|
logEntry := m.logger().Warn()
|
||||||
if d.Properties != nil {
|
if d.Properties != nil {
|
||||||
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
|
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
|
||||||
} else {
|
} else {
|
||||||
logEntry = logEntry.Int("reasonCode", int(d.ReasonCode))
|
logEntry = logEntry.Int("reasonCode", int(d.ReasonCode))
|
||||||
}
|
}
|
||||||
logEntry.Msg("mqtt client: server requested disconnect")
|
logEntry.Msg("mqtt client: broker requested disconnect")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
|
func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
|
||||||
fullTopic := m.topicPrefix + string(topic)
|
fullTopic := m.topicPrefix + string(topic)
|
||||||
|
|
||||||
logger := log.With().
|
logger := m.logger().With().
|
||||||
Str("topic", fullTopic).
|
Str("topic", fullTopic).
|
||||||
// Interface("event", payload).
|
// Interface("event", payload).
|
||||||
Logger()
|
Logger()
|
||||||
@ -144,12 +146,24 @@ func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
|
|||||||
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
|
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
|
||||||
return
|
return
|
||||||
case pr.ReasonCode == 16:
|
case pr.ReasonCode == 16:
|
||||||
logger.Debug().Msg("mqtt client: event sent to server, but there were no subscribers")
|
logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers")
|
||||||
return
|
return
|
||||||
case pr.ReasonCode != 0:
|
case pr.ReasonCode != 0:
|
||||||
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt server")
|
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker")
|
||||||
default:
|
default:
|
||||||
logger.Debug().Msg("mqtt client: event sent to server")
|
logger.Debug().Msg("mqtt client: event sent to broker")
|
||||||
}
|
}
|
||||||
}(fullTopic, asJSON)
|
}(fullTopic, asJSON)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MQTTForwarder) logger() *zerolog.Logger {
|
||||||
|
logCtx := log.With()
|
||||||
|
|
||||||
|
if len(m.config.BrokerUrls) > 0 {
|
||||||
|
// Assumption: there's no more than one broker URL.
|
||||||
|
logCtx = logCtx.Stringer("broker", m.config.BrokerUrls[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := logCtx.Logger()
|
||||||
|
return &logger
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user