#if defined(WITH_MQTT) || defined(WITH_ETHERNET)

#include "MQTTBridge.h"
// NOTE(review): the targets of the next two includes were lost when this file
// was flattened; they are reconstructed from usage in this file (JsonDocument,
// DateTime). Confirm against version control before merging.
#include <ArduinoJson.h>
#include <RTClib.h>

namespace {

// Writes `src_len` bytes of `src` into `dst` as lowercase hex and always
// NUL-terminates (when dst_cap > 0). Encoding stops early when the next byte
// pair plus the terminator would not fit in `dst_cap`. Returns the number of
// hex characters written (terminator not counted).
size_t bridgeHexEncode(const uint8_t* src, size_t src_len, char* dst, size_t dst_cap) {
  static const char kDigits[] = "0123456789abcdef";
  if (dst == nullptr || dst_cap == 0) return 0;

  size_t out = 0;
  if (src != nullptr) {
    for (size_t i = 0; i < src_len && out + 2 < dst_cap; i++) {
      dst[out++] = kDigits[src[i] >> 4];
      dst[out++] = kDigits[src[i] & 0x0F];
    }
  }
  dst[out] = '\0';
  return out;
}

}  // namespace

// Static instance for callback
MQTTBridge* MQTTBridge::_instance = nullptr;

// Stores the collaborators, zeroes all counters/buffers and registers this
// object as the target of the static MQTT callback trampoline.
MQTTBridge::MQTTBridge(INetworkManager* network, mesh::PacketManager* mgr, mesh::RTCClock* rtc)
    : _network(network),
      _mqtt_client(),
      _state(MQTTState::DISCONNECTED),
      _mgr(mgr),
      _rtc(rtc),
      _last_connect_attempt(0),
      _last_status_publish(0),
      _last_stats_publish(0),
      _connected_since(0),
      _messages_sent(0),
      _messages_received(0),
      _reconnect_count(0),
      _hash_index(0),
      _command_callback(nullptr),
      _initialized(false) {
  memset(&_config, 0, sizeof(_config));
  memset(_self_pubkey, 0, sizeof(_self_pubkey));
  memset(_gateway_id, 0, sizeof(_gateway_id));
  memset(_seen_hashes, 0, sizeof(_seen_hashes));
  _instance = this;
}

// Applies `config`, derives the gateway id and topics, configures the MQTT
// client and, if the bridge is enabled and the network is up, connects.
//   config      - broker/credentials/topic settings (copied).
//   self_pubkey - at least 32 bytes; the first 8 become the hex gateway id.
void MQTTBridge::begin(const MQTTConfig& config, const uint8_t* self_pubkey) {
  if (self_pubkey == nullptr) {
    // Without a key there is no gateway id to build topics from.
    Serial.println("[MQTT] begin() called without a public key");
    return;
  }

  _config = config;
  memcpy(_self_pubkey, self_pubkey, 32);

  // Generate gateway ID from first 8 bytes of pubkey (bounded by the
  // destination size so a short _gateway_id can never be overrun).
  bridgeHexEncode(_self_pubkey, 8, _gateway_id, sizeof(_gateway_id));

  setupTopics();

  // Configure client based on TLS setting
  if (_config.use_tls) {
    _wifi_client_secure.setInsecure();  // Skip cert verification
    _mqtt_client.setClient(_wifi_client_secure);
    Serial.printf("[MQTT] TLS enabled\n");
  } else {
    _mqtt_client.setClient(_wifi_client);
    Serial.printf("[MQTT] Plain TCP\n");
  }

  _mqtt_client.setServer(_config.broker, _config.port);
  _mqtt_client.setCallback(mqttCallback);
  _mqtt_client.setKeepAlive(_config.keepalive_secs > 0 ? _config.keepalive_secs : 60);
  _mqtt_client.setBufferSize(512);

  _initialized = true;
  _state = MQTTState::DISCONNECTED;

  Serial.printf("[MQTT] Initialized: %s:%d, prefix=%s, gateway=%s\n",
                _config.broker, _config.port, _config.topic_prefix, _gateway_id);

  if (_config.enabled && _network->isConnected()) {
    attemptConnection();
  }
}

// Drives the connection state machine; call regularly from the main loop.
// Does nothing until begin() has run and the bridge is enabled.
void MQTTBridge::loop() {
  if (!_initialized || !_config.enabled) return;

  if (!_network->isConnected()) {
    if (_state == MQTTState::CONNECTED) {
      _state = MQTTState::DISCONNECTED;
      Serial.println("[MQTTS] WiFi lost, disconnected");
    }
    return;
  }

  switch (_state) {
    case MQTTState::DISCONNECTED:
      // Retry at most once every 5 s.
      if (millis() - _last_connect_attempt > 5000) {
        attemptConnection();
      }
      break;

    case MQTTState::CONNECTING:
      break;

    case MQTTState::CONNECTED:
      if (!_mqtt_client.connected()) {
        _state = MQTTState::DISCONNECTED;
        Serial.println("[MQTTS] Connection lost");
        _last_connect_attempt = millis();
      } else {
        _mqtt_client.loop();
        // Refresh the retained status message once a minute.
        if (millis() - _last_status_publish > 60000) {
          publishStatus();
          _last_status_publish = millis();
        }
      }
      break;

    case MQTTState::ERROR:
      // Back off for 30 s after a failed connect before retrying.
      if (millis() - _last_connect_attempt > 30000) {
        _state = MQTTState::DISCONNECTED;
      }
      break;
  }
}

// Publishes a retained "offline" status (when still connected), disconnects
// and marks the bridge uninitialized.
void MQTTBridge::end() {
  if (!_initialized) return;

  if (_mqtt_client.connected()) {
    JsonDocument doc;
    doc["status"] = "offline";
    doc["gateway_id"] = _gateway_id;
    doc["timestamp"] = getTimestamp();
    publishJson(_topic_status, doc, true);
    _mqtt_client.disconnect();
  }

  _state = MQTTState::DISCONNECTED;
  _initialized = false;
  Serial.println("[MQTTS] Stopped");
}

// Performs one connect attempt with a retained "offline" last-will on the
// status topic. On success subscribes to commands and publishes status; on
// failure moves to the ERROR state.
void MQTTBridge::attemptConnection() {
  if (!_network->isConnected()) return;

  _state = MQTTState::CONNECTING;
  _last_connect_attempt = millis();

  Serial.printf("[MQTTS] Connecting to %s:%d...\n", _config.broker, _config.port);

  // Fall back to a client id derived from the gateway id.
  String client_id = String(_config.client_id);
  if (client_id.length() == 0) {
    client_id = "meshcore-" + String(_gateway_id);
  }

  char lwt_topic[80];
  snprintf(lwt_topic, sizeof(lwt_topic), "%s/gateway/%s/status",
           _config.topic_prefix, _gateway_id);

  bool connected = false;
  if (strlen(_config.user) > 0) {
    connected = _mqtt_client.connect(client_id.c_str(), _config.user, _config.password,
                                     lwt_topic, 1, true, "{\"status\":\"offline\"}");
  } else {
    connected = _mqtt_client.connect(client_id.c_str(), lwt_topic, 1, true,
                                     "{\"status\":\"offline\"}");
  }

  if (connected) {
    _state = MQTTState::CONNECTED;
    _connected_since = millis();
    _reconnect_count++;
    Serial.println("[MQTTS] Connected!");
    subscribeToCommands();
    publishStatus();
    _last_status_publish = millis();
  } else {
    int rc = _mqtt_client.state();
    Serial.printf("[MQTTS] Connection failed, rc=%d\n", rc);
    _state = MQTTState::ERROR;
  }
}

// Rebuilds every topic string from the current prefix and gateway id.
void MQTTBridge::setupTopics() {
  snprintf(_topic_status, sizeof(_topic_status), "%s/gateway/%s/status",
           _config.topic_prefix, _gateway_id);
  snprintf(_topic_packets_rx, sizeof(_topic_packets_rx), "%s/packets/rx",
           _config.topic_prefix);
  snprintf(_topic_packets_tx, sizeof(_topic_packets_tx), "%s/packets/tx",
           _config.topic_prefix);
  snprintf(_topic_adverts, sizeof(_topic_adverts), "%s/adverts",
           _config.topic_prefix);
  snprintf(_topic_stats, sizeof(_topic_stats), "%s/gateway/%s/stats",
           _config.topic_prefix, _gateway_id);
  snprintf(_topic_cmd_prefix, sizeof(_topic_cmd_prefix), "%s/gateway/%s/cmd/",
           _config.topic_prefix, _gateway_id);
}

// Subscribes to every topic below the command prefix ("<prefix>#").
void MQTTBridge::subscribeToCommands() {
  char topic[100];
  snprintf(topic, sizeof(topic), "%s#", _topic_cmd_prefix);
  _mqtt_client.subscribe(topic);
  Serial.printf("[MQTTS] Subscribed to: %s\n", topic);
}

// Replaces the active configuration.
// A change to broker, port, credentials, TLS mode, keep-alive or topic prefix
// drops the current session so loop() reconnects with the new values (and
// re-subscribes / re-registers the last-will under the new topics).
void MQTTBridge::updateConfig(const MQTTConfig& config) {
  const bool tls_changed = (_config.use_tls != config.use_tls);
  const bool reconnect_needed =
      strcmp(_config.broker, config.broker) != 0 ||
      _config.port != config.port ||
      strcmp(_config.user, config.user) != 0 ||
      strcmp(_config.password, config.password) != 0 ||
      strcmp(_config.topic_prefix, config.topic_prefix) != 0 ||
      _config.keepalive_secs != config.keepalive_secs ||
      tls_changed;

  // Close the session before the transport underneath it is reconfigured.
  if (reconnect_needed && _mqtt_client.connected()) {
    _mqtt_client.disconnect();
    _state = MQTTState::DISCONNECTED;
    _last_connect_attempt = 0;  // lets loop() retry without the 5 s wait
  }

  _config = config;
  setupTopics();

  // begin() handed broker/port/keep-alive/transport to the client once;
  // re-apply them so the changed values actually reach it.
  if (_initialized) {
    if (tls_changed) {
      if (_config.use_tls) {
        _wifi_client_secure.setInsecure();  // Skip cert verification
        _mqtt_client.setClient(_wifi_client_secure);
      } else {
        _mqtt_client.setClient(_wifi_client);
      }
    }
    _mqtt_client.setServer(_config.broker, _config.port);
    _mqtt_client.setKeepAlive(_config.keepalive_secs > 0 ? _config.keepalive_secs : 60);
  }
}

// Publishes a retained "online" status document (ip, rssi, uptime since the
// MQTT connection was established, free heap, timestamp).
void MQTTBridge::publishStatus() {
  if (!_mqtt_client.connected()) return;

  JsonDocument doc;
  doc["status"] = "online";
  doc["gateway_id"] = _gateway_id;
  doc["ip"] = _network->getLocalIP().toString();
  doc["rssi"] = _network->getRSSI();
  doc["uptime_secs"] = (_connected_since > 0) ? (millis() - _connected_since) / 1000 : 0;
  doc["free_heap"] = ESP.getFreeHeap();
  doc["timestamp"] = getTimestamp();

  publishJson(_topic_status, doc, true);
}

// Publishes caller-supplied radio statistics together with this bridge's own
// MQTT message counters on the stats topic (not retained).
void MQTTBridge::publishStats(uint32_t uptime_secs, uint32_t packets_rx, uint32_t packets_tx,
                              uint32_t air_time_secs, int16_t noise_floor) {
  if (!_mqtt_client.connected()) return;

  JsonDocument doc;
  doc["gateway_id"] = _gateway_id;
  doc["uptime_secs"] = uptime_secs;
  doc["packets_rx"] = packets_rx;
  doc["packets_tx"] = packets_tx;
  doc["air_time_secs"] = air_time_secs;
  doc["noise_floor"] = noise_floor;
  doc["mqtt_msgs_sent"] = _messages_sent;
  doc["mqtt_msgs_recv"] = _messages_received;
  doc["timestamp"] = getTimestamp();

  publishJson(_topic_stats, doc);
}

// Publishes metadata and a hex dump for a received packet, unless the same
// bytes were published recently (see isDuplicate).
// NOTE(review): `raw` is the first `len` bytes of the in-memory Packet object,
// not a serialized frame - confirm that this is the intended representation.
void MQTTBridge::publishPacketRx(mesh::Packet* pkt, int len, float snr, float rssi) {
  if (!_mqtt_client.connected()) return;
  // A negative length would become a huge size_t in the hash/dump below.
  if (pkt == nullptr || len < 0) return;

  const uint8_t* raw = reinterpret_cast<const uint8_t*>(pkt);
  if (isDuplicate(raw, len)) {
    return;
  }

  JsonDocument doc;
  doc["gateway_id"] = _gateway_id;
  doc["direction"] = "rx";
  doc["len"] = len;
  doc["payload_type"] = pkt->getPayloadType();
  doc["route_type"] = pkt->isRouteDirect() ? "direct" : "flood";
  doc["path_len"] = pkt->path_len;
  doc["payload_len"] = pkt->payload_len;
  doc["snr"] = snr;
  doc["rssi"] = rssi;
  doc["timestamp"] = getTimestamp();

  char hex_buf[MAX_TRANS_UNIT * 2 + 1];
  bridgeHexEncode(raw, static_cast<size_t>(len), hex_buf, sizeof(hex_buf));
  doc["raw"] = hex_buf;

  publishJson(_topic_packets_rx, doc);
}

// Same as publishPacketRx but for transmitted packets (no snr/rssi fields).
// Shares the duplicate filter with the RX path.
void MQTTBridge::publishPacketTx(mesh::Packet* pkt, int len) {
  if (!_mqtt_client.connected()) return;
  // A negative length would become a huge size_t in the hash/dump below.
  if (pkt == nullptr || len < 0) return;

  const uint8_t* raw = reinterpret_cast<const uint8_t*>(pkt);
  if (isDuplicate(raw, len)) {
    return;
  }

  JsonDocument doc;
  doc["gateway_id"] = _gateway_id;
  doc["direction"] = "tx";
  doc["len"] = len;
  doc["payload_type"] = pkt->getPayloadType();
  doc["route_type"] = pkt->isRouteDirect() ? "direct" : "flood";
  doc["path_len"] = pkt->path_len;
  doc["payload_len"] = pkt->payload_len;
  doc["timestamp"] = getTimestamp();

  char hex_buf[MAX_TRANS_UNIT * 2 + 1];
  bridgeHexEncode(raw, static_cast<size_t>(len), hex_buf, sizeof(hex_buf));
  doc["raw"] = hex_buf;

  publishJson(_topic_packets_tx, doc);
}

// Publishes an advert: node id (hex of the first 8 pubkey bytes), the advert's
// own timestamp, the gateway clock timestamp and, when present, the app data
// as hex (truncated to what fits in a 128-byte buffer).
void MQTTBridge::publishAdvert(const uint8_t* pubkey, uint32_t timestamp,
                               const uint8_t* app_data, size_t app_data_len) {
  if (!_mqtt_client.connected()) return;
  if (pubkey == nullptr) return;

  JsonDocument doc;
  doc["gateway_id"] = _gateway_id;

  char node_id[17];
  bridgeHexEncode(pubkey, 8, node_id, sizeof(node_id));
  doc["node_id"] = node_id;
  doc["advert_timestamp"] = timestamp;
  doc["timestamp"] = getTimestamp();

  if (app_data != nullptr && app_data_len > 0) {
    char hex_buf[128];
    bridgeHexEncode(app_data, app_data_len, hex_buf, sizeof(hex_buf));
    doc["app_data"] = hex_buf;
  }

  publishJson(_topic_adverts, doc);
}

// Publishes a NUL-terminated payload and counts it on success; logs failures.
void MQTTBridge::publishMessage(const char* topic, const char* payload, bool retained) {
  if (_mqtt_client.publish(topic, payload, retained)) {
    _messages_sent++;
  } else {
    Serial.printf("[MQTTS] Publish failed to %s\n", topic);
  }
}

// Serializes `doc` into a 512-byte stack buffer and publishes it.
// Output that is empty or leaves no room for the terminator is not sent.
void MQTTBridge::publishJson(const char* topic, JsonDocument& doc, bool retained) {
  char buffer[512];
  size_t len = serializeJson(doc, buffer, sizeof(buffer));
  if (len > 0 && len < sizeof(buffer)) {
    publishMessage(topic, buffer, retained);
  } else {
    // Previously dropped silently, which made missing messages hard to trace.
    Serial.printf("[MQTTS] JSON for %s dropped (len=%u, limit=%u)\n", topic,
                  static_cast<unsigned>(len), static_cast<unsigned>(sizeof(buffer)));
  }
}

// 32-bit FNV-1a hash over `len` bytes of `data`.
uint32_t MQTTBridge::fnv1a_hash(const uint8_t* data, size_t len) {
  uint32_t hash = 2166136261u;
  for (size_t i = 0; i < len; i++) {
    hash ^= data[i];
    hash *= 16777619u;
  }
  return hash;
}

// Returns true when the hash of `data` is already in the ring of recently
// seen hashes; otherwise records it (overwriting the oldest slot) and returns
// false. Slots start zeroed, so data hashing to 0 reads as already seen.
bool MQTTBridge::isDuplicate(const uint8_t* data, size_t len) {
  uint32_t hash = fnv1a_hash(data, len);

  for (size_t i = 0; i < HASH_TABLE_SIZE; i++) {
    if (_seen_hashes[i] == hash) {
      return true;
    }
  }

  _seen_hashes[_hash_index] = hash;
  _hash_index = (_hash_index + 1) % HASH_TABLE_SIZE;
  return false;
}

// Static trampoline registered with the MQTT client; forwards to the instance
// recorded by the constructor.
void MQTTBridge::mqttCallback(char* topic, uint8_t* payload, unsigned int length) {
  if (_instance) {
    _instance->handleMessage(topic, payload, length);
  }
}

// Handles an incoming message. Topics under the command prefix are treated as
// commands: "reboot" and "status" are handled here, and every command is also
// forwarded to the user callback when one is set.
void MQTTBridge::handleMessage(char* topic, uint8_t* payload, unsigned int length) {
  _messages_received++;

  if (strncmp(topic, _topic_cmd_prefix, strlen(_topic_cmd_prefix)) == 0) {
    const char* cmd = topic + strlen(_topic_cmd_prefix);
    Serial.printf("[MQTTS] Command received: %s\n", cmd);

    if (strcmp(cmd, "reboot") == 0) {
      Serial.println("[MQTTS] Reboot requested");
      publishStatus();
      delay(100);  // short pause before restarting, after the status publish
      ESP.restart();
    } else if (strcmp(cmd, "status") == 0) {
      publishStatus();
    }

    if (_command_callback) {
      _command_callback(cmd, payload, length);
    }
  }
}

// Formats the RTC's current time as "YYYY-MM-DDTHH:MM:SSZ".
// The result lives in a static buffer that the next call overwrites.
const char* MQTTBridge::getTimestamp() {
  static char buf[32];
  uint32_t now = _rtc->getCurrentTime();
  DateTime dt = DateTime(now);
  snprintf(buf, sizeof(buf), "%04d-%02d-%02dT%02d:%02d:%02dZ",
           dt.year(), dt.month(), dt.day(), dt.hour(), dt.minute(), dt.second());
  return buf;
}

#endif  // WITH_MQTT || WITH_ETHERNET