教程:构建基于 Coreflux MQTT 与托管数据库的IoT数据管道
MQTT 代理 通过发布-订阅消息模式连接物联网设备和应用程序,使其成为现代 物联网 基础设施的重要组成部分。Coreflux 是一个 低代码 MQTT 代理,增加了实时数据处理和转换功能,让你可以直接与 DigitalOcean 托管数据库(包括 MongoDB、PostgreSQL、MySQL 和 OpenSearch)集成,而无需编写自定义集成代码。 你将学到什么: 本教程将引导你部署一个完整的物联网数据管道——从在 DigitalOcean 上设置托管数据库集群和 Coreflux MQTT 代理,到配置安全的 VPC 网络、使用 Coreflux 的物联网语言 (LoT) 构建数据转换模型,以及自动将处理后的物联网数据存储到你选择的数据库中。最终你将获得一个可用于生产环境的设置,能够处理物联网应用的实时消息传递和持久存储。 在深入了解分步部署过程之前,以下是你将学到的关键点: 本教程为需要实时消息传递结合持久数据存储以及搜索、分析或关系查询等高级功能的物联网应用提供了一个可用于生产环境的基础。 在本教程结束时,你将得到: Coreflux 通过物联网语言编程语言在 DigitalOcean 云平台上提供轻量级 MQTT 代理和数据管道工具,以实现高效的物联网通信。 MQTT(消息队列遥测传输)是一种轻量级的、发布-订阅网络协议,在物联网生态系统中被广泛采用。专为受限设备和低带宽、高延迟或不稳定的网络设计,MQTT 能够在带宽受限的环境中实现高效、实时的消息传递。 Coreflux 提供了一个轻量级 MQTT 代理,以促进物联网设备与应用程序之间的高效、实时通信,包括每个用例所必需的实时数据转换功能。为可扩展性和可靠性而构建,Coreflux 专为低延迟和高吞吐量至关重要的环境量身定制。 无论你是构建一个小型物联网项目还是部署工业监控系统,Coreflux 都能处理设备之间的消息路由和数据流。 在 DigitalOcean 云平台上使用 Coreflux,你将获得: 数据处理: 在你的数据所在之处集中处理你的数据处理需求,确保实时数据处理。 在开始本 MQTT 代理 部署教程之前,你需要: 预计时间: 本教程大约需要 30-45 分钟完成,具体取决于数据库配置时间(通常每个数据库集群需要 1-5 分钟)。 首先,你将创建一个虚拟私有云 (VPC),以确保你的 物联网 服务和 MQTT 代理 之间的安全通信,无需公开访问。 为物联网自动化配置你的 VPC: VPC 将为你所有的物联网资源提供隔离的网络,确保 Coreflux MQTT 代理 和 托管数据库 之间的安全通信。有关 VPC 配置的更多详细信息,请参阅我们关于创建 VPC 网络的教程。 根据你的物联网应用需求,选择以下数据库选项之一: 当你的物联网工作负载需要关系模式、强一致性 和 高级 SQL 分析,并由自动备份、监控和维护支持时,DigitalOcean 上的托管 PostgreSQL 是一个很好的选择。 为物联网自动化配置你的 PostgreSQL 集群: 根据你的 物联网 需求选择你的计划: 托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。 系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。 (可选操作)限制入站连接: 对于连接详细信息,你将看到两个选项 - 公共网络和 VPC 网络。第一个用于像 DBeaver 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息): 你可以使用提供的连接参数,使用公共访问凭证通过 DBeaver 测试 PostgreSQL 连接: 为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 DBeaver 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法: 创建用户: 创建数据库: 注意: 你可能需要更改数据库内的用户权限,以便能够创建表、插入和选择数据。对于 PostgreSQL,使用 GRANT CREATE, INSERT, SELECT ON DATABASE coreflux-broker-data TO coreflux-broker-client; 授予必要的权限。对于 MySQL,使用 GRANT CREATE, INSERT, SELECT ON coreflux-broker-data.* TO 'coreflux-broker-client'@'%';。 当你想要熟悉的 SQL、广泛的生态系统支持以及处理备份、更新和监控的完全托管服务时,DigitalOcean 上的托管 MySQL 是结构化、事务性物联网数据的理想选择。 为物联网自动化配置你的 MySQL 集群: 根据你的 物联网 需求选择你的计划: 托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。 系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。 (可选操作)限制入站连接: 对于连接详细信息,你将看到两个选项 - 公共网络和 VPC 网络。第一个用于像 DBeaver 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息): 你可以使用提供的连接参数,使用公共访问凭证通过 DBeaver 测试 MySQL 连接。 注意: 你可能需要更改 DBeaver 的驱动程序设置——设置 allowPublicKeyRetrieval = true。 为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 DBeaver 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法: 创建用户: 创建数据库: 托管 MongoDB 非常适合灵活或不断演变的物联网负载,让你能够存储异构的传感器文档,而无需严格模式,同时平台处理复制、备份和监控。 为物联网自动化配置你的 MongoDB 集群: 根据你的 物联网 需求选择你的计划: 托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。 系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。 (可选)限制入站连接: 对于连接详细信息,你将看到两个选项:公共网络和 VPC 网络。第一个用于像 MongoDB Compass 这样的工具进行外部访问,而第二个将由 Coreflux 服务用于访问数据库。 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息): 你可以使用 MongoDB Compass 或提供的连接字符串,使用公共访问凭证测试 MongoDB 连接: 为了更好的安全性和组织性,为你的 物联网自动化 应用程序创建一个专用用户和数据库。这也可以通过 MongoDB Compass 或 CLI 完成,但 DigitalOcean 提供了一种用户友好的方法: 创建用户: 创建数据库: 托管 OpenSearch 专为高容量物联网数据的搜索、日志分析和时间序列仪表板而设计,该服务为你管理集群健康、扩展和索引存储。 为物联网自动化配置你的 OpenSearch 集群: 根据你的 物联网 需求选择你的计划: 托管数据库 创建过程通常需要 1-5 分钟。完成后,你将被重定向到数据库概览页面,在那里你可以查看连接详细信息并执行管理操作。 系统将提示你进行入门步骤,显示你的连接详细信息,你可以配置入站访问规则(建议限制为你的 IP 和仅 VPC)。 (可选)限制入站连接: 对于连接详细信息,你将看到两个选项:公共网络和 VPC 网络。第一个用于工具的外部访问,而第二个将由 Coreflux 服务用于访问数据库。你还将看到访问 OpenSearch 仪表板的 URL 和参数。 记下提供的连接详细信息,包括公共访问和 VPC 访问(每种都有不同的详细信息): 你可以使用提供的凭证通过 OpenSearch 仪表板测试 OpenSearch 连接: 为 MQTT 代理 部署配置你的 droplet: 为你的 物联网 工作负载选择大小: 选择身份验证方法: SSH 密钥:推荐用于提高安全性 最终确定详细信息: 采用与Coreflux Droplet相同的方法,选择Docker作为市场应用镜像。 一旦你的droplet运行起来,通过已定义的认证方法或Droplet主页上提供的Web控制台,使用SSH连接到它: 使用Docker运行Coreflux MQTT代理: 这个Docker命令: 验证MQTT代理是否在运行: 你应该看到一个正在运行的容器: 你可以通过MQTT客户端(如MQTT Explorer)访问MQTT代理,以验证对代理的访问,无论采用何种部署方法。 对于生产环境的物联网自动化部署,配置防火墙规则以限制访问: 配置MQTT代理安全的入站规则: 关于详细的防火墙配置,请参考DigitalOcean的防火墙快速入门教程。生产提示: 将MQTT端口1883限制在特定的源IP或VPC范围,并且对于外部设备连接,优先使用端口1884(带TLS的MQTT)。如果你需要额外的安全层,请考虑使用带有私有网络的DigitalOcean应用平台。 用于Visual Studio Code的LoT(Language of Things)Notebook扩展提供了一个集成的低代码开发环境,用于MQTT代理编程和物联网自动化。了解更多关于Coreflux的Language of Things (LoT)用于低代码物联网自动化的信息。 配置与你的Coreflux MQTT代理的连接,当在顶部栏提示时或通过点击底部左侧栏的MQTT按钮时,使用默认凭据: 假设没有错误,你将在底部左侧栏看到与代理的MQTT连接状态。 对于这个用例,我们将通过一个转换管道将原始数据集成到数据库中。然而,由于在演示中没有连接到任何MQTT设备,我们将利用LoT的能力,并使用一个Action来模拟设备数据。 在LoT中,Action是一种可执行的逻辑,由特定事件触发,例如定时间隔、主题更新或其他操作或系统组件的显式调用。Actions允许与MQTT主题、内部变量和负载进行动态交互,促进复杂的物联网自动化工作流。 因此,我们可以使用一个以定义的时间间隔在特定主题中生成数据的Action,然后由我们将在下面定义的管道的其余部分使用。 你可以下载包含示例项目的github仓库。 使用低代码的LoT(Language of Things)界面创建一个Action来生成模拟传感器数据: 在提供的Notebook中,你还有一个Action可以执行递增计数器来模拟数据,作为提供Action的替代方案。 当你运行这个Action时,它将: 在LoT Notebook界面中显示同步状态 Coreflux中的模型用于转换、聚合和计算来自输入MQTT主题的值,并将结果发布到新主题。它们是创建适用于你多个数据源的UNS - 统一命名空间 - 的基础。 因此,通过该模型,你可以定义原始物联网数据的结构与转换方式,适用于单个设备,也支持同时处理多个设备(借助通配符+实现)。模型还作为用于可扩展存储到托管数据库的关键数据模式。 这个低代码模型: 由于我们使用Action生成了两个模拟传感器/机器,我们可以看到模型结构自动应用于两者,同时生成了一个json对象和各个单独的主题。 选择与你在步骤2中选择的数据库相匹配的数据库集成部分。 在本节中,你将学习如何将处理后的物联网数据存储到DigitalOcean上的PostgreSQL托管数据库中。 要将处理后的物联网数据存储到PostgreSQL托管数据库中,你需要在Coreflux中定义一个Route。Route使用简单、低代码的配置指定数据如何从你的MQTT代理发送到你的PostgreSQL集群: 使用来自DigitalOcean的你自己的PostgreSQL连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 为了更好的安全性和更低的延迟,请使用VPC连接详细信息(而非公共连接)。VPC主机名和端口与公共连接字符串不同 - 请检查你的数据库集群的连接详细信息页面以获取这两个选项。 修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾: 此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。 部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。 MySQL是一种广泛使用的关系数据库管理系统,非常适合大规模存储和分析物联网数据。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管MySQL数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。 要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到MySQL数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息: 使用来自DigitalOcean的你自己的MySQL连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 为了更好的安全性和更低的延迟,请使用VPC连接详细信息(而非公共连接)。如果你遇到连接问题,请验证 修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾: 此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。 部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。 MongoDB是一种NoSQL数据库,非常适合存储和查询具有灵活模式的物联网数据。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管MongoDB数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。 要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到MongoDB数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息: 使用来自DigitalOcean的你自己的MongoDB连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 当可用时,请使用VPC连接字符串格式。连接字符串应包括 修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾: 此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。 部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。 OpenSearch是一种分布式搜索和分析引擎,专为大规模数据处理和实时分析而设计。在本节中,你将学习如何将你的Coreflux MQTT代理连接到DigitalOcean上的托管OpenSearch数据库,以便你的实时设备数据能够安全可靠地持久化,用于分析、报告或与其他应用程序集成。 要启用此集成,你必须在Coreflux的LoT(Language of Things)中定义一个Route,指示处理后的数据应该发送到哪里以及如何发送。下面是路由数据到OpenSearch数据库所需的低代码格式。请务必根据需要替换你自己的连接详细信息: 使用来自DigitalOcean的你自己的OpenSearch连接详细信息替换,并在你的LoT Notebook中运行该Route。重要提示: 当可用时,请使用VPC基础URL(而非公共URL)。基础URL格式通常为 修改你的LoT模型以使用数据库路由进行可扩展存储,通过将此添加到模型的末尾: 此外,添加一个带有主题的参数,以便在你的托管数据库中为每个条目提供唯一标识符。 部署此更新后的操作后,所有数据在更新时应自动存储在数据库中。 使用DBeaver连接到你的PostgreSQL托管数据库以验证可扩展存储: 正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。 使用MongoDB Compass连接到你的MongoDB托管数据库以验证可扩展存储: 你应该看到具有类似结构的实时数据文档: 正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。 使用DBeaver连接到你的MySQL托管数据库以验证可扩展存储: 与其他集成一样,所有数据也可在MQTT代理中用于其他用途和下游集成。 使用提供的URL和凭据打开OpenSearchDashboards: 打开菜单并选择索引管理选项 返回主页并在菜单中选择发现选项 正如我们之前看到的,所有数据都可在MQTT代理中用于其他用途和集成。 趋势分析:利用你数据库的能力来分析随时间变化的趋势: 你通过定义指向目标服务(PostgreSQL、MySQL、MongoDB或OpenSearch)的LoTRoute来将Coreflux MQTT代理与托管数据库集成。每个路由使用适当的连接参数(服务器或连接字符串、端口、数据库名称、用户名、密码和SSL/TLS选项),并自动将MQTT消息有效负载持久化到表、集合或索引中。一旦定义好路由,你就使用 可以。Coreflux设计为一个低代码集成层,因此你无需编写应用程序代码或外部ETL作业来持久化数据。对于每种数据库类型,你配置一个LoT路由(例如, 你的MQTT物联网数据的最佳托管数据库取决于你的数据结构、查询需求和分析目标。使用下面的比较表来帮助你决定: 提示: 你可以通过配置多个Coreflux路由同时使用多个托管数据库。这使得可以从同一个MQTT流中,将结构化的物联网数据存储在PostgreSQL或MySQL中,在OpenSearch中聚合日志和指标,并在MongoDB中收集非结构化或无模式数据。 Coreflux将所有处理后的值保留在MQTT主题上,供实时消费、仪表板或额外管道使用,而Routes则将相同的建模数据持久化到你的数据库中,用于历史查询。在实践中,你可以订阅主题以进行即时反应(警报、控制回路),并查询PostgreSQL/MySQL/MongoDB/OpenSearch以进行聚合、趋势和长期分析。这种双路径设计反映了MQTT和物联网数据集成教程中的常见模式,其中代理提供实时消息传递,而数据库提供持久存储和分析。 当部署在DigitalOcean上时,你可以使用VPC网络来保持Coreflux MQTT代理和数据库之间的所有通信私密。VPC将你的资源与公共互联网访问隔离开来,并且DigitalOcean托管数据库支持连接的TLS加密。此外,你可以为你的Coreflux应用程序创建具有有限权限的专用数据库用户,遵循最小权限原则。 是的。这种架构反映了生产环境中MQTT和数据库集成所使用的模式,其中代理前端处理设备流量,而托管数据库层提供持久性和分析。DigitalOcean托管数据库提供自动备份、高可用性和监控,而Coreflux MQTT代理可以水平扩展以处理高消息吞吐量。对于生产环境,你还应该配置防火墙规则、使用强凭据、为MQTT和数据库连接启用TLS,并根据预期的消息量来调整你的droplet和集群大小。 可以。MQTT代理通常部署在私有网络或边缘环境中,公共资源一致指出,只要客户端可以访问代理,MQTT就可以在没有公共互联网的情况下工作。使用DigitalOcean,你可以将Coreflux和你的数据库保持在VPC内部,并且只暴露绝对必要的内容(例如,VPN、堡垒主机或有限的防火墙规则)。如果你需要混合或多站点架构,你还可以将选定的主题与其他代理或云区域同步。 MQTT针对轻量级、事件驱动的消息传递进行了优化;数据库则针对存储和查询进行了优化。存储每一条原始消息可能会变得昂贵或嘈杂,因此最佳实践建议仔细建模数据(例如,聚合指标、过滤主题或降采样)。极低功耗设备或超受限网络可能难以维持持久连接或处理TLS开销,在这种情况下,你可能需要调整QoS级别、批处理和保留策略。只要你在设计中考虑到这些权衡,MQTT加上托管数据库对于大多数物联网场景都能很好地工作。 你应该根据物联网数据结构、可扩展性以及你希望如何查询设备数据来选择托管数据库。下表总结了每个选项的优势: 将Coreflux MQTT代理与DigitalOcean的托管数据库服务(PostgreSQL、MongoDB、MySQL或OpenSearch)集成,为你提供了实时物联网数据处理和存储的完整设置。按照本教程,你已经使用低代码开发实践构建了一个收集、处理和存储物联网数据的自动化管道。 借助Coreflux的架构和你选择的数据库的存储特性,你可以处理大量的实时数据并查询它以获取洞察。无论你是监控工业系统、跟踪环境传感器还是管理智慧城市基础设施,这种设置都让你能够基于实时MQTT主题和历史数据库查询做出数据驱动的决策。 了解更多关于DigitalOcean托管数据库的信息,以及DigitalOcean 针对 IoT行业的产品服务支持,可咨询 DigitalOcean 中国区独家战略合作伙伴卓普云AI Droplet(aidroplet.com)。 你可以尝试提供的用例或使用Coreflux和DigitalOcean实现你自己的用例。你还可以在DigitalOcean Droplet市场或通过Coreflux网站获取免费的Coreflux MQTT代理。使用托管数据库部署 Coreflux MQTT 代理
关键要点
你将构建什么
Coreflux 与 DigitalOcean 合作
什么是 MQTT?
关于 Coreflux
数据集成: 轻松与其他 DigitalOcean 服务(如托管数据库 PostgreSQL、MongoDB、MySQL 或 OpenSearch)集成,确保为你的所有数据需求提供一个单一且简单的生态系统。
可扩展性: 轻松处理不断增长的数据和设备数量,而不会影响性能。
可靠性: 确保在所有连接的设备之间进行一致且可靠的消息传递。
准备工作
步骤 1 — 为物联网自动化创建网络基础设施
为安全的 MQTT 通信创建 VPC 网络

步骤 2 — 为可扩展存储设置托管数据库
设置 PostgreSQL 托管数据库

为 MQTT 代理集成配置 PostgreSQL 数据库访问


测试 PostgreSQL 数据库连接

创建 PostgreSQL 应用程序数据库和用户(可选)
设置 MySQL 托管数据库

为 MQTT 代理集成配置 MySQL 数据库访问


测试 MySQL 数据库连接

创建 MySQL 应用程序数据库和用户(可选)
设置 MongoDB 托管数据库

为 MQTT 代理集成配置 MongoDB 数据库访问


测试 MongoDB 数据库连接
mongodb://username:password@mongodb-host:27017/defaultauthdb?ssl=true
创建 MongoDB 应用程序数据库和用户(可选)
设置 OpenSearch 托管数据库

为 MQTT 代理集成配置 OpenSearch 数据库访问


测试 OpenSearch 数据库连接

步骤 3 — 在 DigitalOcean Droplet 上部署 Coreflux MQTT 代理
创建 DigitalOcean Droplet



替代方案 - 在Docker镜像Droplet上使用Docker安装Coreflux MQTT代理
ssh root@your-droplet-ip
docker run -d \
--name coreflux \
-p 1883:1883 \
-p 1884:1884 \
-p 5000:5000 \
-p 443:443 \
coreflux/coreflux-mqtt-broker-t:1.6.3docker ps
通过使用默认值连接到MQTT代理来验证部署

步骤4 — 为安全的物联网通信配置防火墙规则(可选)
步骤5 — 使用Coreflux的Language of Things设置物联网数据集成
安装LoT Notebook扩展

连接到你的MQTT代理

步骤6 — 通过Actions在MQTT代理中创建数据
生成模拟物联网数据
DEFINE ACTION RANDOMIZEMachineData
ON EVERY 10 SECONDS DO
PUBLISH TOPIC "raw_data/machine1" WITH RANDOM BETWEEN 0 AND 10
PUBLISH TOPIC "raw_data/station2" WITH RANDOM BETWEEN 0 AND 60
步骤7 — 为实时处理创建数据转换模型
使用Language of Things定义数据模型
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
步骤8 — 为可扩展存储设置数据库集成
PostgreSQL集成
DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL
ADD SQL_CONFIG
WITH SERVER "db-postgresql.db.onmyserver.com"
WITH PORT 25060
WITH DATABASE "defaultdb"
WITH USERNAME "doadmin"
WITH PASSWORD "AVNS_pass"
WITH USE_SSL TRUE
WITH TRUST_SERVER_CERTIFICATE FALSE为PostgreSQL数据库存储更新模型
STORE IN "PostgreSQL_Log"
WITH TABLE "MachineProductionData"DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "PostgreSQL_Log"
WITH TABLE "MachineProductionData"MySQL集成
DEFINE ROUTE MySQL_Log WITH TYPE MYSQL
ADD SQL_CONFIG
WITH SERVER "db-mysql.db.onmyserver.com"
WITH PORT 25060
WITH DATABASE "defaultdb"
WITH USERNAME "doadmin"
WITH PASSWORD "AVNS_pass"
WITH USE_SSL TRUE
WITH TRUST_SERVER_CERTIFICATE FALSETRUST_SERVER_CERTIFICATE是否已为你的MySQL版本正确设置 - 某些版本需要TRUE,而其他版本则使用FALSE。为MySQL数据库存储更新模型
STORE IN "MySQL_Log"
WITH TABLE "MachineProductionData"DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "MySQL_Log"
WITH TABLE "MachineProductionData"MongoDB集成
DEFINE ROUTE mongo_route WITH TYPE MONGODB
ADD MONGODB_CONFIG
WITH CONNECTION_STRING "mongodb+srv://<username>:<password>@<cluster-uri>/<database>?tls=true&authSource=admin&replicaSet=<replica-set>"
WITH DATABASE "admin"tls=true和authSource=admin参数。有关MongoDB连接故障排除,请参阅我们关于连接MongoDB的教程。为MongoDB数据库存储更新模型
STORE IN "mongo_route"
WITH TABLE "MachineProductionData"DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "mongo_route"
WITH TABLE "MachineProductionData"OpenSearch集成
DEFINE ROUTE OpenSearch_log WITH TYPE OPENSEARCH
ADD OPENSEARCH_CONFIG
WITH BASE_URL "https://my-opensearch-cluster:9200"
WITH USERNAME "myuser"
WITH PASSWORD "mypassword"
WITH USE_SSL TRUE
WITH IGNORE_CERT_ERRORS FALSEhttps://your-cluster-hostname:9200。对于OpenSearch仪表板访问,请使用数据库集群详细信息中提供的单独的仪表板URL。有关更多详细信息,请参阅我们的OpenSearch快速入门。为OpenSearch数据库存储更新模型
STORE IN "OpenSearch_Log"
WITH TABLE "MachineProductionData"DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "OpenSearch_Log"
WITH TABLE "MachineProductionData"步骤9 — 验证完整的物联网自动化管道
监控实时数据流
验证PostgreSQL存储


验证MongoDB存储

{
"_id": {
"$oid": "68626dc3e8385cbe9a1666c3"
},
"energy": 36,
"energy_wh": 36000,
"production_status": "active",
"production_count": 31,
"stoppage": 0,
"maintenance_alert": false,
"timestamp": "2025-06-30 10:58:11",
"device_name": "station2"
}验证MySQL存储
coreflux-broker-data数据库(或你为数据库指定的名称)MachineProductionData表中存储的记录

验证OpenSearch存储



步骤10 - 扩展你的用例和集成
测试LoT能力
构建分析和可视化
优化和扩展你的物联网基础设施
常见问题解答
1. 如何将Coreflux MQTT代理与托管数据库集成?
STORE IN指令将其附加到Model,这样每个处理后的消息都会被写入你选择的数据库。2. 我能否在不编写自定义集成代码的情况下将MQTT数据直接保存到数据库?
PostgreSQL_Log、MySQL_Log、mongo_route或OpenSearch_Log),然后使用STORE IN "<route_name>" WITH TABLE "MachineProductionData"扩展你的模型。Coreflux处理连接池、重试和错误处理,因此你可以专注于建模主题和转换,而不是样板数据库代码。3. 我应该为MQTT物联网数据存储选择哪种托管数据库?
数据库 最适合 示例用例 PostgreSQL 强一致性、关系模式、复杂的SQL查询 工业传感器网络、事务性事件、需要跨连接数据集的分析 MySQL 关系数据、结构化查询、广泛的兼容性 库存系统、生产指标、传统业务记录 MongoDB 灵活、不断演进的模式;文档存储 具有可变负载的互联设备、具有变化格式的物联网遥测 OpenSearch 全文搜索、分析、仪表板、日志索引 时间序列分析、监控、事件日志、物联网搜索和可视化 4. 这种架构如何处理实时和历史分析?
5. Coreflux和托管数据库之间的连接有多安全?
6. 这个设置是否适用于生产环境物联网部署?
7. 我能否在没有公共互联网访问的情况下,或在混合环境中运行MQTT代理?
8. 在物联网数据中使用MQTT和数据库是否存在任何限制或权衡?
9. 我如何为我的物联网项目在PostgreSQL、MySQL、MongoDB和OpenSearch之间做出选择?
数据库 当...时最佳 典型用例 关键优势 PostgreSQL 你需要复杂的关系查询、强一致性和事务完整性(ACID支持)。 工业传感器网络、将设备数据与生产相关联、需要对连接的数据集进行分析 关系模式、高级SQL、一致性 MySQL 你的工作负载是结构化的,具有广泛的工具和兼容性需求。 库存跟踪、传统业务系统、生产指标 更简单的关系需求、广泛支持 MongoDB 你的设备负载和模式不断演变,或者你希望使用灵活的、基于文档的存储进行快速原型设计。 具有可变格式的物联网遥测、快速开发、半结构化数据 灵活的模式、易于扩展、快速原型设计 OpenSearch 你需要分析、搜索或对大容量的物联网数据(日志、时间序列、事件)进行仪表板展示。 搜索传感器数据、日志分析、可视化、基于关键字/时间的查询 搜索、全文、分析、快速聚合 结论




















































































































































































