PYTHON MQTT多订阅网关

我们将创建一个多订阅的MQTT网关,它可以同时订阅多个主题,并将消息转发到其他MQTT代理或者处理这些消息。

这里我们假设这个网关作为一个中间人,可以订阅多个主题,然后对消息进行一些处理(例如:过滤、转换等),然后发布到另一个MQTT代理或者同一个代理的其他主题。

为了简单起见,我们这里只实现一个基本的框架:订阅多个主题,并将接收到的消息打印出来,同时也可以选择将消息转发到另一个主题(可选)。

我们将使用paho-mqtt库。如果你还没有安装,请先安装:pip install paho-mqtt

这个网关将具有以下功能:

1. 连接到MQTT代理(可以指定多个代理,但这里我们只连接一个代理,但可以订阅多个主题)

2. 订阅多个主题

3. 处理接收到的消息(回调函数)

4. 可以选择将消息转发到另一个主题(如果需要)

注意:这个示例中,我们只连接一个MQTT代理,但是可以订阅多个主题。如果你需要连接多个代理,那么需要为每个代理创建客户端并连接。

我们将创建一个类来实现,这样更易于管理。

假设我们只需要连接一个MQTT代理,但是要订阅多个主题,并且可以将消息转发到同一个代理的另一个主题(或者不同的代理,如果需要连接多个代理,则需要扩展)。

这里我们只处理一个代理的情况,转发到同一个代理的另一个主题。

如果需要转发到不同的代理,则需要另一个客户端实例,但为了简单,我们先不考虑。

步骤:

1. 创建MQTT客户端,设置回调函数

2. 连接MQTT代理

3. 订阅多个主题

4. 在消息回调中处理消息,并可以选择转发

我们还可以添加一些功能,比如日志记录、消息处理函数(可由用户自定义)等。

让我们开始编写代码。

MQTT 多订阅网关实现

下面是一个基于 Python 的 MQTT 多订阅网关实现,它能够同时订阅多个 MQTT 主题,并对接收到的消息进行处理和转发。

安装依赖

首先需要安装 paho-mqtt 库:

pip install paho-mqtt

完整代码

import json
import logging
import time
import threading
from typing import Dict, List, Callable, Any
import paho.mqtt.client as mqtt

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("MQTTGateway")

class MQTTMultiSubscriptionGateway:
    def __init__(
        self,
        broker_host: str,
        broker_port: int = 1883,
        client_id: str = None,
        username: str = None,
        password: str = None,
        keepalive: int = 60
    ):
        """
        MQTT 多订阅网关初始化
        
        Args:
            broker_host: MQTT 代理地址
            broker_port: MQTT 代理端口,默认 1883
            client_id: 客户端ID,默认自动生成
            username: 用户名(可选)
            password: 密码(可选)
            keepalive: 心跳间隔,默认60秒
        """
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.client_id = client_id or f"mqtt_gateway_{int(time.time())}"
        self.username = username
        self.password = password
        self.keepalive = keepalive
        
        # 主题到处理函数的映射
        self.topic_handlers: Dict[str, Callable[[str, Any], None]] = {}
        
        # 创建MQTT客户端
        self.client = mqtt.Client(client_id=self.client_id, protocol=mqtt.MQTTv311)
        
        # 设置回调函数
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect
        
        # 设置认证信息(如果提供)
        if self.username and self.password:
            self.client.username_pw_set(self.username, self.password)
            
        # 连接状态
        self.connected = False
        self.connection_lock = threading.Lock()
        
    def _on_connect(self, client, userdata, flags, rc):
        """连接回调函数"""
        if rc == 0:
            logger.info(f"Connected to MQTT broker at {self.broker_host}:{self.broker_port}")
            self.connected = True
            
            # 订阅所有已注册的主题
            if self.topic_handlers:
                topics = list(self.topic_handlers.keys())
                logger.info(f"Subscribing to topics: {topics}")
                
                # 为每个主题单独订阅,以便更好的QoS控制
                for topic in topics:
                    client.subscribe(topic, qos=1)
        else:
            logger.error(f"Connection failed with result code {rc}")
            self.connected = False
            
    def _on_message(self, client, userdata, msg):
        """消息接收回调函数"""
        try:
            topic = msg.topic
            logger.debug(f"Received message on topic: {topic}")
            
            # 查找对应的处理函数
            if topic in self.topic_handlers:
                try:
                    # 尝试解析JSON负载
                    try:
                        payload = json.loads(msg.payload.decode())
                    except (json.JSONDecodeError, UnicodeDecodeError):
                        payload = msg.payload.decode()
                    
                    # 调用处理函数
                    self.topic_handlers[topic](topic, payload)
                except Exception as e:
                    logger.error(f"Error processing message on topic {topic}: {e}")
            else:
                logger.warning(f"No handler registered for topic: {topic}")
                
        except Exception as e:
            logger.error(f"Error in message callback: {e}")
            
    def _on_disconnect(self, client, userdata, rc):
        """断开连接回调函数"""
        logger.info(f"Disconnected from MQTT broker with result code {rc}")
        self.connected = False
        
        # 非主动断开时尝试重新连接
        if rc != 0:
            logger.info("Attempting to reconnect...")
            self.reconnect()
            
    def register_topic_handler(self, topic: str, handler: Callable[[str, Any], None]):
        """
        注册主题和处理函数
        
        Args:
            topic: MQTT主题
            handler: 处理函数,接收两个参数(topic, payload)
        """
        self.topic_handlers[topic] = handler
        logger.info(f"Registered handler for topic: {topic}")
        
        # 如果已经连接,立即订阅该主题
        if self.connected:
            self.client.subscribe(topic, qos=1)
            
    def remove_topic_handler(self, topic: str):
        """移除主题和处理函数"""
        if topic in self.topic_handlers:
            del self.topic_handlers[topic]
            logger.info(f"Removed handler for topic: {topic}")
            
            # 如果已经连接,取消订阅
            if self.connected:
                self.client.unsubscribe(topic)
                
    def connect(self):
        """连接到MQTT代理"""
        with self.connection_lock:
            if not self.connected:
                try:
                    logger.info(f"Connecting to MQTT broker at {self.broker_host}:{self.broker_port}")
                    self.client.connect(self.broker_host, self.broker_port, self.keepalive)
                    self.client.loop_start()
                except Exception as e:
                    logger.error(f"Failed to connect to MQTT broker: {e}")
                    raise
                    
    def disconnect(self):
        """断开与MQTT代理的连接"""
        with self.connection_lock:
            if self.connected:
                logger.info("Disconnecting from MQTT broker")
                self.client.loop_stop()
                self.client.disconnect()
                
    def reconnect(self):
        """重新连接到MQTT代理"""
        self.disconnect()
        time.sleep(5)  # 等待5秒后重连
        self.connect()
        
    def publish(self, topic: str, payload: Any, qos: int = 1, retain: bool = False):
        """
        发布消息到MQTT代理
        
        Args:
            topic: 主题
            payload: 消息内容
            qos: 服务质量等级 (0, 1, 2)
            retain: 是否保留消息
        """
        try:
            # 如果payload是字典,转换为JSON字符串
            if isinstance(payload, dict):
                payload = json.dumps(payload)
                
            result = self.client.publish(topic, payload, qos=qos, retain=retain)
            
            # 等待消息发布完成
            result.wait_for_publish()
            
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                logger.debug(f"Message published to {topic}")
            else:
                logger.error(f"Failed to publish message to {topic}, error code: {result.rc}")
                
        except Exception as e:
            logger.error(f"Error publishing message to {topic}: {e}")
            
    def get_connected_status(self):
        """获取连接状态"""
        return self.connected

# 示例使用
if __name__ == "__main__":
    # 创建网关实例
    gateway = MQTTMultiSubscriptionGateway(
        broker_host="localhost",  # 替换为你的MQTT代理地址
        broker_port=1883,
        client_id="mqtt_gateway_001"
    )
    
    # 定义处理函数
    def handle_sensor_data(topic, payload):
        """处理传感器数据"""
        print(f"Received sensor data on {topic}: {payload}")
        # 这里可以添加数据处理逻辑,如存储、转发等
        
    def handle_device_status(topic, payload):
        """处理设备状态"""
        print(f"Received device status on {topic}: {payload}")
        # 这里可以添加状态处理逻辑
        
    # 注册主题和处理函数
    gateway.register_topic_handler("sensors/temperature", handle_sensor_data)
    gateway.register_topic_handler("sensors/humidity", handle_sensor_data)
    gateway.register_topic_handler("devices/+/status", handle_device_status)
    
    try:
        # 连接到MQTT代理
        gateway.connect()
        
        # 保持程序运行
        while True:
            time.sleep(1)
            
            # 示例:发布消息
            if gateway.get_connected_status():
                gateway.publish("gateway/status", {"status": "online", "timestamp": time.time()})
                
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        gateway.disconnect()

功能说明

这个 MQTT 多订阅网关具有以下功能:

  1. 多主题订阅:可以同时订阅多个 MQTT 主题
  2. 主题处理函数注册:为每个主题注册独立的处理函数
  3. 自动重连:连接断开时自动尝试重新连接
  4. 消息发布:支持向 MQTT 代理发布消息
  5. 异常处理:完善的错误处理和日志记录

使用示例

  1. 修改 broker_host 为你的 MQTT 代理地址
  2. 根据需要添加或修改主题和处理函数
  3. 运行程序:python mqtt_gateway.py

扩展建议

你可以根据实际需求扩展这个网关:

  1. 添加消息持久化功能
  2. 实现消息转换和路由
  3. 添加认证和加密功能
  4. 集成数据库存储
  5. 添加 REST API 接口用于监控和控制

这个实现提供了一个灵活的框架,可以根据具体应用场景进行定制和扩展。

本网站所收集的公开资料部分来源于互联网,其版权归原作者本人所有,本站只是转载和摘录,目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其的真实性能做到合理的研判负责,也不构成任何其他建议,如果有任何侵犯您权益和知识产权的地方,请来邮或来电告知本站,经过核实,我们会及时的进行整理删除,谢谢!
评论 共0条
取消回复 发布评论