meshcore-repeater/src/MQTTBridge.cpp
Ryan Malloy 0905aa4bf8 Add exponential backoff to MQTT reconnection
Instead of fixed 5s/30s retry intervals, implement exponential backoff:
- Initial delay: 1 second
- Max delay: 60 seconds
- Doubles on each failed attempt
- Resets to minimum on successful connection

This prevents hammering a down broker while still reconnecting
quickly when the issue is transient.

Also includes fix for WiFiState/NetworkState enum mismatch in
getWiFiStatus() which was already on main.
2026-02-05 09:48:49 -07:00

421 lines
12 KiB
C++

#if defined(WITH_MQTT) || defined(WITH_ETHERNET)
#include "MQTTBridge.h"
#include <MeshCore.h>
#include <RTClib.h>
// Back-pointer to the live bridge object. The static mqttCallback() trampoline
// has no `this`, so it forwards incoming messages through this pointer. It is
// assigned in the constructor, so the most recently constructed bridge wins.
MQTTBridge* MQTTBridge::_instance = nullptr;
/**
 * Constructs an idle bridge.
 *
 * The bridge starts DISCONNECTED and uninitialized; nothing is sent until
 * begin() has been called. All counters and timers start at zero and the
 * reconnect backoff starts at BACKOFF_MIN_MS.
 *
 * @param network  Network manager queried for link state, IP and RSSI.
 * @param mgr      Packet manager (stored; not used by the code in this file).
 * @param rtc      Clock used to timestamp published messages.
 */
MQTTBridge::MQTTBridge(INetworkManager* network, mesh::PacketManager* mgr, mesh::RTCClock* rtc)
    : _network(network),
      _mqtt_client(),
      _state(MQTTState::DISCONNECTED),
      _mgr(mgr),
      _rtc(rtc),
      // Timers (millis() based)
      _last_connect_attempt(0),
      _last_status_publish(0),
      _last_stats_publish(0),
      _connected_since(0),
      _current_backoff_ms(BACKOFF_MIN_MS),
      // Counters
      _messages_sent(0),
      _messages_received(0),
      _reconnect_count(0),
      _hash_index(0),
      _command_callback(nullptr),
      _initialized(false)
{
    // Zero every raw buffer so strings are empty and the de-dup table is clear.
    memset(_seen_hashes, 0, sizeof(_seen_hashes));
    memset(_gateway_id, 0, sizeof(_gateway_id));
    memset(_self_pubkey, 0, sizeof(_self_pubkey));
    memset(&_config, 0, sizeof(_config));

    // Register this object as the target of the static MQTT callback.
    _instance = this;
}
/**
 * Stores the configuration, derives the gateway id and topics, and prepares
 * the MQTT client. If the bridge is enabled and the network is already up, a
 * first connection attempt is made immediately.
 *
 * @param config       Broker, credentials, topic prefix and TLS settings.
 * @param self_pubkey  32-byte public key of this node; the first 8 bytes
 *                     become the 16-character lowercase hex gateway id.
 */
void MQTTBridge::begin(const MQTTConfig& config, const uint8_t* self_pubkey) {
    static const char kHexDigits[] = "0123456789abcdef";

    _config = config;
    memcpy(_self_pubkey, self_pubkey, 32);

    // Gateway id = lowercase hex of the first 8 public-key bytes.
    char* out = _gateway_id;
    for (int idx = 0; idx < 8; ++idx) {
        const uint8_t byte = _self_pubkey[idx];
        *out++ = kHexDigits[byte >> 4];
        *out++ = kHexDigits[byte & 0x0F];
    }
    *out = '\0';

    setupTopics();

    // Pick the transport that matches the TLS setting.
    if (!_config.use_tls) {
        _mqtt_client.setClient(_wifi_client);
        Serial.printf("[MQTT] Plain TCP\n");
    } else {
        // NOTE(review): certificate verification is disabled here, so the
        // connection is encrypted but the broker is not authenticated.
        _wifi_client_secure.setInsecure();
        _mqtt_client.setClient(_wifi_client_secure);
        Serial.printf("[MQTT] TLS enabled\n");
    }

    // Fall back to a 60 second keepalive when none is configured.
    auto keepalive = _config.keepalive_secs;
    if (!(keepalive > 0)) {
        keepalive = 60;
    }

    _mqtt_client.setServer(_config.broker, _config.port);
    _mqtt_client.setCallback(mqttCallback);
    _mqtt_client.setKeepAlive(keepalive);
    _mqtt_client.setBufferSize(512);

    _initialized = true;
    _state = MQTTState::DISCONNECTED;

    Serial.printf("[MQTT] Initialized: %s:%d, prefix=%s, gateway=%s\n",
                  _config.broker, _config.port, _config.topic_prefix, _gateway_id);

    const bool can_connect_now = _config.enabled && _network->isConnected();
    if (can_connect_now) {
        attemptConnection();
    }
}
void MQTTBridge::loop() {
if (!_initialized || !_config.enabled) return;
if (!_network->isConnected()) {
if (_state == MQTTState::CONNECTED) {
_state = MQTTState::DISCONNECTED;
Serial.println("[MQTTS] WiFi lost, disconnected");
}
return;
}
switch (_state) {
case MQTTState::DISCONNECTED:
// Use exponential backoff for reconnection attempts
if (millis() - _last_connect_attempt > _current_backoff_ms) {
attemptConnection();
}
break;
case MQTTState::CONNECTING:
break;
case MQTTState::CONNECTED:
if (!_mqtt_client.connected()) {
_state = MQTTState::DISCONNECTED;
Serial.println("[MQTTS] Connection lost");
_last_connect_attempt = millis();
// Don't reset backoff on connection loss - broker might be down
} else {
_mqtt_client.loop();
if (millis() - _last_status_publish > 60000) {
publishStatus();
_last_status_publish = millis();
}
}
break;
case MQTTState::ERROR:
// Use backoff for error recovery too
if (millis() - _last_connect_attempt > _current_backoff_ms) {
_state = MQTTState::DISCONNECTED;
}
break;
}
}
void MQTTBridge::end() {
if (!_initialized) return;
if (_mqtt_client.connected()) {
JsonDocument doc;
doc["status"] = "offline";
doc["gateway_id"] = _gateway_id;
doc["timestamp"] = getTimestamp();
publishJson(_topic_status, doc, true);
_mqtt_client.disconnect();
}
_state = MQTTState::DISCONNECTED;
_initialized = false;
Serial.println("[MQTTS] Stopped");
}
void MQTTBridge::attemptConnection() {
if (!_network->isConnected()) return;
_state = MQTTState::CONNECTING;
_last_connect_attempt = millis();
Serial.printf("[MQTTS] Connecting to %s:%d (backoff=%lums)...\n",
_config.broker, _config.port, _current_backoff_ms);
String client_id = String(_config.client_id);
if (client_id.length() == 0) {
client_id = "meshcore-" + String(_gateway_id);
}
char lwt_topic[80];
snprintf(lwt_topic, sizeof(lwt_topic), "%s/gateway/%s/status", _config.topic_prefix, _gateway_id);
bool connected = false;
if (strlen(_config.user) > 0) {
connected = _mqtt_client.connect(client_id.c_str(), _config.user, _config.password,
lwt_topic, 1, true, "{\"status\":\"offline\"}");
} else {
connected = _mqtt_client.connect(client_id.c_str(),
lwt_topic, 1, true, "{\"status\":\"offline\"}");
}
if (connected) {
_state = MQTTState::CONNECTED;
_connected_since = millis();
_reconnect_count++;
// Reset backoff on successful connection
_current_backoff_ms = BACKOFF_MIN_MS;
Serial.println("[MQTTS] Connected!");
subscribeToCommands();
publishStatus();
_last_status_publish = millis();
} else {
int rc = _mqtt_client.state();
Serial.printf("[MQTTS] Connection failed, rc=%d\n", rc);
_state = MQTTState::ERROR;
// Exponential backoff: double the delay (up to max)
_current_backoff_ms = min(_current_backoff_ms * BACKOFF_MULTIPLIER, BACKOFF_MAX_MS);
Serial.printf("[MQTTS] Next retry in %lums\n", _current_backoff_ms);
}
}
/**
 * Rebuilds every topic string from the configured prefix and the gateway id.
 *
 * Shared topics:      <prefix>/packets/rx, <prefix>/packets/tx, <prefix>/adverts
 * Per-gateway topics: <prefix>/gateway/<id>/status, .../stats, .../cmd/
 *
 * Strings that do not fit their buffer are truncated by snprintf.
 */
void MQTTBridge::setupTopics() {
    const char* prefix = _config.topic_prefix;

    // Topics scoped to this gateway.
    snprintf(_topic_status, sizeof(_topic_status), "%s/gateway/%s/status", prefix, _gateway_id);
    snprintf(_topic_stats, sizeof(_topic_stats), "%s/gateway/%s/stats", prefix, _gateway_id);
    snprintf(_topic_cmd_prefix, sizeof(_topic_cmd_prefix), "%s/gateway/%s/cmd/", prefix, _gateway_id);

    // Topics shared under the prefix.
    snprintf(_topic_packets_rx, sizeof(_topic_packets_rx), "%s/packets/rx", prefix);
    snprintf(_topic_packets_tx, sizeof(_topic_packets_tx), "%s/packets/tx", prefix);
    snprintf(_topic_adverts, sizeof(_topic_adverts), "%s/adverts", prefix);
}
/**
 * Subscribes to every command topic of this gateway ("<cmd prefix>#").
 *
 * The result is logged truthfully: a filter that does not fit the local
 * buffer would lose its trailing '#' wildcard and is therefore not sent, and
 * a rejected subscribe request is reported instead of being logged as success.
 */
void MQTTBridge::subscribeToCommands() {
    char topic[100];
    const int needed = snprintf(topic, sizeof(topic), "%s#", _topic_cmd_prefix);

    // snprintf reports the untruncated length; anything >= the buffer size
    // means the wildcard was cut off.
    if (needed < 0 || static_cast<size_t>(needed) >= sizeof(topic)) {
        Serial.println("[MQTTS] Command topic too long, not subscribing");
        return;
    }

    if (_mqtt_client.subscribe(topic)) {
        Serial.printf("[MQTTS] Subscribed to: %s\n", topic);
    } else {
        Serial.printf("[MQTTS] Subscribe failed: %s\n", topic);
    }
}
void MQTTBridge::updateConfig(const MQTTConfig& config) {
bool reconnect_needed = (strcmp(_config.broker, config.broker) != 0 ||
_config.port != config.port ||
strcmp(_config.user, config.user) != 0 ||
strcmp(_config.password, config.password) != 0);
_config = config;
setupTopics();
if (reconnect_needed && _mqtt_client.connected()) {
_mqtt_client.disconnect();
_state = MQTTState::DISCONNECTED;
_last_connect_attempt = 0;
}
}
void MQTTBridge::publishStatus() {
if (!_mqtt_client.connected()) return;
JsonDocument doc;
doc["status"] = "online";
doc["gateway_id"] = _gateway_id;
doc["ip"] = _network->getLocalIP().toString();
doc["rssi"] = _network->getRSSI();
doc["uptime_secs"] = (_connected_since > 0) ? (millis() - _connected_since) / 1000 : 0;
doc["free_heap"] = ESP.getFreeHeap();
doc["timestamp"] = getTimestamp();
publishJson(_topic_status, doc, true);
}
void MQTTBridge::publishStats(uint32_t uptime_secs, uint32_t packets_rx, uint32_t packets_tx,
uint32_t air_time_secs, int16_t noise_floor) {
if (!_mqtt_client.connected()) return;
JsonDocument doc;
doc["gateway_id"] = _gateway_id;
doc["uptime_secs"] = uptime_secs;
doc["packets_rx"] = packets_rx;
doc["packets_tx"] = packets_tx;
doc["air_time_secs"] = air_time_secs;
doc["noise_floor"] = noise_floor;
doc["mqtt_msgs_sent"] = _messages_sent;
doc["mqtt_msgs_recv"] = _messages_received;
doc["timestamp"] = getTimestamp();
publishJson(_topic_stats, doc);
}
/**
 * Publishes a JSON description of a received packet on the packets/rx topic.
 *
 * Skipped when no session is open, when the arguments are unusable, or when
 * the same bytes were published recently (see isDuplicate()).
 *
 * @param pkt   Packet to describe; ignored if null.
 * @param len   Number of bytes to dump and hash; ignored if not positive.
 * @param snr   Signal-to-noise ratio, passed through as given.
 * @param rssi  Signal strength, passed through as given.
 */
void MQTTBridge::publishPacketRx(mesh::Packet* pkt, int len, float snr, float rssi) {
    if (!_mqtt_client.connected()) return;

    // A null packet would be dereferenced below, and a negative length would
    // turn into a huge size_t and read far out of bounds.
    if (pkt == nullptr || len <= 0) return;

    // NOTE(review): the dump/hash below covers the Packet object as laid out
    // in memory, not a serialized frame - confirm that is what consumers expect.
    const uint8_t* bytes = reinterpret_cast<const uint8_t*>(pkt);

    // Never read past the end of the Packet object itself.
    size_t byte_count = static_cast<size_t>(len);
    if (byte_count > sizeof(*pkt)) {
        byte_count = sizeof(*pkt);
    }

    if (isDuplicate(bytes, byte_count)) {
        return;
    }

    JsonDocument doc;
    doc["gateway_id"] = _gateway_id;
    doc["direction"] = "rx";
    doc["len"] = len;
    doc["payload_type"] = pkt->getPayloadType();
    doc["route_type"] = pkt->isRouteDirect() ? "direct" : "flood";
    doc["path_len"] = pkt->path_len;
    doc["payload_len"] = pkt->payload_len;
    doc["snr"] = snr;
    doc["rssi"] = rssi;
    doc["timestamp"] = getTimestamp();

    // Lowercase hex dump, truncated to what fits in the buffer.
    static const char kHexDigits[] = "0123456789abcdef";
    char hex_buf[MAX_TRANS_UNIT * 2 + 1];
    size_t hex_len = 0;
    for (size_t i = 0; i < byte_count && hex_len + 2 < sizeof(hex_buf); i++) {
        hex_buf[hex_len++] = kHexDigits[bytes[i] >> 4];
        hex_buf[hex_len++] = kHexDigits[bytes[i] & 0x0F];
    }
    hex_buf[hex_len] = '\0';
    doc["raw"] = hex_buf;

    publishJson(_topic_packets_rx, doc);
}
/**
 * Publishes a JSON description of a transmitted packet on the packets/tx topic.
 *
 * Skipped when no session is open, when the arguments are unusable, or when
 * the same bytes were published recently (see isDuplicate()).
 *
 * @param pkt  Packet to describe; ignored if null.
 * @param len  Number of bytes to dump and hash; ignored if not positive.
 */
void MQTTBridge::publishPacketTx(mesh::Packet* pkt, int len) {
    if (!_mqtt_client.connected()) return;

    // A null packet would be dereferenced below, and a negative length would
    // turn into a huge size_t and read far out of bounds.
    if (pkt == nullptr || len <= 0) return;

    // NOTE(review): the dump/hash below covers the Packet object as laid out
    // in memory, not a serialized frame - confirm that is what consumers expect.
    const uint8_t* bytes = reinterpret_cast<const uint8_t*>(pkt);

    // Never read past the end of the Packet object itself.
    size_t byte_count = static_cast<size_t>(len);
    if (byte_count > sizeof(*pkt)) {
        byte_count = sizeof(*pkt);
    }

    if (isDuplicate(bytes, byte_count)) {
        return;
    }

    JsonDocument doc;
    doc["gateway_id"] = _gateway_id;
    doc["direction"] = "tx";
    doc["len"] = len;
    doc["payload_type"] = pkt->getPayloadType();
    doc["route_type"] = pkt->isRouteDirect() ? "direct" : "flood";
    doc["path_len"] = pkt->path_len;
    doc["payload_len"] = pkt->payload_len;
    doc["timestamp"] = getTimestamp();

    // Lowercase hex dump, truncated to what fits in the buffer.
    static const char kHexDigits[] = "0123456789abcdef";
    char hex_buf[MAX_TRANS_UNIT * 2 + 1];
    size_t hex_len = 0;
    for (size_t i = 0; i < byte_count && hex_len + 2 < sizeof(hex_buf); i++) {
        hex_buf[hex_len++] = kHexDigits[bytes[i] >> 4];
        hex_buf[hex_len++] = kHexDigits[bytes[i] & 0x0F];
    }
    hex_buf[hex_len] = '\0';
    doc["raw"] = hex_buf;

    publishJson(_topic_packets_tx, doc);
}
/**
 * Publishes a JSON description of a node advertisement on the adverts topic.
 * Skipped when no session is open or when no public key is supplied.
 *
 * @param pubkey        Advertising node's public key; the first 8 bytes become
 *                      the 16-character hex "node_id". Ignored if null.
 * @param timestamp     Timestamp carried by the advert, passed through as given.
 * @param app_data      Optional application data, emitted as lowercase hex.
 *                      May be null, in which case "app_data" is omitted.
 * @param app_data_len  Number of bytes in app_data. Only as many bytes as fit
 *                      the 128-byte hex buffer (63) are emitted.
 */
void MQTTBridge::publishAdvert(const uint8_t* pubkey, uint32_t timestamp,
                               const uint8_t* app_data, size_t app_data_len) {
    if (!_mqtt_client.connected()) return;

    // The node id is derived from the key, so there is nothing to report without one.
    if (pubkey == nullptr) return;

    static const char kHexDigits[] = "0123456789abcdef";

    JsonDocument doc;
    doc["gateway_id"] = _gateway_id;

    char node_id[17];
    for (int i = 0; i < 8; i++) {
        node_id[i * 2] = kHexDigits[pubkey[i] >> 4];
        node_id[i * 2 + 1] = kHexDigits[pubkey[i] & 0x0F];
    }
    node_id[16] = '\0';

    doc["node_id"] = node_id;
    doc["advert_timestamp"] = timestamp;
    doc["timestamp"] = getTimestamp();

    // Guard the pointer as well as the length: a non-zero length with a null
    // buffer would otherwise be dereferenced.
    if (app_data != nullptr && app_data_len > 0) {
        char hex_buf[128];
        size_t hex_len = 0;
        for (size_t i = 0; i < app_data_len && hex_len + 2 < sizeof(hex_buf); i++) {
            hex_buf[hex_len++] = kHexDigits[app_data[i] >> 4];
            hex_buf[hex_len++] = kHexDigits[app_data[i] & 0x0F];
        }
        hex_buf[hex_len] = '\0';
        doc["app_data"] = hex_buf;
    }

    publishJson(_topic_adverts, doc);
}
/**
 * Publishes a text payload and keeps the sent-message counter up to date.
 * A failed publish is logged and not counted.
 *
 * @param topic     Destination topic.
 * @param payload   NUL-terminated payload.
 * @param retained  Whether the broker should retain the message.
 */
void MQTTBridge::publishMessage(const char* topic, const char* payload, bool retained) {
    const bool delivered = _mqtt_client.publish(topic, payload, retained);
    if (!delivered) {
        Serial.printf("[MQTTS] Publish failed to %s\n", topic);
        return;
    }
    _messages_sent++;
}
void MQTTBridge::publishJson(const char* topic, JsonDocument& doc, bool retained) {
char buffer[512];
size_t len = serializeJson(doc, buffer, sizeof(buffer));
if (len > 0 && len < sizeof(buffer)) {
publishMessage(topic, buffer, retained);
}
}
/**
 * 32-bit FNV-1a hash: starting from the offset basis, each byte is XORed in
 * and the result multiplied by the FNV prime (wrapping at 32 bits).
 *
 * @param data  Bytes to hash.
 * @param len   Number of bytes; 0 yields the offset basis.
 * @return      The hash value.
 */
uint32_t MQTTBridge::fnv1a_hash(const uint8_t* data, size_t len) {
    constexpr uint32_t kOffsetBasis = 2166136261u;
    constexpr uint32_t kPrime = 16777619u;

    uint32_t digest = kOffsetBasis;
    const uint8_t* const stop = data + len;
    for (const uint8_t* cursor = data; cursor != stop; ++cursor) {
        digest = (digest ^ *cursor) * kPrime;
    }
    return digest;
}
/**
 * Reports whether these bytes were seen recently, remembering them if not.
 *
 * The hash of the bytes is looked up in a table of HASH_TABLE_SIZE entries.
 * An unseen hash overwrites the entry at _hash_index, which then advances and
 * wraps, so the table holds the most recent hashes.
 *
 * @param data  Bytes to check.
 * @param len   Number of bytes.
 * @return      true if the hash is already in the table, false otherwise.
 */
bool MQTTBridge::isDuplicate(const uint8_t* data, size_t len) {
    const uint32_t digest = fnv1a_hash(data, len);

    // Linear scan; stops at the first matching slot.
    size_t slot = 0;
    while (slot < HASH_TABLE_SIZE && _seen_hashes[slot] != digest) {
        ++slot;
    }
    if (slot < HASH_TABLE_SIZE) {
        return true;
    }

    // Not seen: record it, replacing the oldest entry.
    _seen_hashes[_hash_index] = digest;
    _hash_index = (_hash_index + 1) % HASH_TABLE_SIZE;
    return false;
}
void MQTTBridge::mqttCallback(char* topic, uint8_t* payload, unsigned int length) {
if (_instance) {
_instance->handleMessage(topic, payload, length);
}
}
/**
 * Handles one incoming MQTT message.
 *
 * Every message is counted. Messages under this gateway's command prefix are
 * treated as commands, named by the remainder of the topic:
 *   - "reboot": publish status, wait 100 ms, restart the chip;
 *   - "status": publish status;
 *   - any command other than "reboot" is also passed to the registered
 *     command callback, if there is one.
 *
 * The command callback runs BEFORE the built-in status reply. `topic` and
 * `payload` point into the MQTT client's shared receive/transmit buffer
 * (documented PubSubClient behavior - assuming _mqtt_client is a
 * PubSubClient), so publishing first would overwrite the very command name
 * and payload that are about to be handed to the callback.
 *
 * @param topic    Topic the message arrived on.
 * @param payload  Message bytes (not NUL-terminated).
 * @param length   Number of payload bytes.
 */
void MQTTBridge::handleMessage(char* topic, uint8_t* payload, unsigned int length) {
    _messages_received++;

    const size_t prefix_len = strlen(_topic_cmd_prefix);
    if (strncmp(topic, _topic_cmd_prefix, prefix_len) != 0) {
        return;
    }

    const char* cmd = topic + prefix_len;
    Serial.printf("[MQTTS] Command received: %s\n", cmd);

    // Decide what to do while the command name is still intact.
    const bool wants_reboot = strcmp(cmd, "reboot") == 0;
    const bool wants_status = !wants_reboot && strcmp(cmd, "status") == 0;

    if (wants_reboot) {
        Serial.println("[MQTTS] Reboot requested");
        publishStatus();
        delay(100);  // give the status message a moment to go out
        ESP.restart();
        return;  // not reached on hardware; buffers are stale after publishing
    }

    if (_command_callback) {
        _command_callback(cmd, payload, length);
    }

    // Publish last: from here on topic/payload may no longer be valid.
    if (wants_status) {
        publishStatus();
    }
}
const char* MQTTBridge::getTimestamp() {
static char buf[32];
uint32_t now = _rtc->getCurrentTime();
DateTime dt = DateTime(now);
snprintf(buf, sizeof(buf), "%04d-%02d-%02dT%02d:%02d:%02dZ",
dt.year(), dt.month(), dt.day(), dt.hour(), dt.minute(), dt.second());
return buf;
}
#endif // WITH_MQTT || WITH_ETHERNET