标签 消息中间件 下的文章

阿里云 RocketMQ 4.0 介绍

阿里云 RocketMQ 4.0 产品是阿里云早期基于 Apache RocketMQ 构建的分布式消息中间件,主要面向企业级消息传递和异步解耦场景。RocketMQ 4.0 在发布时已具备高吞吐、低延迟、可扩展的核心特性,支持顺序消息、事务消息、定时/延时消息等多种能力,帮助开发者快速实现系统间的可靠通信。相比更高版本,RocketMQ 4.0 在弹性伸缩、可观测性和集成易用性方面能力有限,更多依赖人工运维和监控工具。但通过合理部署与监控,仍能够满足大多数分布式系统的消息传递需求,为业务提供基础的高可用性和可靠性保障。

观测云

观测云是一款专为 IT 工程师打造的全链路可观测产品,它集成了基础设施监控、应用程序性能监控和日志管理,为整个技术栈提供实时可观察性。这款产品能够帮助工程师全面了解端到端的用户体验追踪,了解应用内函数的每一次调用,以及全面监控云时代的基础设施。此外,观测云还具备快速发现系统安全风险的能力,为数字化时代提供安全保障。

采集方法

  1. 登录观测云控制台
  2. 点击【集成】菜单,选择【云账号管理】
  3. 点击【添加云账号】,选择【阿里云】,填写界面所需的信息,如之前已配置过云账号信息,则忽略此步骤
  4. 点击【测试】,测试成功后点击【保存】,如果测试失败,请检查相关配置信息是否正确,并重新测试
  5. 点击【云账号管理】列表上可以看到已添加的云账号,点击相应的云账号,进入详情页
  6. 点击云账号详情页的【集成】按钮,在未安装列表下,找到阿里云 RocketMQ 4.0,点击【安装】按钮,弹出安装界面安装即可。

图片

关键指标

Metric IdMetric NameDimensionsStatisticsUni
ReadyMessages已就绪消息量(Group)account_name,InstanceNameAverage,Maximumcount
ReadyMessagesPerGidTopic已就绪消息量(Group&Topic)account_name,InstanceNameAverage,Maximumcount
ReceiveMessageCountPerGid消费者每分钟接收消息数量(Group)account_name,InstanceNameAverage,Maximumcount/min
ReceiveMessageCountPerGidTopic消费者每分钟接收消息数量(Group&Topic)account_name,InstanceNameAverage,Maximumcount/min
ReceiveMessageCountPerInstance消费者每分钟接收消息数的数量(Instance)account_name,InstanceNameAverage,Maximumcount/min
ReceiveMessageCountPerTopic消费者每分钟接收消息的数量(Topic)account_name,InstanceNameAverage,Maximumcount/min
SendDLQMessageCountPerGid每分钟产生死信消息的数量(Group)account_name,InstanceNameAverage,Maximumcount/min
SendDLQMessageCountPerGidTopic每分钟产生死信消息的数量(Group&Topic)account_name,InstanceNameAverage,Maximumcount/min
SendMessageCountPerInstance生产者每分钟发送消息数量(Instance)account_name,InstanceNameAverage,Maximumcount/min
SendMessageCountPerTopic生产者每分钟发送消息数量(Topic)account_name,InstanceNameAverage,Maximumcount/min
ThrottledReceiveRequestsPerGid每分钟(GroupId)消费被限流次数account_name,InstanceNameAverage,Maximumcounts/min
ThrottledReceiveRequestsPerGidTopic每分钟(GroupId&Topic)消费被限流次数account_name,InstanceNameAverage,Maximumcounts/min
ThrottledReceiveRequestsPerInstance每分钟(Instance)消费被限流次数account_name,InstanceNameAverage,Maximumcounts/min
ThrottledSendRequestsPerInstance每分钟(Instance)发送被限流次数account_name,InstanceNameAverage,Maximumcounts/min
ThrottledSendRequestsPerTopic每分钟(Topic)发送被限流次数account_name,InstanceNameAverage,Maximumcounts/min

场景视图

登录观测云控制台,点击「场景」 -「新建仪表板」,输入 “阿里云 RocketMQ”, 选择 “阿里云 RocketMQ4监控视图”,点击 “确定” 即可添加视图。

图片

图片

监控器(告警)

ReadyMessagesPerGidTopic 消息堆积量异常

简要描述:消息堆积量异常通常表示某个 Group 或 Group&Topic 维度下的待消费消息数持续增加,说明消费者处理速度低于生产速度。这可能会导致消息延迟变大,甚至出现业务处理超时或丢弃风险。及时监控和处理堆积量异常,有助于发现消费性能瓶颈或消费者实例异常,保障消息系统的稳定性与业务的连续性。

图片

ReceiveMessageCountPerGid / PerTopic

简要描述:消费者接收消息速率异常通常表示某个 Group、Topic 或整个实例的消费吞吐量低于预期。这可能源于消费者宕机、线程不足、消费逻辑耗时过长或网络瓶颈。持续的消费速率下降会导致消息堆积增加,从而影响业务的实时性。监控该指标可帮助及时发现和定位消费环节的问题,确保生产与消费之间的速率平衡。

图片

总结

通过将阿里云 RocketMQ 4.0 的监控数据接入观测云,用户可实现更直观的运行监控与异常告警。观测云能够采集并展示消息堆积量、消费速率等关键指标,及时发现消费者性能瓶颈或消息延迟问题。借助智能告警与可视化视图,用户可快速定位异常、优化消费逻辑,从而提升系统稳定性与运维效率。整体而言,该方案帮助企业在传统 RocketMQ 4.0 环境下实现现代化可观测运维。

在 Go 业务开发中,我们经常遇到这样的场景:

  • 环境切换:本地开发用 NATS 或 RabbitMQ 贪图轻快,线上却要接入 Kafka 或 AWS SQS 。
  • 代码耦合:业务逻辑被底层 MQ 的 SDK 对象(如 *rocketmq.Producer)绑定,一旦想换驱动,几乎要重写整个消息发送逻辑。
  • 配置坑多:每个 MQ 的参数设置五花八门,一不小心传错了参数,程序却静默运行,等到上线出事才发现配置没生效。

为了解决这些痛点,我发起了 Unified MQ Broker for Go 项目。它就像是 MQ 领域的 "DBAL"(类似于 SQL 领域的 GORM 或数据库驱动层),让你通过一套 API 就能无缝切换多种消息中间件。

🚀 v0.2.0 重磅更新

经过一段时间的打磨,我们刚刚发布了 v0.2.0 版本。这次更新不只是增加了驱动,更是在“健壮性”和“性能”上做了深度优化:

1. 🛡️ 独创“选项追踪” (Option Tracking)

  • 痛点:如果你给 Kafka 传了一个 SQS 的 DeduplicationID,大部分 SDK 会选择静默忽略。
  • 方案:v0.2.0 引入了审计机制。如果底层适配器没有读取你传入的某个配置项,系统会在连接或发布时发出显式警告。彻底告别因拼写错误或参数误用导致的配置无效。

2. ⚡ 高性能“智能序列化” (Smart Serialization)

  • 优化:针对原始 []bytestring 数据实现了零拷贝路径,跳过冗余的 json.Marshal
  • 战果:压测显示,在高吞吐场景下,序列化性能提升了 5 倍以上(单次操作仅需 ~16ns )。

3. 🏗️ 延迟绑定 (Late Binding)

  • NewBroker 现在仅做静态配置。
  • 真正的网络 IO 、TCP 建连和 SDK 初始化全部推迟到 Connect() 时执行,方便与依赖注入框架(如 Wire )集成。

4. 🌐 全平台支持

目前已完美支持:RocketMQ, Kafka, RabbitMQ, NATS, AWS SQS, GCP Pub/Sub


💻 核心代码预览

无论底层是哪种 MQ ,你的业务代码只需要关心这一套统一逻辑:

import "github.com/qvcloud/broker"

// 切换驱动只需要换一行初始化,业务代码 0 改动
b := rabbitmq.NewBroker(broker.Addrs("amqp://..."))
b.Connect()

// 注入统一的中间件(如 OpenTelemetry 链路追踪)
b.Init(broker.Middleware(otel.Middleware))

// 统一的订阅 API
b.Subscribe("orders.created", func(ctx context.Context, event broker.Event) error {
    fmt.Println("收到订单:", string(event.Message().Body))
    return nil // 返回 nil 自动 Ack ,返回 error 自动 Nack/Retry
})

// 统一的发布 API
b.Publish(context.Background(), "orders.created", &broker.Message{
    Body: []byte(`{"id": 1001}`),
})

传送门

GitHub: https://github.com/qvcloud/broker

  • 核心理念: 接口驱动、高性能、原生支持 OpenTelemetry 。

如果你也深受 MQ 适配之苦,或者想为你的分布式系统寻找一个更规范的通信抽象,欢迎来试用、吐槽或贡献代码!如果你觉得不错,给个 Star 就是最大的支持。 🌟