我们将创建一个多订阅的MQTT网关,它可以同时订阅多个主题,并将消息转发到其他MQTT代理或者处理这些消息。
这里我们假设这个网关作为一个中间人,可以订阅多个主题,然后对消息进行一些处理(例如:过滤、转换等),然后发布到另一个MQTT代理或者同一个代理的其他主题。
为了简单起见,我们这里只实现一个基本的框架:订阅多个主题,并将接收到的消息打印出来,同时也可以选择将消息转发到另一个主题(可选)。
我们将使用paho-mqtt库。如果你还没有安装,请先安装:pip install paho-mqtt
这个网关将具有以下功能:
1. 连接到MQTT代理(可以指定多个代理,但这里我们只连接一个代理,但可以订阅多个主题)
2. 订阅多个主题
3. 处理接收到的消息(回调函数)
4. 可以选择将消息转发到另一个主题(如果需要)
注意:这个示例中,我们只连接一个MQTT代理,但是可以订阅多个主题。如果你需要连接多个代理,那么需要为每个代理创建客户端并连接。
我们将创建一个类来实现,这样更易于管理。
假设我们只需要连接一个MQTT代理,但是要订阅多个主题,并且可以将消息转发到同一个代理的另一个主题(或者不同的代理,如果需要连接多个代理,则需要扩展)。
这里我们只处理一个代理的情况,转发到同一个代理的另一个主题。
如果需要转发到不同的代理,则需要另一个客户端实例,但为了简单,我们先不考虑。
步骤:
1. 创建MQTT客户端,设置回调函数
2. 连接MQTT代理
3. 订阅多个主题
4. 在消息回调中处理消息,并可以选择转发
我们还可以添加一些功能,比如日志记录、消息处理函数(可由用户自定义)等。
让我们开始编写代码。
MQTT 多订阅网关实现
下面是一个基于 Python 的 MQTT 多订阅网关实现,它能够同时订阅多个 MQTT 主题,并对接收到的消息进行处理和转发。
安装依赖
首先需要安装 paho-mqtt 库:
pip install paho-mqtt
完整代码
import json
import logging
import time
import threading
from typing import Dict, List, Callable, Any
import paho.mqtt.client as mqtt
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("MQTTGateway")
class MQTTMultiSubscriptionGateway:
def __init__(
self,
broker_host: str,
broker_port: int = 1883,
client_id: str = None,
username: str = None,
password: str = None,
keepalive: int = 60
):
"""
MQTT 多订阅网关初始化
Args:
broker_host: MQTT 代理地址
broker_port: MQTT 代理端口,默认 1883
client_id: 客户端ID,默认自动生成
username: 用户名(可选)
password: 密码(可选)
keepalive: 心跳间隔,默认60秒
"""
self.broker_host = broker_host
self.broker_port = broker_port
self.client_id = client_id or f"mqtt_gateway_{int(time.time())}"
self.username = username
self.password = password
self.keepalive = keepalive
# 主题到处理函数的映射
self.topic_handlers: Dict[str, Callable[[str, Any], None]] = {}
# 创建MQTT客户端
self.client = mqtt.Client(client_id=self.client_id, protocol=mqtt.MQTTv311)
# 设置回调函数
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
# 设置认证信息(如果提供)
if self.username and self.password:
self.client.username_pw_set(self.username, self.password)
# 连接状态
self.connected = False
self.connection_lock = threading.Lock()
def _on_connect(self, client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
logger.info(f"Connected to MQTT broker at {self.broker_host}:{self.broker_port}")
self.connected = True
# 订阅所有已注册的主题
if self.topic_handlers:
topics = list(self.topic_handlers.keys())
logger.info(f"Subscribing to topics: {topics}")
# 为每个主题单独订阅,以便更好的QoS控制
for topic in topics:
client.subscribe(topic, qos=1)
else:
logger.error(f"Connection failed with result code {rc}")
self.connected = False
def _on_message(self, client, userdata, msg):
"""消息接收回调函数"""
try:
topic = msg.topic
logger.debug(f"Received message on topic: {topic}")
# 查找对应的处理函数
if topic in self.topic_handlers:
try:
# 尝试解析JSON负载
try:
payload = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError):
payload = msg.payload.decode()
# 调用处理函数
self.topic_handlers[topic](topic, payload)
except Exception as e:
logger.error(f"Error processing message on topic {topic}: {e}")
else:
logger.warning(f"No handler registered for topic: {topic}")
except Exception as e:
logger.error(f"Error in message callback: {e}")
def _on_disconnect(self, client, userdata, rc):
"""断开连接回调函数"""
logger.info(f"Disconnected from MQTT broker with result code {rc}")
self.connected = False
# 非主动断开时尝试重新连接
if rc != 0:
logger.info("Attempting to reconnect...")
self.reconnect()
def register_topic_handler(self, topic: str, handler: Callable[[str, Any], None]):
"""
注册主题和处理函数
Args:
topic: MQTT主题
handler: 处理函数,接收两个参数(topic, payload)
"""
self.topic_handlers[topic] = handler
logger.info(f"Registered handler for topic: {topic}")
# 如果已经连接,立即订阅该主题
if self.connected:
self.client.subscribe(topic, qos=1)
def remove_topic_handler(self, topic: str):
"""移除主题和处理函数"""
if topic in self.topic_handlers:
del self.topic_handlers[topic]
logger.info(f"Removed handler for topic: {topic}")
# 如果已经连接,取消订阅
if self.connected:
self.client.unsubscribe(topic)
def connect(self):
"""连接到MQTT代理"""
with self.connection_lock:
if not self.connected:
try:
logger.info(f"Connecting to MQTT broker at {self.broker_host}:{self.broker_port}")
self.client.connect(self.broker_host, self.broker_port, self.keepalive)
self.client.loop_start()
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
raise
def disconnect(self):
"""断开与MQTT代理的连接"""
with self.connection_lock:
if self.connected:
logger.info("Disconnecting from MQTT broker")
self.client.loop_stop()
self.client.disconnect()
def reconnect(self):
"""重新连接到MQTT代理"""
self.disconnect()
time.sleep(5) # 等待5秒后重连
self.connect()
def publish(self, topic: str, payload: Any, qos: int = 1, retain: bool = False):
"""
发布消息到MQTT代理
Args:
topic: 主题
payload: 消息内容
qos: 服务质量等级 (0, 1, 2)
retain: 是否保留消息
"""
try:
# 如果payload是字典,转换为JSON字符串
if isinstance(payload, dict):
payload = json.dumps(payload)
result = self.client.publish(topic, payload, qos=qos, retain=retain)
# 等待消息发布完成
result.wait_for_publish()
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.debug(f"Message published to {topic}")
else:
logger.error(f"Failed to publish message to {topic}, error code: {result.rc}")
except Exception as e:
logger.error(f"Error publishing message to {topic}: {e}")
def get_connected_status(self):
"""获取连接状态"""
return self.connected
# 示例使用
if __name__ == "__main__":
# 创建网关实例
gateway = MQTTMultiSubscriptionGateway(
broker_host="localhost", # 替换为你的MQTT代理地址
broker_port=1883,
client_id="mqtt_gateway_001"
)
# 定义处理函数
def handle_sensor_data(topic, payload):
"""处理传感器数据"""
print(f"Received sensor data on {topic}: {payload}")
# 这里可以添加数据处理逻辑,如存储、转发等
def handle_device_status(topic, payload):
"""处理设备状态"""
print(f"Received device status on {topic}: {payload}")
# 这里可以添加状态处理逻辑
# 注册主题和处理函数
gateway.register_topic_handler("sensors/temperature", handle_sensor_data)
gateway.register_topic_handler("sensors/humidity", handle_sensor_data)
gateway.register_topic_handler("devices/+/status", handle_device_status)
try:
# 连接到MQTT代理
gateway.connect()
# 保持程序运行
while True:
time.sleep(1)
# 示例:发布消息
if gateway.get_connected_status():
gateway.publish("gateway/status", {"status": "online", "timestamp": time.time()})
except KeyboardInterrupt:
print("Shutting down...")
finally:
gateway.disconnect()
功能说明
这个 MQTT 多订阅网关具有以下功能:
- 多主题订阅:可以同时订阅多个 MQTT 主题
- 主题处理函数注册:为每个主题注册独立的处理函数
- 自动重连:连接断开时自动尝试重新连接
- 消息发布:支持向 MQTT 代理发布消息
- 异常处理:完善的错误处理和日志记录
使用示例
- 修改
broker_host
为你的 MQTT 代理地址 - 根据需要添加或修改主题和处理函数
- 运行程序:
python mqtt_gateway.py
扩展建议
你可以根据实际需求扩展这个网关:
- 添加消息持久化功能
- 实现消息转换和路由
- 添加认证和加密功能
- 集成数据库存储
- 添加 REST API 接口用于监控和控制
这个实现提供了一个灵活的框架,可以根据具体应用场景进行定制和扩展。