标签 MongoDB 下的文章

使用托管数据库部署 Coreflux MQTT 代理

MQTT 代理 通过发布-订阅消息模式连接物联网设备和应用程序,使其成为现代 物联网 基础设施的重要组成部分。Coreflux 是一个 低代码 MQTT 代理,增加了实时数据处理和转换功能,让你可以直接与 DigitalOcean 托管数据库(包括 MongoDBPostgreSQLMySQLOpenSearch)集成,而无需编写自定义集成代码。

你将学到什么: 本教程将引导你部署一个完整的物联网数据管道——从在 DigitalOcean 上设置托管数据库集群和 Coreflux MQTT 代理,到配置安全的 VPC 网络、使用 Coreflux 的物联网语言 (LoT) 构建数据转换模型,以及自动将处理后的物联网数据存储到你选择的数据库中。最终你将获得一个可用于生产环境的设置,能够处理物联网应用的实时消息传递和持久存储。

关键要点

在深入了解分步部署过程之前,以下是你将学到的关键点:

  • 在 DigitalOcean 上部署托管数据库集群(PostgreSQL、MongoDB、MySQL 或 OpenSearch),用于可扩展的物联网数据存储。
  • 使用 Marketplace 镜像或 Docker 在 DigitalOcean Droplet (DigitalOcean的VPC)上设置 Coreflux MQTT 代理。
  • 创建安全的 VPC 网络以连接你的 MQTT 代理和数据库,无需公开暴露。
  • 使用 Coreflux 的物联网语言 (LoT) 构建实时数据管道,实现低代码物联网自动化。
  • 自动转换和存储物联网数据,从 MQTT 主题到数据库表、集合或索引。
  • 验证端到端数据流,从模拟传感器通过转换模型到数据库存储。

本教程为需要实时消息传递结合持久数据存储以及搜索、分析或关系查询等高级功能的物联网应用提供了一个可用于生产环境的基础。

你将构建什么

在本教程结束时,你将得到:

  • 一个用于可扩展存储托管数据库集群(PostgreSQLMongoDBMySQLOpenSearch
  • 一台运行 Coreflux MQTT 代理DigitalOcean Droplet
  • 一个用于安全 物联网通信的虚拟私有云 (VPC) 网络
  • 使用 LoT Notebook 扩展进行的实时数据模拟
  • 低代码数据转换模型和数据库集成路由
  • 用于 物联网自动化 的完整 数据集成与转换 管道

Coreflux 与 DigitalOcean 合作

Coreflux 通过物联网语言编程语言在 DigitalOcean 云平台上提供轻量级 MQTT 代理和数据管道工具,以实现高效的物联网通信。

什么是 MQTT?

MQTT(消息队列遥测传输)是一种轻量级的、发布-订阅网络协议,在物联网生态系统中被广泛采用。专为受限设备和低带宽、高延迟或不稳定的网络设计,MQTT 能够在带宽受限的环境中实现高效、实时的消息传递。

关于 Coreflux

Coreflux 提供了一个轻量级 MQTT 代理,以促进物联网设备与应用程序之间的高效、实时通信,包括每个用例所必需的实时数据转换功能。为可扩展性和可靠性而构建,Coreflux 专为低延迟和高吞吐量至关重要的环境量身定制。

无论你是构建一个小型物联网项目还是部署工业监控系统,Coreflux 都能处理设备之间的消息路由和数据流。

在 DigitalOcean 云平台上使用 Coreflux,你将获得:

数据处理: 在你的数据所在之处集中处理你的数据处理需求,确保实时数据处理。
数据集成: 轻松与其他 DigitalOcean 服务(如托管数据库 PostgreSQL、MongoDB、MySQL 或 OpenSearch)集成,确保为你的所有数据需求提供一个单一且简单的生态系统。
可扩展性: 轻松处理不断增长的数据和设备数量,而不会影响性能。
可靠性: 确保在所有连接的设备之间进行一致且可靠的消息传递。

Coreflux MQTT 和托管数据库架构概述

准备工作

在开始本 MQTT 代理 部署教程之前,你需要:

  • 一个 DigitalOcean 帐户,可在DigitalOcean.com注册,支持绑定信用卡、支付宝或数字货币
  • 了解 MQTT 协议概念和 物联网 架构
  • Visual Studio Code(用于 LoT Notebook 扩展)

预计时间: 本教程大约需要 30-45 分钟完成,具体取决于数据库配置时间(通常每个数据库集群需要 1-5 分钟)。

步骤 1 — 为物联网自动化创建网络基础设施

为安全的 MQTT 通信创建 VPC 网络

首先,你将创建一个虚拟私有云 (VPC),以确保你的 物联网 服务和 MQTT 代理 之间的安全通信,无需公开访问。

  1. 登录你的 DigitalOcean 控制面板
  2. 从左侧导航栏进入 网络VPC
  3. 点击 创建 VPC 网络

DigitalOcean VPC 创建屏幕

  1. 物联网自动化配置你的 VPC:

    • 名称:coreflux-integrations-vpc(或你的 VPC 名称)
    • 数据中心区域:选择法兰克福(或你首选的区域)
    • IP 范围:使用默认值或根据需要配置
    • 描述:为你的 MQTT 代理和数据库 网络添加有意义的描述
  2. 点击 创建 VPC 网络

VPC 将为你所有的物联网资源提供隔离的网络,确保 Coreflux MQTT 代理托管数据库 之间的安全通信。有关 VPC 配置的更多详细信息,请参阅我们关于创建 VPC 网络的教程。

步骤 2 — 为可扩展存储设置托管数据库

根据你的物联网应用需求,选择以下数据库选项之一:

  • PostgreSQL:适用于需要关系查询、ACID 合规性和复杂关系的结构化数据
  • MySQL:适用于结构化工作负载和具有强一致性及广泛工具支持的事务性查询
  • MongoDB:适用于具有可变模式的灵活文档存储和快速开发
  • OpenSearch:适用于高级搜索、分析、日志分析和时间序列数据可视化

设置 PostgreSQL 托管数据库

当你的物联网工作负载需要关系模式强一致性高级 SQL 分析,并由自动备份、监控和维护支持时,DigitalOcean 上的托管 PostgreSQL 是一个很好的选择。

DigitalOcean 托管 PostgreSQL 集群设置

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 PostgreSQL 集群:

    • 数据库引擎:选择 PostgreSQL
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:postgresql-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    1. 对于开发:基础 计划,1 GB RAM
    2. 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 PostgreSQL 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 PostgreSQL 数据库
  2. (可选操作)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

PostgreSQL 入站访问和 VPC 规则

对于连接详细信息,你将看到两个选项 - 公共网络和 VPC 网络。第一个用于像 DBeaver 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。

PostgreSQL 公共和 VPC 连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 PostgreSQL 数据库连接

你可以使用提供的连接参数,使用公共访问凭证通过 DBeaver 测试 PostgreSQL 连接:

在 DBeaver 中测试 PostgreSQL 连接

创建 PostgreSQL 应用程序数据库和用户(可选)

为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 DBeaver 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法:

  1. 转到你的 托管数据库 集群中的 用户与数据库 选项卡
  2. 创建用户

    • 用户名:coreflux-broker-client
    • 密码:自动生成
  3. 创建数据库

    • 数据库名称:coreflux-broker-data

注意: 你可能需要更改数据库内的用户权限,以便能够创建表、插入和选择数据。对于 PostgreSQL,使用 GRANT CREATE, INSERT, SELECT ON DATABASE coreflux-broker-data TO coreflux-broker-client; 授予必要的权限。对于 MySQL,使用 GRANT CREATE, INSERT, SELECT ON coreflux-broker-data.* TO 'coreflux-broker-client'@'%';。

设置 MySQL 托管数据库

当你想要熟悉的 SQL、广泛的生态系统支持以及处理备份、更新和监控的完全托管服务时,DigitalOcean 上的托管 MySQL结构化、事务性物联网数据的理想选择。

DigitalOcean 托管 MySQL 集群设置

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 MySQL 集群:

    • 数据库引擎:选择 MySQL
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:mysql-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    • 对于开发:基础 计划,1 GB RAM
    • 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 MySQL 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 MySQL 数据库
  2. (可选操作)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

MySQL 入站访问和 VPC 规则

对于连接详细信息,你将看到两个选项 - 公共网络和 VPC 网络。第一个用于像 DBeaver 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。

MySQL 公共和 VPC 连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 MySQL 数据库连接

你可以使用提供的连接参数,使用公共访问凭证通过 DBeaver 测试 MySQL 连接。

注意: 你可能需要更改 DBeaver 的驱动程序设置——设置 allowPublicKeyRetrieval = true。

在 DBeaver 中测试 MySQL 连接

创建 MySQL 应用程序数据库和用户(可选)

为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 DBeaver 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法:

  1. 转到你的 托管数据库 集群中的 用户与数据库 选项卡
  2. 创建用户

    • 用户名:coreflux-broker-client
    • 密码:自动生成
  3. 创建数据库

    • 数据库名称:coreflux-broker-data

设置 MongoDB 托管数据库

托管 MongoDB 非常适合灵活或不断演变的物联网负载,让你能够存储异构的传感器文档,而无需严格模式,同时平台处理复制、备份和监控。

创建托管 MongoDB 集群

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 MongoDB 集群:

    • 数据库引擎:选择 MongoDB
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:mongodb-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    • 对于开发:基础 计划,1 GB RAM
    • 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 MongoDB 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 MongoDB 数据库
  2. (可选)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

为 MQTT 代理集成配置数据库访问

对于连接详细信息,你将看到两个选项:公共网络和 VPC 网络。第一个用于像 MongoDB Compass 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。

MongoDB 连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 MongoDB 数据库连接

你可以使用 MongoDB Compass 或提供的连接字符串,使用公共访问凭证测试 MongoDB 连接:

mongodb://username:password@mongodb-host:27017/defaultauthdb?ssl=true

测试数据库连接

创建 MongoDB 应用程序数据库和用户(可选)

为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 MongoDB Compass 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法:

  1. 转到你的 托管数据库 集群中的 用户与数据库 选项卡
  2. 创建用户

    • 用户名:coreflux-broker-client
    • 密码:自动生成
  3. 创建数据库

    • 数据库名称:coreflux-broker-data

设置 OpenSearch 托管数据库

托管 OpenSearch 专为高容量物联网数据的搜索、日志分析和时间序列仪表板而设计,该服务为你管理集群健康、扩展和索引存储。

创建托管 OpenSearch 集群

  1. DigitalOcean 控制面板,导航到 数据库
  2. 点击 创建数据库集群
  3. 物联网自动化配置你的 OpenSearch 集群:

    • 数据库引擎:选择 OpenSearch
    • 版本:选择最新的稳定版本
    • 数据中心区域:选择法兰克福(与你的 VPC 相同)
    • VPC 网络:选择你创建的 coreflux-integrations-vpc
    • 数据库集群名称:opensearch-coreflux-test
    • 项目:选择你的目标项目
  4. 根据你的 物联网 需求选择你的计划:

    1. 对于开发:基础 计划,1 GB RAM
    2. 对于生产:通用型 或更高,用于可扩展存储
  5. 点击 创建数据库集群

托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。

为 MQTT 代理集成配置 OpenSearch 数据库访问

系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。

  1. 点击 开始使用 来配置你的 OpenSearch 数据库
  2. (可选)限制入站连接:

    • 添加你本地计算机的 IP 以进行管理访问
    • droplet 将通过 VPC 网络自动获得允许

配置数据库访问

对于连接详细信息,你将看到两个选项:公共网络和 VPC 网络。第一个用于工具的外部访问,而第二个将由 Coreflux 服务用于访问数据库。你还将看到访问 OpenSearch 仪表板的 URL 和参数。

连接详细信息

  1. 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息):

    • 主机:你的数据库主机名
    • 用户:默认管理员用户
    • 密码:自动生成的安全密码
    • 数据库:身份验证数据库名称
测试 OpenSearch 数据库连接

你可以使用提供的凭证通过 OpenSearch 仪表板测试 OpenSearch 连接:

测试数据库连接

步骤 3 — 在 DigitalOcean Droplet 上部署 Coreflux MQTT 代理

创建 DigitalOcean Droplet

  1. 在你的 DigitalOcean 控制面板中导航到 Droplets
  2. 点击 创建 Droplet

创建新的 DigitalOcean Droplet

  1. MQTT 代理 部署配置你的 droplet

    • 选择区域:法兰克福(与你的托管数据库相同)
    • VPC 网络:选择 coreflux-integrations-vpc
    • 选择镜像:转到 Marketplace 选项卡
    • 搜索 “Coreflux” 并从 Marketplace 中选择 Coreflux

从 Marketplace 选择 Coreflux

  1. 为你的 物联网 工作负载选择大小

    • 对于开发:基础 计划,2 GB 内存
    • 对于生产:基础通用型 计划,4+ GB 内存以获得可扩展性能
  2. 选择身份验证方法

    • SSH 密钥:推荐用于提高安全性

      1. 可以使用 ssh-keygen 在本地创建密钥
    • 密码:备选方案
  3. 最终确定详细信息

    • 主机名:coreflux-test-broker
    • 项目:选择你的项目
    • 标签:为 DevOps 组织添加相关标签
  4. 点击 创建 Droplet
  5. 查看 Droplet 主页并等待其完成部署

Droplet 部署进行中

替代方案 - 在Docker镜像Droplet上使用Docker安装Coreflux MQTT代理

采用与Coreflux Droplet相同的方法,选择Docker作为市场应用镜像。

一旦你的droplet运行起来,通过已定义的认证方法或Droplet主页上提供的Web控制台,使用SSH连接到它:

ssh root@your-droplet-ip

SSH连接到Coreflux droplet

使用Docker运行Coreflux MQTT代理

docker run -d \
  --name coreflux \
  -p 1883:1883 \
  -p 1884:1884 \
  -p 5000:5000 \
  -p 443:443 \
  coreflux/coreflux-mqtt-broker-t:1.6.3

这个Docker命令:

  • 以分离模式运行容器 (-d)
  • 将容器命名为 coreflux
  • 暴露MQTT和Web界面所需的端口
  • 使用最新的Coreflux镜像

验证MQTT代理是否在运行:

docker ps

你应该看到一个正在运行的容器:

Docker中运行的Coreflux容器

通过使用默认值连接到MQTT代理来验证部署

你可以通过MQTT客户端(如MQTT Explorer)访问MQTT代理,以验证对代理的访问,无论采用何种部署方法。

MQTT Explorer连接到Coreflux代理

步骤4 — 为安全的物联网通信配置防火墙规则(可选)

对于生产环境的物联网自动化部署,配置防火墙规则以限制访问:

  1. 导航到网络防火墙
  2. 点击创建防火墙
  3. 配置MQTT代理安全的入站规则:

    • SSH:来自你IP的端口22
    • MQTT:来自你的物联网应用程序源的端口1883
    • 带TLS的MQTT:用于安全的带TLS的MQTT的端口1884
    • WebSocket:用于通过WebSocket的MQTT的端口5000
    • 带TLS的WebSocket:用于通过带TLS的WebSocket的MQTT的端口443
  4. 将防火墙应用到你的droplet

关于详细的防火墙配置,请参考DigitalOcean的防火墙快速入门教程。生产提示: 将MQTT端口1883限制在特定的源IP或VPC范围,并且对于外部设备连接,优先使用端口1884(带TLS的MQTT)。如果你需要额外的安全层,请考虑使用带有私有网络的DigitalOcean应用平台。

步骤5 — 使用Coreflux的Language of Things设置物联网数据集成

安装LoT Notebook扩展

用于Visual Studio Code的LoTLanguage of ThingsNotebook扩展提供了一个集成的低代码开发环境,用于MQTT代理编程和物联网自动化。了解更多关于Coreflux的Language of Things (LoT)用于低代码物联网自动化的信息。

  1. 打开Visual Studio Code
  2. 转到扩展(Ctrl+Shift+X)
  3. 搜索"LoT Notebooks"
  4. 安装由Coreflux提供的LoT VSCode Notebooks扩展

Visual Studio Code中的LoT Notebook扩展

连接到你的MQTT代理

配置与你的Coreflux MQTT代理的连接,当在顶部栏提示时或通过点击底部左侧栏的MQTT按钮时,使用默认凭据:

  • 用户:root
  • 密码:coreflux

假设没有错误,你将在底部左侧栏看到与代理的MQTT连接状态。

VS Code中的Coreflux MQTT连接状态

步骤6 — 通过Actions在MQTT代理中创建数据

对于这个用例,我们将通过一个转换管道将原始数据集成到数据库中。然而,由于在演示中没有连接到任何MQTT设备,我们将利用LoT的能力,并使用一个Action来模拟设备数据。

在LoT中,Action是一种可执行的逻辑,由特定事件触发,例如定时间隔、主题更新或其他操作或系统组件的显式调用。Actions允许与MQTT主题、内部变量和负载进行动态交互,促进复杂的物联网自动化工作流。

因此,我们可以使用一个以定义的时间间隔在特定主题中生成数据的Action,然后由我们将在下面定义的管道的其余部分使用。

你可以下载包含示例项目的github仓库。

生成模拟物联网数据

使用低代码LoTLanguage of Things)界面创建一个Action来生成模拟传感器数据:

DEFINE ACTION RANDOMIZEMachineData
ON EVERY 10 SECONDS DO
    PUBLISH TOPIC "raw_data/machine1" WITH RANDOM BETWEEN 0 AND 10
    PUBLISH TOPIC "raw_data/station2" WITH RANDOM BETWEEN 0 AND 60

在提供的Notebook中,你还有一个Action可以执行递增计数器来模拟数据,作为提供Action的替代方案。

运行LoT操作以生成模拟物联网数据

当你运行这个Action时,它将:

  • 自动部署到MQTT代理
  • 每10秒生成一次模拟的物联网传感器数据
  • 实时数据发布到特定的MQTT主题
  • LoT Notebook界面中显示同步状态

    • 此状态显示LoT Notebook上的代码是否与代理中运行的代码不同,或者是否完全缺失

步骤7 — 为实时处理创建数据转换模型

使用Language of Things定义数据模型

Coreflux中的模型用于转换、聚合和计算来自输入MQTT主题的值,并将结果发布到新主题。它们是创建适用于你多个数据源的UNS - 统一命名空间 - 的基础。

因此,通过该模型,你可以定义原始物联网数据的结构与转换方式,适用于单个设备,也支持同时处理多个设备(借助通配符+实现)。模型还作为用于可扩展存储托管数据库的关键数据模式。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"

    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER

    ADD "energy_wh" WITH (energy * 1000)

    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")

    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)

    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)

    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)

    ADD "timestamp" WITH TIMESTAMP "UTC"

这个低代码模型:

  • 使用通配符+自动应用到所有机器
  • 通过乘以1000将能量转换为瓦时(energy_wh)
  • 根据能量阈值确定生产状态
  • 跟踪生产计数和停机事件
  • 向所有实时数据点添加时间戳
  • 从主题结构中提取机器ID
  • 将结构化数据发布到Simulator/Machine/Data主题(将+替换为每个匹配触发器/源数据格式的主题)

由于我们使用Action生成了两个模拟传感器/机器,我们可以看到模型结构自动应用于两者,同时生成了一个json对象和各个单独的主题。

Coreflux模型发布的转换后的MQTT数据

步骤8 — 为可扩展存储设置数据库集成

选择与你在步骤2中选择的数据库相匹配的数据库集成部分。

PostgreSQL集成

在本节中,你将学习如何将处理后的物联网数据存储到DigitalOcean上的PostgreSQL托管数据库中。

要将处理后的物联网数据存储到PostgreSQL托管数据库中,你需要在Coreflux中定义一个Route。Route使用简单、低代码的配置指定数据如何从你的MQTT代理发送到你的PostgreSQL集群:

DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL

    ADD SQL_CONFIG

        WITH SERVER "db-postgresql.db.onmyserver.com"

        WITH PORT 25060

        WITH DATABASE "defaultdb"

        WITH USERNAME "doadmin"

        WITH PASSWORD "AVNS_pass"

        WITH USE_SSL TRUE

        WITH TRUST_SERVER_CERTIFICATE FALSE

使用来自DigitalOcean的你自己的PostgreSQL连接详细信息替换,并在你的LoT Notebook中运行该Route重要提示: 为了更好的安全性和更低的延迟,请使用VPC连接详细信息(而非公共连接)。VPC主机名和端口与公共连接字符串不同 - 请检查你的数据库集群的连接详细信息页面以获取这两个选项。

为PostgreSQL数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "PostgreSQL_Log"

    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"

    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER

    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"

    ADD "energy_wh" WITH (energy * 1000)

    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")

    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)

    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)

    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)

    ADD "timestamp" WITH TIMESTAMP "UTC"

    STORE IN "PostgreSQL_Log"

        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

MySQL集成

MySQL是一种广泛使用的关系数据库管理系统,非常适合大规模存储和分析物联网数据。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管MySQL数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。

要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到MySQL数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息:

DEFINE ROUTE MySQL_Log WITH TYPE MYSQL
    ADD SQL_CONFIG
        WITH SERVER "db-mysql.db.onmyserver.com"
        WITH PORT 25060
        WITH DATABASE "defaultdb"
        WITH USERNAME "doadmin"
        WITH PASSWORD "AVNS_pass"
        WITH USE_SSL TRUE
        WITH TRUST_SERVER_CERTIFICATE FALSE

使用来自DigitalOcean的你自己的MySQL连接详细信息替换,并在你的LoT Notebook中运行该Route重要提示: 为了更好的安全性和更低的延迟,请使用VPC连接详细信息(而非公共连接)。如果你遇到连接问题,请验证TRUST_SERVER_CERTIFICATE是否已为你的MySQL版本正确设置 - 某些版本需要TRUE,而其他版本则使用FALSE

为MySQL数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "MySQL_Log"
    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "MySQL_Log"
        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

MongoDB集成

MongoDB是一种NoSQL数据库,非常适合存储和查询具有灵活模式的物联网数据。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管MongoDB数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。

要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到MongoDB数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息:

DEFINE ROUTE mongo_route WITH TYPE MONGODB
    ADD MONGODB_CONFIG
        WITH CONNECTION_STRING "mongodb+srv://<username>:<password>@<cluster-uri>/<database>?tls=true&authSource=admin&replicaSet=<replica-set>"
        WITH DATABASE "admin"

使用来自DigitalOcean的你自己的MongoDB连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 当可用时,请使用VPC连接字符串格式。连接字符串应包括tls=trueauthSource=admin参数。有关MongoDB连接故障排除,请参阅我们关于连接MongoDB的教程。

为MongoDB数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "mongo_route"
    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "mongo_route"
        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

OpenSearch集成

OpenSearch是一种分布式搜索和分析引擎,专为大规模数据处理和实时分析而设计。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管OpenSearch数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。

要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到OpenSearch数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息:

DEFINE ROUTE OpenSearch_log WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://my-opensearch-cluster:9200"
        WITH USERNAME "myuser"
        WITH PASSWORD "mypassword"
        WITH USE_SSL TRUE
        WITH IGNORE_CERT_ERRORS FALSE

使用来自DigitalOcean的你自己的OpenSearch连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 当可用时,请使用VPC基础URL(而非公共URL)。基础URL格式通常为https://your-cluster-hostname:9200。对于OpenSearch仪表板访问,请使用数据库集群详细信息中提供的单独的仪表板URL。有关更多详细信息,请参阅我们的OpenSearch快速入门。

为OpenSearch数据库存储更新模型

修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾:

STORE IN "OpenSearch_Log"
    WITH TABLE "MachineProductionData"

此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "OpenSearch_Log"
        WITH TABLE "MachineProductionData"

部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。

步骤9 — 验证完整的物联网自动化管道

监控实时数据流

  1. MQTT Explorer:使用MQTT客户端验证实时数据发布
  2. 数据库客户端:连接以验证数据的存储(PostgreSQL使用DBeaver,MongoDB使用MongoDB Compass,OpenSearch使用OpenSearch Dashboards)

验证PostgreSQL存储

使用DBeaver连接到你的PostgreSQL托管数据库以验证可扩展存储

  1. 使用来自你的DigitalOcean数据库的连接字符串
  2. 导航到 coreflux-broker-data 数据库(或你为数据库指定的名称)
  3. 检查 MachineProductionData 表中存储的记录

显示存储的物联网记录的PostgreSQL表

正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。

带有实时机器数据的MQTT主题

验证MongoDB存储

使用MongoDB Compass连接到你的MongoDB托管数据库以验证可扩展存储

  1. 使用来自你的DigitalOcean数据库的连接字符串
  2. 导航到 coreflux-broker-data 数据库(或你为数据库指定的名称)
  3. 检查 MachineProductionData 集合中存储的文档

检查数据库存储

你应该看到具有类似结构的实时数据文档:

{
  "_id": {
    "$oid": "68626dc3e8385cbe9a1666c3"
  },
  "energy": 36,
  "energy_wh": 36000,
  "production_status": "active",
  "production_count": 31,
  "stoppage": 0,
  "maintenance_alert": false,
  "timestamp": "2025-06-30 10:58:11",
  "device_name": "station2"
}

正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。

验证MySQL存储

使用DBeaver连接到你的MySQL托管数据库以验证可扩展存储:

  1. 使用来自你的DigitalOcean数据库的连接字符串
  2. 导航到coreflux-broker-data数据库(或你为数据库指定的名称)
  3. 检查MachineProductionData表中存储的记录

验证MySQL中存储的物联网记录

与其他集成一样,所有数据也可在MQTT代理中用于其他用途和下游集成。

监控实时数据流

验证OpenSearch存储

使用提供的URL和凭据打开OpenSearchDashboards

  1. 打开菜单并选择索引管理选项

    1. 在菜单中选择索引选项,查看你的表名是否出现在列表中

检查数据库存储

  1. 返回主页并在菜单中选择发现选项

    1. 按照提供的步骤创建索引模式
    2. 返回到发现页面,你应该会看到你的数据

检查数据库存储

正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。

检查数据库存储

步骤10 - 扩展你的用例和集成

测试LoT能力

  • 发布示例数据:使用MQTT Explorer将示例数据集发布到你的Coreflux代理。尝试不同的负载结构和不同的模型/操作,查看它们如何处理并存储到你选择的数据库中。
  • 数据验证:验证你数据库中的数据与你发布的有效负载是否匹配。使用你的数据库客户端(PostgreSQL使用DBeaver,MongoDB使用MongoDB Compass,OpenSearch使用OpenSearch Dashboards)检查一致性和准确性,确保你的物联网自动化集成按预期工作。比较时间戳、字段转换和数据类型,以验证你的实时数据管道。
  • 实时监控:使用另一个MQTT数据源(例如具有MQTT连接功能的简单传感器)设置连续的实时数据馈送。观察Coreflux和你的数据库如何处理传入的物联网数据流,并探索数据检索和查询的响应时间。

构建分析和可视化

  • 创建仪表板:与Grafana等可视化工具集成,创建显示你的物联网数据的仪表板,从实时MQTT主题和历史数据库查询中提取数据。跟踪指标,如设备正常运行时间、传感器读数、生产计数或来自你自动化系统的维护警报。了解如何使用我们的教程设置DigitalOcean托管数据库与Prometheus和Grafana的监控。对于实时仪表板,直接订阅MQTT主题;对于历史趋势和聚合,查询你的数据库。
  • 趋势分析:利用你数据库的能力来分析随时间变化的趋势:

    • PostgreSQL:使用SQL查询进行复杂的关系分析
    • MongoDB:使用聚合框架进行基于文档的分析
    • OpenSearch:使用高级分析和搜索能力进行全文搜索和时间序列分析
  • 多数据库集成:探索集成其他托管数据库,如用于非结构化数据的MongoDB,用于关系数据的PostgreSQL,用于结构化查询的MySQL,或用于高级分析和搜索的OpenSearch。使用Coreflux路由将数据同时发送到多个目的地。

优化和扩展你的物联网基础设施

  • 负载测试:使用LoT Notebook或自动化脚本通过同时发布多条消息来模拟高流量。监控你的Coreflux MQTT代理和数据库集群如何处理负载,并识别你的数据管道中的任何瓶颈。
  • 扩展DigitalOcean提供垂直和水平扩展选项。随着你的物联网数据需求增长,增加droplet资源(CPU、RAM或存储)。扩展你的托管数据库集群以处理更大的数据集,并配置自动扩展警报,以便在接近资源限制时通知你。

常见问题解答

1. 如何将Coreflux MQTT代理与托管数据库集成?

你通过定义指向目标服务(PostgreSQL、MySQL、MongoDB或OpenSearch)的LoTRoute来将Coreflux MQTT代理与托管数据库集成。每个路由使用适当的连接参数(服务器或连接字符串、端口、数据库名称、用户名、密码和SSL/TLS选项),并自动将MQTT消息有效负载持久化到表、集合或索引中。一旦定义好路由,你就使用STORE IN指令将其附加到Model,这样每个处理后的消息都会被写入你选择的数据库。

2. 我能否在不编写自定义集成代码的情况下将MQTT数据直接保存到数据库?

可以。Coreflux设计为一个低代码集成层,因此你无需编写应用程序代码或外部ETL作业来持久化数据。对于每种数据库类型,你配置一个LoT路由(例如,PostgreSQL_LogMySQL_Logmongo_routeOpenSearch_Log),然后使用STORE IN "<route_name>" WITH TABLE "MachineProductionData"扩展你的模型。Coreflux处理连接池、重试和错误处理,因此你可以专注于建模主题和转换,而不是样板数据库代码。

3. 我应该为MQTT物联网数据存储选择哪种托管数据库?

你的MQTT物联网数据的最佳托管数据库取决于你的数据结构、查询需求和分析目标。使用下面的比较表来帮助你决定:

数据库最适合示例用例
PostgreSQL强一致性、关系模式、复杂的SQL查询工业传感器网络、事务性事件、需要跨连接数据集的分析
MySQL关系数据、结构化查询、广泛的兼容性库存系统、生产指标、传统业务记录
MongoDB灵活、不断演进的模式;文档存储具有可变负载的互联设备、具有变化格式的物联网遥测
OpenSearch全文搜索、分析、仪表板、日志索引时间序列分析、监控、事件日志、物联网搜索和可视化

提示: 你可以通过配置多个Coreflux路由同时使用多个托管数据库。这使得可以从同一个MQTT流中,将结构化的物联网数据存储在PostgreSQL或MySQL中,在OpenSearch中聚合日志和指标,并在MongoDB中收集非结构化或无模式数据。

4. 这种架构如何处理实时和历史分析?

Coreflux将所有处理后的值保留在MQTT主题上,供实时消费、仪表板或额外管道使用,而Routes则将相同的建模数据持久化到你的数据库中,用于历史查询。在实践中,你可以订阅主题以进行即时反应(警报、控制回路),并查询PostgreSQL/MySQL/MongoDB/OpenSearch以进行聚合、趋势和长期分析。这种双路径设计反映了MQTT和物联网数据集成教程中的常见模式,其中代理提供实时消息传递,而数据库提供持久存储和分析。

5. Coreflux和托管数据库之间的连接有多安全?

当部署在DigitalOcean上时,你可以使用VPC网络来保持Coreflux MQTT代理和数据库之间的所有通信私密。VPC将你的资源与公共互联网访问隔离开来,并且DigitalOcean托管数据库支持连接的TLS加密。此外,你可以为你的Coreflux应用程序创建具有有限权限的专用数据库用户,遵循最小权限原则。

6. 这个设置是否适用于生产环境物联网部署?

是的。这种架构反映了生产环境中MQTT和数据库集成所使用的模式,其中代理前端处理设备流量,而托管数据库层提供持久性和分析。DigitalOcean托管数据库提供自动备份、高可用性和监控,而Coreflux MQTT代理可以水平扩展以处理高消息吞吐量。对于生产环境,你还应该配置防火墙规则、使用强凭据、为MQTT和数据库连接启用TLS,并根据预期的消息量来调整你的droplet和集群大小。

7. 我能否在没有公共互联网访问的情况下,或在混合环境中运行MQTT代理?

可以。MQTT代理通常部署在私有网络或边缘环境中,公共资源一致指出,只要客户端可以访问代理,MQTT就可以在没有公共互联网的情况下工作。使用DigitalOcean,你可以将Coreflux和你的数据库保持在VPC内部,并且只暴露绝对必要的内容(例如,VPN、堡垒主机或有限的防火墙规则)。如果你需要混合或多站点架构,你还可以将选定的主题与其他代理或云区域同步。

8. 在物联网数据中使用MQTT和数据库是否存在任何限制或权衡?

MQTT针对轻量级、事件驱动的消息传递进行了优化;数据库则针对存储和查询进行了优化。存储每一条原始消息可能会变得昂贵或嘈杂,因此最佳实践建议仔细建模数据(例如,聚合指标、过滤主题或降采样)。极低功耗设备或超受限网络可能难以维持持久连接或处理TLS开销,在这种情况下,你可能需要调整QoS级别、批处理和保留策略。只要你在设计中考虑到这些权衡,MQTT加上托管数据库对于大多数物联网场景都能很好地工作。

9. 我如何为我的物联网项目在PostgreSQL、MySQL、MongoDB和OpenSearch之间做出选择?

你应该根据物联网数据结构、可扩展性以及你希望如何查询设备数据来选择托管数据库。下表总结了每个选项的优势:

数据库当...时最佳典型用例关键优势
PostgreSQL你需要复杂的关系查询、强一致性和事务完整性(ACID支持)。工业传感器网络、将设备数据与生产相关联、需要对连接的数据集进行分析关系模式、高级SQL、一致性
MySQL你的工作负载是结构化的,具有广泛的工具和兼容性需求。库存跟踪、传统业务系统、生产指标更简单的关系需求、广泛支持
MongoDB你的设备负载和模式不断演变,或者你希望使用灵活的、基于文档的存储进行快速原型设计。具有可变格式的物联网遥测、快速开发、半结构化数据灵活的模式、易于扩展、快速原型设计
OpenSearch你需要分析、搜索或对大容量的物联网数据(日志、时间序列、事件)进行仪表板展示。搜索传感器数据、日志分析、可视化、基于关键字/时间的查询搜索、全文、分析、快速聚合

结论

将Coreflux MQTT代理与DigitalOcean的托管数据库服务(PostgreSQL、MongoDB、MySQL或OpenSearch)集成,为你提供了实时物联网数据处理和存储的完整设置。按照本教程,你已经使用低代码开发实践构建了一个收集、处理和存储物联网数据的自动化管道。

借助Coreflux的架构和你选择的数据库的存储特性,你可以处理大量的实时数据并查询它以获取洞察。无论你是监控工业系统、跟踪环境传感器还是管理智慧城市基础设施,这种设置都让你能够基于实时MQTT主题和历史数据库查询做出数据驱动的决策。

了解更多关于DigitalOcean托管数据库的信息,以及DigitalOcean 针对 IoT行业的产品服务支持,可咨询 DigitalOcean 中国区独家战略合作伙伴卓普云AI Droplet(aidroplet.com)

你可以尝试提供的用例或使用Coreflux和DigitalOcean实现你自己的用例。你还可以在DigitalOcean Droplet市场或通过Coreflux网站获取免费的Coreflux MQTT代理

Apifox 新版本上线啦!

看看本次版本更新主要涵盖的重点内容,有没有你所关注的功能特性:

  • 支持创建 MCP Client 以调试 MCP Server
  • 支持创建「测试套件」
  • 测试报告全面重构,支持结构化展示
  • 调试能力持续升级

    • 支持查看 HTTP 版本、TLS 协议等网络信息
    • array 类型参数的子元素支持直接选择枚举值
    • 调试 SSE 接口时,支持 \r\n 换行符
  • 支持导入 Hoppscotch 的 Collection
  • 优化邀请他人加入项目的流程
  • 用户反馈优化

    • 解决 MongoDB 数据库的密码包含特殊字符 % 时无法连接成功的问题
    • 解决绑定了手机号的用户无法通过“忘记密码”功能重置密码的问题

将 Apifox 更新至最新版,一起开启全新体验吧!

支持创建 MCP Client 以调试 MCP Server

Apifox 升级后,支持创建 MCP Client,开发者可以像调试 API 一样,直接调试 MCP Server 的ToolsResourcesPrompts。同时支持STDIOStreamable HTTP 协议,并可自动配置 OAuth 2.0 认证流程,极大简化连接与认证流程,全面提升 AI Agent 的开发与调试效率。

更多关于 MCP 的内容,可以前往 帮助文档查看。

支持创建 MCP Client 以调试 MCP Server

支持创建「测试套件」

Apifox 推出了「测试套件」功能,支持添加单接口用例和场景用例,并可在「静态」与「动态」两种模式间切换:

  • 静态模式用于精确指定需要执行的测试项,内容不会变动,且支持灵活调整测试步骤的顺序。
  • 动态模式可通过规则自动筛选需执行的测试项。每次运行时,系统会实时扫描项目,将所有符合条件的最新用例纳入执行。此模式下,仅可整体删除或修改过滤条件,无法单独删除组内的某个动态项。

场景用例侧重于测试流程的编排,测试套件则聚焦于灵活高效的测试执行,两者并非互相替代,而是分层协作,面向不同的使用场景,结合使用可以满足自动化测试的多样化需求。

更多关于测试套件的具体教程,可以查看往期内容《 测试用例越堆越多?用 Apifox 测试套件让自动化回归更易维护》。

支持创建「测试套件」

测试报告全面重构,支持结构化展示

最新版本的 Apifox 对测试报告界面进行了全面重构,新版测试报告支持结构化展示所有测试步骤,让测试结果的层次关系更加清晰明了。用户可以快速定位每个测试步骤的执行情况和结果,从而更高效地分析测试问题,提升测试结果的可读性和分析效率。

测试报告全面重构,支持结构化展示

调试能力持续升级

支持查看 HTTP 版本、TLS 协议等网络信息

更新 Apifox 后,开发者在调试接口时可直接查看 HTTP 版本、TLS 协议等详细网络信息,从而快速掌握接口请求的通信细节,有助于进行更精确的问题定位和性能分析。

同时,支持查看更详细的响应大小信息,包括 Body 和 Header 的大小,以及压缩前后的 Body 大小,便于判断 gzip 等压缩功能是否正常工作。

支持查看 HTTP 版本、TLS 协议等网络信息

array 类型参数的子元素支持直接选择枚举值

调试接口时,如果 array 类型参数的子元素包含枚举值,用户可直接从列表中选择,无需手动输入,简化了配置流程,提升参数配置的便捷性与准确性,使接口调试更加高效流畅。

array 类型参数的子元素支持直接选择枚举值

调试 SSE 接口时,支持 \r\n 换行符

Apifox 优化了 SSE 接口的调试体验。SSE 规范采用 \n\n 作为标准换行符,但一些实际业务中常用 \r\n (Windows 换行符) 进行分隔。Apifox 现已兼容并正确解析 \r\n 换行符,确保 SSE 流式数据能够以标准或非标准格式都能正确显示,帮助开发者更高效、准确地调试和验证 SSE 接口响应内容。

支持导入 Hoppscotch 的 Collection

Apifox 现已支持导入 Hoppscotch Collection 数据,帮助团队更便捷地将 Hoppscotch 项目迁移到 Apifox,进一步扩展了数据迁移的兼容性,降低迁移成本,提升团队协作的灵活性。

支持导入 Hoppscotch 的 Collection

优化邀请他人加入项目的流程

Apifox 对项目成员邀请流程进行了优化,除了链接邀请和邮箱邀请外,还支持直接从团队成员列表中选择成员加入项目,并可在邀请时同步设置项目权限,大幅简化了成员管理流程,让团队协作配置更加便捷高效。

优化邀请他人加入项目的流程

用户反馈优化

解决 MongoDB 数据库的密码包含特殊字符%时无法连接成功的问题

根据用户反馈,我们已修复 MongoDB 数据库连接中存在的问题:当数据库密码包含特殊字符 % 时,Apifox 现能正确处理并成功建立连接,进一步提升了数据库连接功能的稳定性和兼容性。

解决绑定了手机号的用户无法通过“忘记密码”功能重置密码的问题

现在通过"忘记密码"功能可以正常重置密码,确保用户在需要时能顺利找回账号访问权限,提升账户安全管理功能的完整性与可用性。

了解更多

当然,Apifox 产品团队为大家带来的新功能远不止以上这些:

  • 优化了前后置操作的界面
  • 优化了测试报告列表,支持结构化展示、筛选
  • 支持复制测试套件的分享链接
  • 支持调整测试套件静态步骤内资源的顺序
  • 导入 OpenAPI/Swagger 数据时,支持 Query 类型的 HTTP 方法和 additionalOperation
  • 优化了变量预览弹窗的触发时间,让其有一个合理的延迟
  • 在付费页进行续费时,逻辑更合理与友好
  • 解决在接口返回的响应数据上点击右键,没有 Copy JSONPath 等功能的问题
  • 解决当根目录的可见性为内部时,WebSocket 接口仍被发布到公开在线文档的问题
  • 解决未绑定支付方式的团队无法被正确转入组织的问题

除了新增功能,我们也对产品细节和使用体验进行了优化,具体修改内容可点击「阅读原文」前往 Apifox 更新日志查看,有任何问题欢迎在Apifox 用户群与我们交流沟通。

同时,Apifox 提供企业私有化部署版本,通过本地化部署、客制化服务,协助企业进一步提升研发团队效能。

欢迎各位用户对 Apifox 继续提出使用反馈和优化意见,我们会持续优化更新,致力于为用户提供更优秀的产品功能和更极致的使用体验!

熟悉 Spring Boot 3 的开发者,都知道它在简化开发流程、提高开发效率方面的出色表现吧!但是,在实际业务场景中,大家肯定都碰到过这样的棘手问题:订单数据存放在 MySQL 里,库存数据在 PostgreSQL 中,用户数据又保存在 MongoDB 中,当多种数据源同时存在时,想要实现统一查询简直比登天还难。

所以呢,今天我就亮出我的“终极大招”——Apache Calcite,着重给大家讲讲它怎样与 Spring Boot 3 实现无缝集成,还会分享一些可以直接拿来使用的经典应用场景。掌握了这一招,多数据源查询的难题就能轻松解决啦!

一、核心认知:Apache Calcite 为何是多数据源查询的利器?

在动手集成前,咱们先把核心逻辑搞明白:为啥 Calcite 能成为多数据源查询的“万能钥匙”?它的核心优势到底在哪?

1.1 不止是查询引擎:Calcite 的核心定位

Apache Calcite 本质是一个动态数据管理框架,而非传统的数据库。它最核心的价值在于“解耦”——将数据存储与数据查询分离,无论数据存在哪里、是什么格式,都能通过统一的 SQL 接口进行查询。

说通俗点,Calcite 就像个“超级数据翻译官”——不管数据藏在哪个数据源里、是什么格式,你只要写一套标准 SQL,它就能翻译成对应数据源能懂的指令,最后把结果整理成统一格式返回。这也是它能搞定多数据源查询的核心秘诀!

1.2 Calcite 的核心能力拆解

统一 SQL 接口:支持标准 SQL,无论底层是关系型数据库(MySQL、PostgreSQL)、非关系型数据库(MongoDB、Redis),还是文件(CSV、Parquet)、大数据引擎(Hive、Spark),都能通过同一套 SQL 查询。

  1. 强大的查询优化:内置基于规则和成本的查询优化器,能自动优化 SQL 执行计划,提升查询效率,尤其是在复杂多表关联、跨数据源查询场景下,优化效果明显。
  2. 灵活的数据源适配:通过“适配器(Adapter)”机制适配不同数据源,社区已提供大量现成适配器,也支持自定义开发,适配特殊数据源。
  3. 轻量级集成:核心依赖体积小,无复杂依赖,可轻松集成到 Spring Boot、Spring Cloud 等主流 Java 开发框架中,无需单独部署独立服务(也支持独立部署)。

    二、重点实战:Spring Boot 3 集成 Calcite 核心步骤

    既然大家都熟悉 Spring Boot 3 的基础操作,我就不啰嗦项目搭建这些常规步骤了,直接聚焦 Calcite 集成的核心环节,每一步都附完整代码和避坑提醒,跟着做就能成!

2.1 核心依赖引入

第一步先引依赖,在 pom.xml 里加好 Calcite 核心包、对应数据源的适配器,再配上 MyBatis Plus 的核心依赖(替换掉原来的 Jdbc 依赖就行),具体如下:

<!-- Calcite 核心依赖 -->
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.36.0</version> 
</dependency>

<!-- MySQL 适配器(用于适配 MySQL 数据源) -->
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-mysql</artifactId>
    <version>1.36.0</version>
</dependency>

<!-- MongoDB 适配器(用于适配 MongoDB 数据源) -->
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-mongodb</artifactId>
    <version>1.36.0</version>
</dependency>

<!-- Spring Boot 与 MyBatis Plus 集成核心依赖 -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.5</version> <!-- 适配 Spring Boot 3 的稳定版 -->
</dependency>

<!-- 数据库连接池依赖(MyBatis Plus 需连接池支持) -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.2.20</version>
</dependency>

这里有 3 个避坑点必须强调下:

  1. Calcite 所有组件版本要统一,不然容易出现类加载异常;
  2. MyBatis Plus 得选适配 Spring Boot 3 的版本(3.5.3+);
  3. 一定要加连接池依赖,不然 Calcite 数据源没法被 MyBatis Plus 正常管理。

    2.2 核心配置:Calcite 模型文件编写

    模型文件是 Calcite 识别数据源的关键,一般用 JSON 格式,放在 resources 目录下命名为 calcite-model.json 就行。下面给大家一个适配 MySQL 和 MongoDB 双数据源的示例,直接改改连接信息就能用:

{
  "version": "1.0",
  "defaultSchema": "ecommerce",
  "schemas": [
    {
      "name": "ecommerce",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        "jdbcUrl": "jdbc:mysql://localhost:3306/ecommerce_order?useSSL=false&serverTimezone=UTC",
        "username": "root",
        "password": "123456",
        "driver": "com.mysql.cj.jdbc.Driver"
      }
    },
    {
      "name": "user_mongo",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.mongodb.MongoSchema$Factory",
      "operand": {
        "host": "localhost",
        "port": 27017,
        "database": "user_db",
        "collection": "user_info"
      }
    }
  ]
}

几个关键配置给大家解释清楚,避免踩坑:

  1. defaultSchema:默认查询的 Schema,可省略,查询时需指定 Schema 名称(如 ecommerce.order、user_mongo.user_info)。
  2. factory:对应数据源的适配器工厂类,Calcite 已为主流数据源提供现成工厂,自定义数据源需实现自己的 Factory。
  3. operand:数据源连接参数,根据数据源类型不同配置不同参数(如 MySQL 的 jdbcUrl、MongoDB 的 host/port)。

    2.3 Spring Boot 集成 Calcite + MyBatis Plus 核心配置

    这一步是核心,主要分两步走:

    配置好 Calcite 数据源;
    让 MyBatis Plus 用上这个数据源,顺便把 mapper 扫描、分页插件这些基础参数配好。直接上配置类代码:

    import com.baomidou.mybatisplus.annotation.DbType;
    import com.baomidou.mybatisplus.autoconfigure.ConfigurationCustomizer;
    import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
    import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
    import org.apache.calcite.jdbc.CalciteConnection;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    
    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;
    
    @Configuration
    // MyBatis Plus  mapper 接口扫描(指定 mapper 包路径)
    @MapperScan(basePackages = "com.example.calcite.mapper")
    public class CalciteMybatisPlusConfig {
    
     // 1. 配置 Calcite 数据源(核心,与原逻辑一致)
     @Bean
     public DataSource calciteDataSource() throws Exception {
         Properties props = new Properties();
         props.setProperty("model", "classpath:calcite-model.json");
         Connection connection = DriverManager.getConnection("jdbc:calcite:", props);
         CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
         return calciteConnection.getDataSource();
     }
    
     // 2. 配置 MyBatis Plus 的 SqlSessionFactory,指定使用 Calcite 数据源
     @Bean
     public SqlSessionFactory sqlSessionFactory(DataSource calciteDataSource) throws Exception {
         MybatisSqlSessionFactoryBean sessionFactory = new MybatisSqlSessionFactoryBean();
         // 注入 Calcite 数据源
         sessionFactory.setDataSource(calciteDataSource);
         // 配置 mapper.xml 文件路径(如果使用 XML 方式编写 SQL)
         sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver()
                 .getResources("classpath:mapper/*.xml"));
         // 配置 MyBatis Plus 全局参数(可选)
         org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
         configuration.setMapUnderscoreToCamelCase(true); // 下划线转驼峰
         sessionFactory.setConfiguration(configuration);
         // 注入 MyBatis Plus 插件(如分页插件)
         sessionFactory.setPlugins(mybatisPlusInterceptor());
         return sessionFactory.getObject();
     }
    
     // 3. MyBatis Plus 分页插件(可选,复杂查询分页用)
     @Bean
     public MybatisPlusInterceptor mybatisPlusInterceptor() {
         MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
         interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); // 适配 Calcite 兼容的 MySQL 语法
         return interceptor;
     }
    
     // 4. 配置事务管理器(可选,需要事务支持时添加)
     @Bean
     public PlatformTransactionManager transactionManager(DataSource calciteDataSource) {
         return new DataSourceTransactionManager(calciteDataSource);
     }
    }

    核心逻辑给大家捋一捋:先通过 Calcite 创建统一的数据源,再把它注入到 MyBatis Plus 的 SqlSessionFactory 里。这样一来,咱们后续写代码就完全是 MyBatis Plus 的熟悉风格了,不管是 Mapper 接口还是 XML 映射文件,都能直接用,跨数据源查询的复杂逻辑全交给 Calcite 处理。

2.4 核心查询实现(MyBatis Plus 风格)

接下来就是大家最熟悉的查询实现环节了,我用 MyBatis Plus 最常用的“Mapper 接口+注解”和“XML”两种方式来演示,还是以 MySQL 订单表和 MongoDB 用户表的关联查询为例,大家可以根据自己的习惯选:

(1)定义实体类(对应跨数据源查询结果,可使用 lombok 简化代码)

import lombok.Data;

@Data
public class UserOrderVO {
    private String orderId;      // 订单 ID(来自 MySQL)
    private String orderTime;    // 下单时间(来自 MySQL)
    private BigDecimal amount;   // 订单金额(来自 MySQL)
    private String userName;     // 用户名(来自 MongoDB)
    private String phone;        // 手机号(来自 MongoDB)
    private String userId;       // 用户 ID(关联字段)
}

(2)定义 Mapper 接口(MyBatis Plus 风格,无需编写实现类)

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;

// 继承 BaseMapper,获得 MyBatis Plus 基础 CRUD 能力
public interface UserOrderMapper extends BaseMapper<UserOrderVO> {
    // 注解方式编写跨数据源关联 SQL
    @Select("SELECT " +
            "o.order_id AS orderId, o.order_time AS orderTime, o.amount, " +
            "u.user_name AS userName, u.phone, o.user_id AS userId " +
            "FROM ecommerce.order o " +  // ecommerce:MySQL 的 Schema;order:订单表
            "JOIN user_mongo.user_info u " +  // user_mongo:MongoDB 的 Schema;user_info:用户表
            "ON o.user_id = u.user_id " +
            "WHERE o.user_id = #{userId}")
    List<UserOrderVO> queryUserOrderByUserId(@Param("userId") String userId);

}

(3)编写 Service 层

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class UserOrderServiceImpl extends ServiceImpl<UserOrderMapper, UserOrderVO> implements UserOrderService {
    @Override
    public List<UserOrderVO> getUserOrderByUserId(String userId) {
        // 调用 Mapper 接口方法,实现跨数据源查询
        return baseMapper.queryUserOrderByUserId(userId);
        // 若使用 XML 方式:return baseMapper.queryUserOrderByUserIdWithXml(userId);
    }
}

(4)编写 Controller 层

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;

@RestController
public class CrossDataSourceQueryController {
    @Autowired
    private UserOrderService userOrderService;

    @GetMapping("/user/order/{userId}")
    public List<UserOrderVO> queryUserOrder(@PathVariable String userId) {
        // 调用 Service 方法,返回跨数据源查询结果
        return userOrderService.getUserOrderByUserId(userId);
    }
}

最后再划 3 个重点,确保大家少走弯路:

  1. 实体类字段要和查询结果列名对应,用别名适配下划线转驼峰更省心;
  2. Mapper 接口继承 BaseMapper 后,MyBatis Plus 的分页、条件构造器这些功能都能直接用,复杂查询也能轻松搞定;
  3. 咱们写的都是标准 SQL,Calcite 会自动解析适配不同数据源,完全不影响大家原来的开发习惯。

    三、深度解析:Calcite 的经典使用场景

    讲完了集成步骤,再跟大家深度拆解下 Calcite 的经典落地场景。毕竟技术最终要服务于业务,这些场景都是我在实际项目中常用到的,拿来就能用!

第一个经典场景是多系统数据融合查询,这也是企业级中台的核心需求。做企业级中台的小伙伴肯定深有体会,大型企业里数据都是分散的——订单系统用 MySQL,用户系统用 MongoDB 存行为数据,库存系统用 PostgreSQL。要是想做“用户-订单-库存”全链路分析,传统做法得分别调三个系统的接口,再在业务层手动整合数据,不仅效率低,还容易出错。用 Calcite 分别适配这三个数据源后,只要写一套标准 SQL 就能实现跨数据源关联查询,咱们用 Spring Boot 3 搭好接口服务,业务层完全不用管数据存在哪,专注核心业务逻辑就行,亲测开发效率能提升 50%以上,再也不用写重复的接口调用和数据整合代码,而且 Calcite 的查询优化器会自动优化关联逻辑,查询效率也能跟上。

第二个场景是实时数据与离线数据联动查询,做电商的小伙伴应该经常遇到这类需求。比如实时订单数据存在 Kafka 里,历史订单数据存在 Hive 里,运营需要实时查看“今日订单+近 30 天历史订单”的汇总数据来做实时监控和决策。这种情况不用麻烦地把 Kafka 数据同步到 Hive,也不用把 Hive 数据同步到实时库,直接用 Calcite 的 Kafka 适配器(calcite-kafka)和 Hive 适配器(calcite-hive),就能把实时流数据和离线数据放到同一个查询体系里,写一条 SQL 就能实现“实时+离线”数据的联合查询,既省了大量数据同步成本,又能兼顾实时性和准确性,还支持增量查询。

第三个场景是自定义数据源适配,主要解决特殊格式数据查询的难题。企业里总有很多 CSV、Excel、Parquet 格式的文件数据,传统做法是先把这些文件导入数据库才能查询,步骤又多又耗时,尤其是临时做数据分析的时候,导入数据库的成本太高了。而 Calcite 内置了文件适配器(calcite-file),支持直接查询这些文件数据,根本不用导入数据库。咱们再结合 Spring Boot 3 的文件上传功能,还能实现“文件上传后直接用 SQL 查询”的需求,临时分析数据超方便。如果有企业内部的特殊格式文件,比如自定义的二进制文件,也可以自己实现 Calcite 的 SchemaFactory 和 TableFactory 接口,写个自定义适配器,就能适配这些特殊数据源了。

四、避坑指南:集成注意事项与优化建议

4.1 这些坑一定要避开!

  • 适配器版本要统一:Calcite 核心依赖和各数据源适配器的版本必须一致,不然很容易出现类加载异常,这个坑我踩过,大家一定要注意。
  • 模型文件配置要规范:Schema 名称、表名要清晰,别重复;数据源的地址、端口、账号密码这些连接参数一定要准确,错一个就会连接失败。
  • 要考虑数据源性能:跨数据源查询的性能取决于最慢的那个数据源,所以要确保每个数据源自身性能没问题,不然会拖慢整个查询。

    4.2 优化小技巧,查询更快更稳

  • 启用 Calcite 缓存:配置一下 Calcite 的元数据缓存和查询计划缓存,能减少重复解析和元数据查询的时间,提升查询效率。
  • 优化 SQL 写法:尽量避免复杂的多表关联,能把过滤条件下推到数据源的就尽量下推。虽然 Calcite 会自动优化,但手动优化后的效果会更好。
  • 自定义优化规则:如果是特别复杂的业务场景,可以自己实现 Calcite 的 OptimizerRule 接口,写自定义的查询优化规则,进一步提升查询效率。

    五、本文总结

    最后总结一下,对于熟悉 Spring Boot 3 的咱们来说,集成 Calcite 的关键就是理解它“统一查询”的核心思想,把模型文件写对、核心 Bean 配置好,就能快速实现多数据源查询能力了。https://mybj123.com/28732.html

MongoDB 最近修补了CVE-2025-14847,这是一个影响多个支持和遗留 MongoDB 服务器版本的漏洞。根据披露,该漏洞可以被未认证的攻击者以较低的复杂度远程利用,可能导致敏感数据和凭证的外泄。

 

这个漏洞被称为 MongoBleed,以臭名昭著的Heartbleed命名,CVSS 得分为8.7,由对 zlib 压缩网络流量处理不当触发,允许未经身份验证的攻击者泄露未初始化的内存,并可能从受影响的 MongoDB 服务器窃取敏感数据,如凭证或令牌。根据Wiz的安全研究人员,该漏洞正在被广泛利用。

 

正如 MongoDB 的声明所述,MongoDB Atlas 上的托管实例已经被修补,但是如果自托管 MongoDB 不更新,仍然存在风险。强烈建议组织立即应用安全补丁,或禁用压缩并限制网络暴露。Merav Bar、Amitai Cohen、Yaara Shriki 和 Gili Tikochinski 解释:

 

CVE-2025-14847 源于 MongoDB 服务器基于 zlib 的网络消息解压缩逻辑中的一个缺陷,该逻辑在认证之前进行了处理。通过发送畸形的压缩网络数据包,未经身份验证的攻击者可以触发服务器错误处理解压缩的消息长度,导致返回给客户端未初始化堆内存。

 

根据 Wiz 文章,42%的云环境中至少有一个易受攻击的 MongoDB 实例,Censys 报告称全球大约有 87,000 台服务器存在潜在的风险。由于该漏洞可以在没有认证或用户交互的情况下被利用,暴露在互联网上的数据库服务器面临特别高的风险。Wiz 团队补充道:

 

在代码层面,这个漏洞是由 message_compressor_zlib.cpp 中的错误长度处理引起的。受影响的逻辑返回了分配的缓冲区大小(output.length()),而不是实际解压缩数据的长度,从而允许过小或畸形的有效载荷暴露相邻的堆内存。

 

这个漏洞影响了自2017年以来发布的所有 MongoDB 版本。Linkfields Innovations 的软件开发人员 Gourav Boiri评论道

 

MongoBleed 突出了即使是成熟的数据库,当暴露或打补丁时,也可能成为关键的攻击面。预认证内存泄露、主动漏洞攻击和 87K+暴露实例——提醒我们,数据库安全就是基础设施安全。

 

在“简单解释MongoBleed”的文章中,Stanislav Kozlovski解释了这一漏洞的工作原理,并警告说:

 

它非常容易被利用——只需要连接到数据库(不需要认证)。截至撰写本文时,它已经被修复,但一些 EOL 版本(3.6、4.0、4.2)将不会得到修复。

 

InfoSec 创始人和实践者Eric Capuano解释了如何从日志中检测数据库服务器是否被利用。在一个流行的Reddit帖子中,用户 misteryuub 争论道:

 

很多人争论说开源代码比闭源代码更安全,或者安全问题会在开源代码中更快被发现。这种级别的漏洞存在是对这个论点的反驳。

 

Kozlovski 不同意:

 

当人们说开源更安全时,他们通常指的是有活跃社区的开源项目。Mongo 在 2017 年似乎没有这个,因为引入这个漏洞的 PR 没有在公共 GitHub 上被审查。

 

MongoDB补丁版本现在可用于从 4.4 到 8.0 的所有支持版本。像Percona Server for MongoDB这样的分支也受到上游漏洞的影响

 

https://www.infoq.com/news/2026/01/mongodb-mongobleed-vulnerability/