标签 Kafka 下的文章

摘要:本文整理自阿里采集分析平台工程技术负责人 吴宝国 老师,在 Flink Forward Asia 2025 城市巡回深圳站中的分享。

Tips:关注「公众号」回复 FFA 2025 查看会后资料~

大家好,我是来自阿里集团平台技术部数据技术与产品部的吴宝国。今天非常荣幸能在这里跟大家分享我们在阿里内部大规模落地 Fluss 的一些实践经验。

首先简单介绍一下我们团队。我们团队主要负责集团内部统一的用户行为采集与分析平台,也就是大家常说的 A+ 平台。我们的核心职责是为手淘、钉钉、高德、饿了么等众多集团内应用提供端到端的用户行为数据采集、处理、分析及服务能力。

1.png

在底层,我们构建了覆盖 Web、小程序、APP(包括 Android、iOS、PC、IOT、鸿蒙、VR 等)以及服务端的全场景采集 SDK 矩阵。在此之上,我们不仅采集用户的行为日志(比如点击、曝光、滑动等),还会融合业务数据(如用户标签、商品信息、订单数据等),构建服务于整个集团的流量域数据公共层。最终,我们通过分析产品帮助业务团队洞察用户行为,驱动运营和产品决策,例如提升广告效果、优化用户体验等。

为了支撑这一庞大体系的实时性需求,我们引入了开源流存储系统 Fluss 作为核心的日志数据实时采集通道。接下来,我将从为什么选择 Fluss、如何保障大规模落地稳定性、具体业务实践案例以及未来规划四个方面展开分享。

一、为什么选择 Fluss?——解决两大核心痛点

在引入 Fluss 之前,我们的实时数据架构长期面临两个根本性挑战。

(1)成本高昂:行式消息队列导致资源浪费严重

2.png

我们过去主要依赖阿里内部的行式消息队列 TT(TimeTunnel)。以手淘的实时流量公共层为例,这张表包含了首页、闪购、搜索等多个业务的数据。每个下游业务(比如推荐系统)都需要一个独立的 Flink 作业来消费这张全量表,然后在作业内进行过滤,只保留自己关心的部分。

这种模式带来了三重成本问题:

  • 存储与流量成本倍增:计费通常基于读写流量。即使每个业务只关心 1% 的数据,也需要为 100% 的全量数据付费。如果有 N 个业务,就要支付 N 倍的费用。
  • Flink CU 资源浪费:Flink 作业需要消耗大量计算单元(CU)来读取、反序列化并丢弃无用的数据。很多时候,作业空跑不做任何逻辑处理,但依然产生高昂开销。
  • 字段冗余读取:一张表可能包含数百个字段,但单个业务往往只需要其中几个。行式存储迫使消费者读取整行数据,造成巨大的 IO 和网络带宽浪费。

Fluss 通过其三大核心能力完美解决了上述问题:

  • 多级分区(Multi-level Partitioning):支持按业务、按场景等维度对数据进行精细划分。
  • 过滤下推(Filter Pushdown):消费者可以在订阅时声明过滤条件,数据在源头即可被精确过滤,避免全量拉取。
  • 列式存储(Columnar Storage):允许消费者只读取所需的字段,极大降低数据消费量和 Flink CU 消耗。

(2)湖流割裂:Lambda 架构的运维与一致性困境

3.png

业界经典的 Lambda 架构虽然能同时提供实时和离线视图,但维护两套独立的批处理和流处理链路,带来了开发、运维成本高企以及数据统计口径不一致等问题。

随着数据湖技术(如 Paimon、Hudi)的发展,湖仓一体架构成为主流,但它通常只能提供分钟级的数据新鲜度。对于搜索、推荐等要求秒级延迟的核心场景,我们仍需引入 Kafka 这类流式中间件,这实际上又回到了 Lambda 架构的老路,导致“湖”与“流”的割裂。

4.png

Fluss 的出现为我们提供了一个统一的解决方案:它既能作为高性能的流存储提供秒级数据新鲜度,又能通过其内置的分层存储(Tiering)能力无缝对接数据湖(如阿里内部的 Alake),真正实现了“湖流一体”,消除了双架构的痛点。

二、首次双11落地情况:大规模生产验证

2025 年的双 11 是 Fluss 在阿里集团的首次大促实战。目前,Fluss 已稳定服务于淘天(含通天塔、阿里妈妈等)、集团数据公共层、饿了么、淘宝闪购、高德、阿里影业等多个核心业务,核心场景主要集中在搜索、推荐、流量等。

5.png

在本次双十一期间,Fluss 展现了强大的承载能力:

  • 数据量:4 PB/天
  • TPS峰值:1 亿
  • BPS峰值:100 GiB/s

这些数据充分证明了 Fluss 在大规模、高并发场景下的稳定性和可靠性。

三、集群部署架构

阿里集团内部的业务特点与云上有所不同,因此我们的部署架构也进行了针对性设计。

我们采用了“大集群 + 区域化部署”的模式。不同地域(如张北、上海)拥有独立的 Fluss 集群,而同一地域内的不同业务(如高德、钉钉、淘天)则通过数据库(DB)级别进行逻辑隔离。数据持久化在阿里自研的分布式文件系统 盘古 上,并通过 Tiering Service 同步至内部数据湖 Alake

6.png

此架构的优势在于:

  • 资源复用:多个业务共享一个大集群,提高资源利用率。
  • 版本收敛:集群数量少,便于统一升级和管理。
  • 运维集约:减少运维复杂度。

但也带来挑战:

  • 运维压力:单一集群机器数量庞大,运维难度增加。
  • 资源隔离:需要额外机制保障不同业务间的资源隔离。

为此,我们开发了独立的 Fluss Manager 来管理账号权限和集群配置,并在 VVP(Fluss 专有空间)中独立部署 Tiering Service(Flink Job),确保其稳定运行。

为了保障如此大规模集群的稳定运行,我们在多个方面进行了深度建设。

(1) 机架感知(Rack Awareness)

为防止物理机或机架故障导致数据丢失,我们实现了严格的副本放置策略。

7.png

  • 机架感知前:三个副本可能分配在同一台物理机上的三个 Pod 上。一旦该物理机故障,将导致三副本数据丢失!
  • 机架感知后:三副本规避策略,不允许分配在同机房-同机架-同物理机上。即使一台物理机故障,仍有两副本工作,保障数据安全。

(2) 监控告警体系

8.png

我们建立了覆盖全栈的立体化监控告警体系:

  • 基础设施监控:包括物理机性能(磁盘容量、读写IO、网络流量、CPU、内存)和 Pod 性能。
  • 服务端监控:监控 CoordinatorServer、Tablet Server 等核心组件的 Metrics 和日志。
  • 远程存储监控:监控 Remote Storage (OSS/Pangu/HDFS) 的 QPS、读写延迟、带宽和容量。
  • 数据湖监控:监控 Alake 的水位、读写情况,防止因数据灌入过载而影响湖仓。
  • 告警服务:基于 Prometheus + SLS 的监控系统,实现及时告警。

四、稳定性建设

(1) 集群扩缩容(Rebalance Feature)

9.png

随着业务增长,集群需要动态扩容。我们实现了 Rebalance 功能:

  1. AdminClient 发起 RebalanceRequest
  2. CoordinatorServer 收到请求后,GoalOptimizer 生成 RebalancePlan
  3. RebalanceExecutor 执行计划,通知 Tablet Server 迁移 Bucket Leader 和 ISR。
  4. 新节点加入后,负载均衡,完成扩容。

(2) 表扩缩容(Bucket Rescale)

10.png

当单表流量增大时,可通过 ALTER TABLE 增加 Bucket 数量。

  1. Client 发起 ALTER TABLE 命令。
  2. Coordinator 计算新增 Bucket 的分布,并更新 Zookeeper 中的 TableAssignment
  3. Coordinator 通知所有 Tablet Server 创建新的 Bucket Replica。
  4. Tablet Server 创建 Replica 并开始接收数据。

注意:客户端需重启以感知新分区,期间消费任务可能有短暂波动。

(3) 无感升级(Controlled Shutdown)

11.png

为保障升级过程对在线作业无明显影响,我们实现了无感升级:

  1. 待下线 Tablet Server 发送 controlledShutdownRequest 给 Coordinator。
  2. Coordinator 执行 

    • 步骤1:重选 Leader(新 Leader 上线)。
    • 步骤2:下线 Follower。
    • 步骤3:关闭其他资源。
  3. 整个过程保证读写延迟波动小于 1 分钟,Leader 持续在线。
  • K8s 侧支持:支持灰度升级、滚动升级和原地升级(kill pod 并秒级拉起),提升升级效率。

(4) Coordinator HA

12.png

Coordinator 是集群的“大脑”。我们为其构建了高可用架构:

  • 主备选举:通过 Zookeeper 实现主备选举。
  • 状态同步:副节点持续监听 ZK 节点变化,保持 CoordinatorContext 一致。
  • 故障恢复:主节点宕机后,副节点自动选举为新主节点,并从 ZK 恢复上下文信息,确保元数据连续性。

(5) 压缩率与网络传输优化

13.png

为应对大规模集群的网络带宽瓶颈,我们集成了 ZSTD 列压缩算法。

  • 实测效果:在淘系数据上,开启 ZSTD 后,存储空间下降 6 倍(8.88TB → 1.52TB)。
  • 性能影响:写吞吐略有提升(3.33M/s → 3.51M/s),读吞吐基本持平(3.06M/s → 3.25M/s),CPU/内存开销可控。

(6) 上线前故障演练计划

14.png

上线前,我们执行了详尽的故障演练计划,模拟极端场景:

  • CoordinatorServer:随机宕机、反复切换 leader、大量建表和分区。
  • TableServer:随机宕机、Remote 存储堆积、Bucket 的 Replica 宕机。
  • Client:读写流量压测、一致性测试、冷数据追数据延迟测试。
  • 其他:网络拥塞、磁盘挂掉、Zookeeper 故障等。

通过这些演练,全面验证了系统的健壮性、容错能力和数据一致性。

五、湖流一体:统一架构的演进

15.png

在湖流一体这块,我们会直接从 Fluss Manager 发起“湖流一体表”的创建操作。创建完成后,会使用 Fluss 的生产账号(而不是业务自己的账号),在 Paimon 中为业务直接创建一张对应的 Paimon 表。

这张 Paimon 表与 Fluss 中的表在命名上完全一致,包括 Namespace 和 DB 名称都保持统一。这样一来,业务在 Paimon 侧可以给这张表打上“湖流一体表”的标记,在 Fluss 侧也能看到它是“湖流一体表”,对业务来说是一张“看起来统一”的表,但在底层实际上是两张独立的物理表。

数据同步方面,我们通过 Tailing Service 集群配合内部 Flink 集群,由生产账号将 Fluss 中的数据以分钟级或秒级的粒度同步到 Paimon。与此同时,在 Tailing Service 上做了一系列 Native 级别的优化,使得整体性能相较于通用的 Flink 接入方式(Flink Native)会更好一些。

六、业务实践案例与核心收益

Fluss 的落地为多个业务场景带来了显著收益,下面我将逐一介绍。

(1)淘宝数据平台:实时数仓重构

截屏2026-01-20 15.30.18.png

  • 原架构:依赖行式消息队列(TT)和离线数仓(MaxCompute/ODPS),数据新鲜度在小时级。
  • 新架构:采用 Fluss + Paimon 湖仓架构,数据新鲜度提升至秒级。
  • 收益

    • 替代行式消息队列,整体成本降低 40% 以上
    • 基于 Fluss 的列更新特性,离线/实时数据回刷时只需更新变更字段,回刷成本大幅降低
    • 简化了数据链路,下游 OLAP 引擎(如 StarRocks)可直接查询 Paimon 表。

(2)淘宝闪购:实时监控与加工

截屏2026-01-20 15.30.28.png

将流量实时 DWD 公共层写入 Fluss,并通过 Tiering Service 持久化到 Paimon。此架构既保障了秒级时效性,又支持高效的 OLAP 分析,真正实现了实时监控,产出效率远超旧版基于物化视图定时调度的方案。

(3)通天塔(AB实验平台):降本增效

截屏2026-01-20 15.30.35.png

  • 痛点:行式存储导致整行消费,资源消耗高(曝光表 44 个字段,平台仅需 13 个);数据探查困难;大 State 作业运维复杂、不稳定。
  • 方案:利用 Fluss 的列裁剪能力,结合 Paimon 存储和 StarRocks 查询。
  • 收益:读 Fluss 的 Flink 作业 CPU 占用减少 59%,内存占用减少 73%,IO 减少 20%。同时,通过 KV 表的 Merge 引擎和 Delta Join 技术,解耦了作业与状态,提升了灵活性。

(4)A+ 采集分析平台:全链路优化

截屏2026-01-20 15.30.42.png

在流量公共层应用 Fluss 的多级分区能力,显著降低了下游消费的数据量,使得下游 Flink CU 消耗降低约 35%,全链路成本降低约 70%

七、未来规划

展望未来,我们将从以下方向持续投入:

截屏2026-01-20 15.31.01.png

  1. 扩大服务规模:将 Fluss 服务推广至更多集团业务,巩固其作为统一实时数据通道的地位。
  2. 全面推进湖流一体:深化 Fluss 与 Paimon/Alake 的集成,打造更成熟、易用的湖流一体解决方案。
  3. 追求更高性能:持续优化 Fluss 内核,在吞吐、延迟、资源利用率等方面达到业界领先水平。
  4. 探索新场景:构建业界领先的 Agent 采集与评测一体化平台,为 AI Agent 在代码、电商、数据等场景的效果评估与优化提供数据基石。

🔥 阿里云流存储 Fluss 于 2026 年 1 月 13 日 正式开启免费公测

基于 Apache Fluss 打造的高性能列式流存储系统,具备毫秒级读写响应、实时数据更新及部分字段更新能力,可替换 Kafka 构建 面向分析的流式存储,结合 DLF(Paimon)等数据湖产品构建 湖流一体架构

🎁 公测活动: 公测期间单用户可 免费使用2个集群,单个集群上限80 Core,如果您在使用过程中向我们提出改进建议或评测报告,我们将依据反馈内容的深度与质量,向优质测评者 赠送定制Fluss周边礼品

流存储Fluss版公测说明:https://help.aliyun.com/zh/flink/realtime-fluss/product-overv...

复制链接或扫描下方二维码:https://survey.aliyun.com/apps/zhiliao/G-2wQFAuV

image

image


更多内容


活动推荐

复制下方链接或者扫描左边二维码

即可免费试用阿里云 Serverless Flink,体验新一代实时计算平台的强大能力!

了解试用详情:https://free.aliyun.com/?productCode=sc

已经在虚拟机部署好Apache DolphinScheduler了,想尝试下在Flink新建一个Flink节点,然后用Flink消费Kafka数据。

Apache DolphinScheduler用的是单机部署,具体操作可以参考官方文档:DolphinScheduler | 文档中心(https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/in...).

  • 前置条件:已经安装Java 11、DolphinScheduler 3.3.2、Flink 1.18.1、Kafka 3.6.0,Zookeeper用Kafka内置的。建议这些安装都下载二进制的安装包到虚拟机安装,用命令安装的不可控,我下载的二进制包如下:

配置好Flink的环境变量

1、编辑环境变量:

sudo vim ~/.bashrc

增加Flink的路径

2、使环境变量生效:

#使环境变量生效
source ~/.bashrc
#查看环境变量
echo $Flink_HOME

修改Kafka、Flink以及DolphinScheduler的配置文件

因为用的是虚拟机,为了让外面的主机能够访问到虚拟机的网络,需要修改下配置文件

  1. 修改Kafka配置:找到Kafka安装包下的config文件夹,修改config下的server.properties文件,修改listeners是为了外面的主机能够访问到虚拟机的Kafka,还有把advertised.listeners改成虚拟机地址,写样例的时候能连上虚拟机的Kafka地址,不然默认连localhost
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
#192.168.146.132修改成虚拟机ip
advertised.listeners=PLAINTEXT://192.168.146.132:9092

  1. 修改Flink配置:找到Flink安装包下的conf文件夹,修改conf下的Flink-conf.yaml文件,把里面所有的localhost地址全部改成0.0.0.0,以便主机能访问到虚拟机的Flink。还有增加jobmanager和taskmanager的内存
jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1

  1. 修改Apache DolphinScheduler的配置文件,从Apache DolphinScheduler的启动脚本文件dolphinscheduler-daemon.sh可以看出,配置环境变量用的是bin/env文件夹下的dolphinscheduler_env.sh

查看dolphinscheduler-daemon.sh文件:

修改dolphinscheduler_env.sh文件,新增JAVA、Flink路径:

#修改成自己的JAVA、Flink路径
export JAVA_HOME=/data/jdk-11.0.29
export Flink_HOME=/data/Flink-1.18.1

关闭防火墙,启动应用

启动应用,包括Zookeeper、Kafka、Flink以及Apache DolphinScheduler。

#关闭防火墙
sudo systemctl stop firewalld
 
# 在 Flink 根目录下,执行以下命令启动 Flink 集群
bin/start-cluster.sh
 
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
 
# 启动 Kafka 服务器
bin/Kafka-server-start.sh config/server.properties &
 
#创建 Kafka 主题
bin/Kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
 
#使用命令行生产者发送消息
bin/Kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
 
#消费
bin/Kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# 启动 Standalone Server 服务
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

测试

测试Flink、Apache DolphinScheduler是否能访问成功。

  1. Flink访问地址:http://localhost:8081/,localhost改成自己虚拟机地址

  1. Apache DolphinScheduler访问地址:http://localhost:12345/dolphinscheduler/ui ,localhost改成自己虚拟机地址即可登录系统 UI。默认的用户名和密码是 admin/dolphinscheduler123

编写样例

用Flink消费Kafka数据,然后打包上传到Apache DolphinScheduler,启动Flink任务:

  1. 编写样例:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>Flink-Kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <Flink.version>1.18.1</Flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <Kafka.version>3.6.0</Kafka.version>
    </properties>
 
    <dependencies>
        <!-- Flink核心依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-streaming-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-clients</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- 连接器基础依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-base</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- Kafka连接器(关键修改点) -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-Kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Kafka</groupId>
            <artifactId>Kafka-clients</artifactId>
            <version>${Kafka.version}</version>
        </dependency>
 
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
 
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.Flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkKafkaConsumerExample.java

import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.java.tuple.Tuple2;
import org.apache.Flink.api.java.utils.ParameterTool;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.datastream.DataStream;
import org.apache.Flink.streaming.api.functions.ProcessFunction;
import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.Flink.util.Collector;
import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer;
import org.apache.Flink.api.common.serialization.SimpleStringSchema;
import org.apache.Kafka.clients.consumer.ConsumerConfig;
import org.apache.Kafka.common.serialization.StringDeserializer;
 
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
 
 
public class FlinkKafkaConsumerExample {
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // Kafka 配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        KafkaConsumer.setStartFromEarliest(); // 从最早的消息开始消费
        DataStream<String> stream = env.addSource(KafkaConsumer);
 
        // 处理数据:分词和计数
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
 
 
        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value);
                messageCount++;
 
                // 检查是否达到停止条件
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 messages, stopping job.");
                    shouldStop = true; // 设置标志位,表示应该停止
                }
            }
        });
 
        // 执行作业并获取 JobClient
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // 启动作业并获取 JobClient
                org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());
 
                // 监测条件并取消作业
                while (!shouldStop) {
                    Thread.sleep(100); // 每100毫秒检查一次
                }
 
                // 达到停止条件时取消作业
                if (shouldStop) {
                    System.out.println("Cancelling the job...");
                    jobClient.cancel().get(); // 取消作业
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
 
        // 在主线程中等待作业结束
        future.join(); // 等待作业完成
    }
 
    // Tokenizer 类用于将输入字符串转化为单词
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
 
}
  1. 打包上传到Apache DolphinScheduler

  1. 新建Flink节点,并启动

在Apache DolphinScheduler的任务实例看启动日志:

在虚拟机启动生产者,输出字符串,然后可以在Flink查看输出Kafka生产的消息:

原文链接:https://blog.csdn.net/Analyze_ing/article/details/156940553

从Kafka到AutoMQ:爱奇艺实时流数据架构演进

概述

本文详细介绍了爱奇艺在处理大规模实时流数据时,从传统Kafka架构向AutoMQ演进的技术历程。为了解决私有云环境下集群扩缩容难、资源利用率低以及运维成本高等挑战,爱奇艺开发了Stream平台与Stream-SDK,实现了业务与底层存储的彻底解耦。随后,公司引入公有云服务并最终切换至基于存算分离架构的AutoMQ,利用其单副本存储和秒级弹性的特性,显著提升了系统的灵活性。这一系列的架构升级不仅优化了数据治理体系,还成功将运营成本降低了70%以上。目前,爱奇艺正持续扩大AutoMQ的应用规模,以进一步实现降本增效的长期目标。

背景

Kafka因其高吞吐、低延时、可扩展的特性,在出现之后迅速成为流数据存储的标准组件,广泛应用于实时大数据场景。爱奇艺的流数据服务也主要基于Kafka构建,随着实时大数据应用越来越广泛,Kafka集群数量、规模越来越大,面临扩缩容繁琐、成本高、难治理等诸多问题与挑战。为解决这些问题,我们进行了Kafka服务化、上云、迁移AutoMQ等一系列探索。

本文将介绍爱奇艺Kafka从私有云迈向公有云、从Kafka到AutoMQ的探索与实践。

流数据在爱奇艺的应用

图1 数据通路

在爱奇艺,流数据的存储组件使用的是Kafka,计算组件主要使用的是Flink,流数据相关的典型数据通路如图1所示,主要包括如下环节:

  • 数据集成:Pingback(端上投递日志)、后端日志、数据库binlog、指标等持续产生的流数据,实时写入数据总线Kafka。
  • 数据仓库:由Flink程序将数据引入到实时(流式)、离线(批式)数仓。在实时数仓中,数据仍然以流数据形态存储在Kafka中,并通过Flink构建实时数仓各层数据。在离线数仓中,流数据将会聚集成批数据存储在Iceberg中,再由 Flink增量消费Iceberg构建离线数仓各层数据。实时数仓具备秒级延时,离线数仓具备分钟级以上延时。
  • 数据开发:数仓的数据通过数据开发平台应用到各业务场景。在实时计算中Kafka也会作为中间流数据的存储用于任务之间的解耦。
  • 数据应用:数据广泛应用到爱奇艺的推荐、搜索、广告、报表等等场景中。数据的价值随着延时增大快速衰减,为了数据价值最大化,近几年主要应用场景都已切换到流数据。

Kafka作为流数据的存储承担数据集成到大数据体系的数据总线、实时数仓存储、实时任务之间解耦等角色。

流数据存储服务:从管集群到管数据

爱奇艺的流数据服务最初以Kafka集群为核心构建,提供集群生命周期管理、Topic管理、消费监控等基础能力。随着业务规模扩大、集群数量和数据量持续增长,逐渐暴露出以下问题:

  1. 业务与集群强耦合:业务代码直接依赖Kafka地址访问集群,一旦需要迁移或调整集群,必须修改业务代码并重新上线,不灵活。同时也无法从平台侧统一识别和监控各业务的读写行为。
  2. 缺乏统一的数据与schema管理:平台没有管理数据描述、schema、数据归属等元数据信息,无法提供数据查找功能,不利于跨团队的数据理解、复用与治理。
  3. 主备数据管理缺失:对重要数据,业务侧通常配置主备链路,但平台侧缺乏对主备关系的统一管理,难以做到一致性保障与故障切换治理。

为了解决上述问题,我们将流数据存储服务升级到了如图2所示的架构,由Stream平台、Stream-SDK、存储组件三部分构成。

图2 流数据服务架构

先介绍下Stream平台,Stream-SDK和存储组件后面介绍。Stream平台由“集群管理”和“数据管理”两大模块组成。集群管理负责集群生命周期与底层资源的统一管理,侧重运维侧能力。数据管理是平台的核心,以“数据为中心”构建,面向数据开发人员提供统一的数据视图和管理能力,核心功能如下:

  1. 逻辑队列:原先“集群+Topic”定位数据的方式,升级为基于“项目+队列(Topic)”的逻辑命名方式,集群仅作为队列的一个属性,消除业务对具体集群的依赖。逻辑队列还支持同时绑定主备两个集群,结合Stream-SDK可实现主备链路的一键切换。
  2. Schema管理:支持为队列配置schema,并自动同步至大数据元数据中心,使队列能够在数据开发平台中自动映射为逻辑表,使用SQL直接处理流数据。
  3. 数据地图:提供队列的多维度查询与检索能力,支持在线申请和授权使用队列,简化跨团队的数据查找和复用流程。
  4. 数据血缘:基于Stream-SDK自动上报的读写端信息,构建应用级的读写血缘链路,帮助快速定位上下游数据关系及影响范围。

Stream-SDK:统一的流数据读写客户端

Stream-SDK是平台提供的统一数据访问客户端,封装了底层原生客户端,兼容Kafka协议和RocketMQ。业务仅需配置“项目+队列”,即可完成数据读写,无需关注具体集群地址或认证方式,从而实现业务代码与底层集群的彻底解耦。

图 3 Stream SDK 读写数据过程

Stream-SDK的数据读写流程如图3所示,主要包括两个阶段:

  1. 配置获取与上报

基于业务提供的项目、队列和Token(用于鉴权),SDK调用Stream平台的配置API,获取队列对应的集群信息、Topic、认证参数等配置,并使用原生客户端执行读写。同时,SDK会通过该API上报客户端IP、消费组、应用名称等信息,平台据此实时构建读写血缘。

  1. 集群变更感知与自动切换

在运行期间,SDK每分钟与Stream平台进行心跳交互,实时感知队列关联的集群是否发生变更。一旦检测到变化,SDK会自动将读写流量切换至新集群,实现无感迁移。

借助Stream-SDK,集群的迁移成本大幅降低,也为后续从私有云迈向公有云、从Kafka切换到AutoMQ的架构演变做好了准备。

Kafka混合多云建设

早期爱奇艺Kafka集群部署在私有云IDC,受制于IDC资源供给模式及Kafka架构固有特性,资源利用率难以保持在合理区间。自2023年起,平台逐步引入多家公有云Kafka,形成混合云架构,在资源弹性、运维效率和成本优化方面取得了显著成效。下文将介绍下上云过程。

私有云Kafka

![
图4 Kafka 架构](https://image.automq.com/20260126bot/atub35.png)

Kafka架构如图4所示,是经典的多副本容错分布式架构,由Broker和Zookeeper两类角色组成:Broker负责数据存储与客户端读写,Zookeeper负责管理集群的元数据与协作状态。在私有云中,Kafka部署在爱奇艺各IDC,其中Zookeeper通常以虚机部署,Broker则根据场景选择虚机或物理机。

私有云模式支撑了公司流数据规模的快速增长,但随着业务体量持续扩大,也逐渐暴露出以下问题:

  1. 集群弹性差:Kafka的Shared Nothing架构虽然简单可靠,但每个Broker上都存储大量数据,导致扩容或缩容时必须在Broker间进行大规模数据迁移。迁移过程耗时长且会影响业务任务的读写性能,使得集群难以实现平滑弹性伸缩。
  2. 资源弹性不足:私有云的物理资源从采购到报废周期较长,难以随业务流量动态变化而快速调整,导致集群资源利用率长期处于“过高或过低”的状态。同时,对于寒暑假、重点直播等短时流量高峰,也难以做到按需扩缩,影响系统整体资源效率与成本优化。

从私有云Kafka到公有云Kafka

为实现降本增效并提升流数据存储的灵活性,我们引入并上线了公有云Kafka产品。

公有云Kafka产品遵循Kafka协议,通过在Stream平台与Stream-SDK中进行统一适配,为业务侧提供一致、无差异的使用体验,实现了私有云与公有云之间统一接入和平滑切换。

借助公有云庞大的资源池和按需创建集群的能力,解决了私有云环境下资源弹性不足的问题,取得20%以上的降本效果。

从Kafka到AutoMQ

公有云Kafka虽然解决了资源弹性不足的问题,但是依然有集群弹性差的问题。新出现的AutoMQ支持秒级弹性吸引了我们的注意。

图 5 AutoMQ 架构

AutoMQ采用存算分离架构,如图所示,具备如下特性:

  1. 共享存储:数据统一存储在对象存储中,Broker不再持有本地数据。为解决对象存储延迟高、IOPS较低的问题AutoMQ引入块存储作为WAL(Write-Ahead Log),数据先写入WAL再进行批量落盘到对象存储。
  2. 单副本存储:云端的块存储和对象存储本身具备多副本特性,已在存储层保证了高可用,因此AutoMQ内部的Topic均采用单副本策略,避免传统Kafka中Broker之间的副本同步开销,大幅降低成本与数据复制压力。
  3. 兼容Kafka协议:AutoMQ基于开源Kafka改造,保留计算层逻辑,替换底层存储实现,完全兼容Kafka协议。
  4. 快速弹性:由于Broker不再存储数据,节点可快速启动或销毁,实现分钟级弹性;同时对象存储按量计费,使资源规模能够与业务流量保持高度匹配,避免资源浪费。

在完成相关性能与稳定性验证后,我们在公有云环境部署了AutoMQ,并将其纳入流数据服务存储体系。通过Stream平台逐步将私有云Kafka、公有云Kafka迁移至AutoMQ,成本进一步降低70%以上。

总结及规划

流数据因其低延时特性,已成为爱奇艺的重要数据通路。随着规模增长,传统私有云Kafka在弹性、成本与治理上逐渐遇到瓶颈,因此,流数据存储架构从“管集群”转向“管数据”,并通过Stream平台与Stream-SDK实现解耦与统一治理。随后引入公有云Kafka和AutoMQ,使系统在弹性、运维效率和成本上都实现了显著提升。

爱奇艺目前约40%的流量已迁移到公有云Kafka或AutoMQ,其中一半是AutoMQ,下一步将继续扩大AutoMQ的使用规模,并探索AutoMQ的自适应自动弹性机制,持续降本。

很多团队在业务发展到一定阶段后,都会认真评估一次:
用户行为分析系统,是继续用现成产品,还是自己搭一套?

实际上,当企业需要埋点分析时,往往已经没有太多时间成本可投入
业务方希望尽快看到数据结果,管理层关注投入产出比,而完全从零自建埋点系统,周期长、风险高、不可控。
因此,基于成熟开源方案快速上线,再按需求自己二开,是目前更常见、也更可控的一种选择。

这篇文章不讨论“埋点的重要性”,只做一件事:
以自建埋点分析系统为参照,给出一个成本参考,并对比基于 ClkLog 开源方案的实际投入。

完全自建一套埋点分析系统成本通常在几十万,且建设周期长、不可控因素多。
基于ClkLog开源方案搭建首期成本可控制在几万最快一周完成部署集成,可以快速交付使用,并具备持续扩展的能力。

一、自建埋点分析系统,通常需要哪些模块?
很多团队低估了“自建”的工作量,下面只列最基础、不可回避的部分
1.数据采集层(SDK + 埋点规范)
这个阶段往往被低估,但实际上 SDK 会长期伴随业务演进,需要持续维护。
2.数据接入与处理层
核心目标是稳定接住数据:常见技术栈包括接入服务 + Kafka / MQ。
3.数据存储层
通常会选择 ClickHouse / Doris / Druid 这类分析型数据库,同时需要设计分区、冷热数据策略。
4.复杂分析计算
这是自建中最耗精力的部分,很多团队会发现:统计不难,难的是保证分析口径正确且性能可用。
5.管理后台与可视化
这部分前端和交互成本往往被严重低估。
6.运维与长期维护
系统上线只是开始,后续还包括各项调优、异常排查等运维工作。

二、为什么很多团队不会选择「完全自建」?
问题不在“能不能做”,而在是否划算
●早期业务验证阶段,数据系统很难直接创造业务价值
●自建系统容错成本高,试错周期长
因此,越来越多团队会选择:
在成熟的开源埋点分析系统基础上建设,而不是从零开始。

三、ClkLog开源方案能解决什么问题?
ClkLog提供了一套可直接落地的开源埋点分析方案,不依赖第三方SaaS服务。全面覆盖了埋点系统中最重、最复杂的核心能力:

1.数据采集层
支持神策SDK与自研鸿蒙SDK
2.数据接受层
进行日志数据接收与存储
3.数据处理层
进行数据处理、归档等服务
4.数据存储层
使用clickhouse进行大量数据查询
5.数据可视化
内置多种成熟分析模型,开箱即用

企业无需从零搭建底层能力,只需要围绕自身业务场景完成部署、运维和少量定制,即可形成一套可用的自有埋点分析系统。

四、基于 ClkLog,企业实际需要投入哪些成本?
1. 基础运行环境(参考)
以 ClkLog 社区版为例,在 1万日活应用规模下,采用Docker方式部署,单台服务器即可满足基础使用需求。
推荐配置参考:8核CPU;32GB内存
在常见的云厂商环境中,约1-2万/年,即可覆盖服务器、云盘、流量、备份等成本。

2. 软件部署与集成
●获取ClkLog代码(Github/Gitee)
●自行部署ClkLog服务(docker部署最快10分钟完成)
●接入埋点SDK(兼容web/小程序/iOS/安卓等)
●常规运维数据库和服务
整体实施周期短,最快一天即可完成部署并交付使用。

3. 业务层面的工作
●埋点规范梳理
●事件与指标定义
●少量业务定制
ClkLog已经内置十几种行业标准分析模型,可供业务直接开箱使用。若还有更多定制业务需要分析,可以通过自定义事件或二次开发来实现,与完全自建相比,省去的是部门团队沟通、大量底层系统设计与长期维护成本。

五、写在最后
对于大多数团队来说,先把系统跑起来、用起来、产生价值,比一开始追求完美更重要。
如果团队希望:
●完全掌控数据
●又不想长期投入基础设施研发
●把精力更多放在业务分析而不是系统本身
那么,基于成熟开源方案搭建自己的埋点分析系统,是一个性价比较高、风险更可控的选择


前言

本节详细聊一下基于envoy的可观测性

日志

首先是日志,配置日志的方式也很简单

static_resources:
  listeners:
    - name: ingress_listener
      address:
        socket_address:
          address: 0.0.0.0
          port_value: 10000
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                stat_prefix: ingress_http
                ...
                access_log:
                - name: envoy.access_loggers.stdout
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
                    log_format:
                      text_format: "[%START_TIME%] \"%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%\" %RESPONSE_CODE% %BYTES_SENT% %DURATION% %REQ(X-REQUEST-ID)% \"%REQ(USER-AGENT)%\" \"%REQ(X-FORWARDED-FOR)%\" %UPSTREAM_HOST% %UPSTREAM_CLUSTER% %RESPONSE_FLAGS%\n"
  • 该配置是将日志输出在控制台,也可以直接输出为文件,然后通过工具采集走path: /var/log/envoy/access.log
  • 也可以直接将日志输出至kafka,并且按比例采集、只采集4xx、5xx等都可以配置,这里就不在赘述了

admin管理页面

envoy有默认的admin页面,方便查看统计信息、打开某些功能的开关等

admin:
  address:
    socket_address:
      address: 0.0.0.0
      port_value: 9901

打开9901页面:

watermarked-envoy_ob_1.png

可以查看相关的统计信息、也可以打开某些开关,功能还是很丰富的

merics接入prometheus

打开了admin之后,就默认提供了相关的prometheus stats http://10.105.148.194:9901/stats/prometheus

这时只需在k8s集群外弄一个prometheus,并且采集该envoy即可

prometheus.yml

global:
  scrape_interval: 5s
  evaluation_interval: 5s

rule_files:
  - /etc/prometheus/*.rules

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
    - targets: ['localhost:9090']

  - job_name: "envoy"
    metrics_path: /stats/prometheus
    static_configs:
    - targets: ["10.105.148.194:9901"]
docker run -d --name prometheus \
  -p 9090:9090 \
  -v ./prometheus.yml:/etc/prometheus/prometheus.yml \
  -v /usr/share/zoneinfo/Asia/Shanghai:/etc/localtime \
  registry.cn-beijing.aliyuncs.com/wilsonchai/prometheus:v3.5.0

traces接入jaeger

jaeger的安装可以参考这里: opentelemetry全链路初探--埋点与jaeger

jaeger启动之后,改造一下envoy的配置,这里要特别注意,不同版本的配置不一样,我这里envoy的版本是:v1.32

static_resources:
  listeners:
    - name: ingress_listener
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                ...

                tracing:

                  provider:
                    name: envoy.tracers.opentelemetry
                    typed_config:
                      "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
                      service_name: envoy-proxy
                      grpc_service:
                        envoy_grpc:
                          cluster_name: jaeger_otlp_collector
                ...

  clusters:
    ...
    - name: jaeger_otlp_collector
      type: LOGICAL_DNS
      connect_timeout: 5s
      lb_policy: ROUND_ROBIN
      http2_protocol_options: {}

      load_assignment:
        cluster_name: jaeger_otlp_collector
        endpoints:
        - lb_endpoints:
          - endpoint:
              address:
                socket_address:
                  address: 10.22.12.178
                  port_value: 4317
    ...

修改完成之后重启下envoy

jaeger成功接收到了来自envoy的trace

watermarked-envoy_ob_2.png
watermarked-envoy_ob_3.png

由于只在envoy配置了trace,没有和后端服务联动,所有只显示了envoy这一段的trace信息,如果要联动后端,可以参考这个系列的文章: 全链路监控配置

小结

至此,logs、metrics、traces三大可观测的指标建设完成,envoy可观测性的建设也结束了

联系我

  • 联系我,做深入的交流

 title=


至此,本文结束
在下才疏学浅,有撒汤漏水的,请各位不吝赐教...

大家好,我是《交易学徒》的作者。

简单介绍下背景:我现在的核心身份是带两个孩子的全职奶爸,副业才是趁着孩子睡着后,在键盘上敲敲打打的独立开发者。

对于我这种“碎片化时间”开发者来说,运维复杂度就是最大的敌人。

几年前写后端,我也迷信“标准答案”:做个服务,起手就是 Docker 编排,Redis 做缓存,Kafka 做解耦,微服务先分几个出来。结果往往是,功能没写几个,光是调网络、修连接超时、查中间件报错,就把孩子午睡的那宝贵两小时耗光了(那时候还没孩子)。

在开发后端时,我陷入了深思:

“对于一个追求极致性能、但只有一个人维护的系统,所谓的‘工业级架构’真的是解药吗?还是毒药?”

最终,我做了一个违背祖宗的决定:做减法。 我删除了 Redis ,移除了 Kafka ,把整个微服务集群塌缩成了一个 Rust 单体应用。

今天想聊聊这背后的思考过程。

一、 复杂度的守恒与转移
我的业务场景看似简单,实则牵一发而动全身。一个简单的“用户平仓”动作,就像推倒了第一块多米诺骨牌:

核心域:结算盈亏,改余额,写数据库。(必须马上做)

通知域:给前端发个弹窗通知“平仓成功”。(晚 0.1 秒没关系)

营销域:判断有没有触发“五连胜”、“以小博大”成就,发奖励。(晚 1 秒没关系)

统计域:计算交易评分,统计分数或者更新等级与交易报表。(晚几秒都行)

在“标准架构”里,我们需要引入 消息队列 (MQ) 来解耦这些逻辑。 但引入 MQ 本质上并没有消除复杂度,只是将“代码复杂度”转移成了“运维复杂度”。

对于团队,运维复杂度可以分摊给同事;但对于我,这意味着我不仅要写代码,还得修服务器。

Rust 给了我另一个选择:利用它极高的性能,把“运维复杂度”重新压回“架构设计”里,用最朴素的方式解决问题。

二、 内存即总线:构建“喊一嗓子”的架构
我利用 Rust 的内存通道特性,构建了一个“超光速大喇叭”。 我不请求数据,我只发布事实。

这个过程,可以用一个生活化的场景来描述:

  1. 定义世界的真相 (The Truth)
    我不写复杂的 XML 或 JSON 定义,我只是在代码里列了一张“事实清单”:

📄 事实 A:有人平仓了(包含:是谁、赚了多少、单号是多少)

📄 事实 B:有人购买商品了

📄 事实 C:AI 分析完成了

编译器会盯着这张清单,保证我发出的每一个“事实”都是格式正确、童叟无欺的。

  1. 极简的生产者 (Fire and Forget)
    在核心交易逻辑里,当数据库事务提交成功后,我只需要做一件事:拿着大喇叭喊一嗓子。

传统架构 (Kafka) 是这样的:

交易模块 -> 打包数据 -> 建立 TCP 连接 -> 三次握手 -> 发送给 Kafka 集群 -> 等待 ACK -> 结束 (这中间任何一步网络抖动,都得处理异常)

我的单体架构是这样的:

交易模块 -> 喊:“老王平仓赚了 100 块!” -> 结束 (纯内存操作,纳秒级完成,快到像是没有发生过)

  1. 静默的消费者 (Sidequest Logic)
    我把原本分散在微服务里的逻辑,变成了几个坐在角落里的“隐形工人”。

比如 “营销服务”,它就像一个在角落里旁听的记分员:

它平时不说话,只听大喇叭。

一听到 “老王平仓赚了 100 块”,它立马翻开小本本查历史。

发现老王已经连赢 4 把了,加上这把正好 5 把。

于是它默默地给老王发了一个“五连绝世”的徽章。

整个过程,核心交易模块完全不知情,也完全不用等待,它喊完那一嗓子就去服务下一个用户了。

三、 深度思考:关于“不可靠”的权衡
很多朋友可能会问:“没有 Kafka 把消息存到硬盘里,万一服务器断电了,你喊的那一嗓子不就丢了吗?”

是的,这是整个架构思考中最痛苦,也是最关键的取舍。 我问了自己两个问题:

Q1:我的程序崩溃概率有多大? Rust 以安全著称,只要代码写得不离谱,它极难崩溃( Panic )。这比 Java 的内存溢出或 Python 的运行时错误要稳健得多。

Q2:丢失数据的代价是什么?

我们可以把数据分成两类:

💰 钱(核心数据): 余额、订单状态。

处理方式: 必须落袋为安。 直接写死在数据库里,绝不依赖“大喇叭”。

🎁 气氛(衍生数据): 弹窗通知、成就徽章、达标奖励、统计报表。

处理方式: 听天由命。 如果真的赶上万年不遇的服务器着火,用户少收到了一个“五连胜”的弹窗,或者报表少统计了一笔,天会塌吗?不会。

结论: 为了 0.001% 的极端掉电风险,去让 99.99% 的时间里的系统背负沉重的中间件包袱,对于独立开发者来说,这是一笔亏本买卖。

四、 结语
当我们谈论“高性能”时,往往想到的是复杂的集群、昂贵的服务器。 但 Simple is fast. (简单即快)

现在的《交易学徒》后端,就是一个 20MB 的小文件。

❌ 没有 Docker 容器编排

❌ 没有 虚拟机调优

❌ 没有 Redis 维护

❌ 没有 服务间通讯

✅ 只有一个跑在单机上的进程,CPU 占用极低,响应速度极快。

这省下来的不仅仅是每年的服务器费用,更是我作为父亲陪伴孩子的宝贵时间。

技术服务于生活,这大概就是独立开发的魅力吧。

关于《交易学徒》
这是我用这套“极简架构”打磨的作品,前端是 Flutter ,后端 Rust 。 希望能给交易员朋友们提供一个干净、流畅、无延迟的练习环境。

官网: https://www.zgjiazu.top

Google Play: https://play.google.com/store/apps/details?id=com.zengkai.jyxtclient

欢迎 V 友们指正。如果你的孩子也吵着要抱抱,那我们就是异父异母的亲兄弟了。😄

这两天技术圈里热议的一件事就是Amazon的流媒体平台Prime Video在2023年3月22日发布了一篇技术博客《规模化Prime Video的音视频监控服务,成本降低90%》,副标题:“从分布式微服务架构到单体应用程序的转变有助于实现更高的规模、弹性和降低成本”,有人把这篇文章在五一期间转到了reddithacker news 上,在Reddit上热议。这种话题与业内推崇的微服务架构形成了鲜明的对比。从“微服务架构”转“单体架构”,还是Amazon干的,这个话题足够劲爆。然后DHH在刚喷完Typescript后继续发文《即便是亚马逊也无法理解Servless或微服务》,继续抨击微服务架构,于是,瞬间引爆技术圈,登上技术圈热搜。

今天上午有好几个朋友在微信里转了三篇文章给我,如下所示:

看看这些标题就知道这些文章要的是流量而不是好好写篇文章。看到第二篇,你还真当 Prime Video 就是 Amazon 的全部么?然后,再看看这些文章后面的跟风评论,我觉得有 80%的人只看标题,而且是连原文都不看的。所以,我想我得写篇文章了……

原文解读

要认清这个问题首先是要认认真真读一读原文,Amazon Prime Video 技术团队的这篇文章并不难读,也没有太多的技术细节,但核心意思如下:

1)这个系统是一个监控系统,用于监控数据千条用户的点播视频流。主要是监控整个视频流运作的质量和效果(比如:视频损坏或是音频不同步等问题),这个监控主要是处理视频帧,所以,他们有一个微服务主要是用来把视频拆分成帧,并临时存在 S3 上,就是下图中的 Media Conversion 服务。

2)为了快速搭建系统,Prime Video团队使用了Serverless 架构,也就是著名的 AWS Lambda 和 AWS Step Functions。前置 Lambda 用来做用户请求的网关,Step Function 用来做监控(探测器),有问题后,就发 SNS 上,Step Function 从 S3 获取 Media Conversion 的数据,然后把运行结果再汇总给一个后置的 Lambda ,并存在 S3 上。

整个架构看上去非常简单 ,一点也不复杂,而且使用了 Serverless 的架构,一点服务器的影子都看不见。实话实说,这样的开发不香吗?我觉得很香啊,方便快捷,完全不理那些无聊的基础设施,直接把代码转成服务,然后用 AWS 的 Lamda + Step Function + SNS + S3 分分钟就搭出一个有模有样的监控系统了,哪里不好了?!

但是他们遇到了一个比较大的问题,就是 AWS Step Function 的伸缩问题,从文章中我看到了两个问题(注意前方高能):

  1. 需要很多很多的并发的 AWS Step Function ,于是达到了帐户的 hard limit。
  2. AWS Step Function 按状态转换收费,所以,贵得受不了了。

注意,这里有两个关键点:1)帐户对 Step Function 有限制,2)Step Function 太贵了用不起

然后,Prime Video 的团队开始解决问题,下面是解决的手段:

1) 把 Media Conversion  和 Step Function 全部写在一个程序里,Media Conversion 跟 Step Function 里的东西通过内存通信,不再走S3了。结果汇总到一个线程中,然后写到 S3.

2)把上面这个单体架构进行分布式部署,还是用之前的 AWS Lambda 来做入门调度。

EC2 的水平扩展没有限制,而且你想买多少 CPU/MEM 的机器由你说了算,而这些视频转码,监控分析的功能感觉就不复杂,本来就应该写在一起,这么做不更香吗?当然更香,比前面的 Serverless 的确更香,因为如下的几个原因:

  1. 不再受 Step Function 的限制了,技术在自己手里,有更大的自由度。
  2. 没有昂贵的 Step Function 云成本的确变得更低了,如果你把 Lambda 换成 Nginx 或 Spring Gateway 或是我司的 Easegress,你把 S3 换成 MinIO,你把 SNS 换成 Kafka,你的成本 还能再低。

独立思考

好了,原文解读完了,你有自己的独立思考了吗?下面是我的独立思考,供你参考:

1)AWS 的 Serverless 也好, 微服务也好,单体也好,在合适的场景也都很香。这就跟汽车一样,跑车,货车,越野车各有各的场景,你用跑车拉货,还是用货车泡妞都不是一个很好的决定。

2)这篇文章中的这个例子中的业务太过简单了,本来就是一两个服务就可以干完的事。就是一个转码加分析的事,要分开的话,就两个微服务就好了(一个转码一个分析),做成流式的。如果不想分,合在一起也没问题了,这个粒度是微服务没毛病。微服务的划分有好些原则,我这里只罗列几个比较重要的原则:

  • 边界上下文。微服务的粒度不能大于领域驱动里的 Bounded Context(具体是什么 大家自行 Google),也就是一个业务域。
  • 单一职责,高内聚,低耦合。把因为相同原因变化的合在一起(内聚),把不同原因变化的分开(解耦)
  • 事务和一致性。对于两个重度依赖的功能,需要完成一个事务和要保证强一致性的,最好不要拆开,要放在一起。
  • 跟组织架构匹配。把同一个团队的东西放在一起,不同团队的分开。

3)Prime Video 遇到的问题不是技术问题,而是 AWS  Step Function 处理能力不足,而且收费还很贵的问题。这个是 AWS 的产品问题,不是技术问题。或者说,这个是Prime Video滥用了Step Function的问题(本来这种大量的数据分析处理就不适合Step Function)。所以,大家不要用一个产品问题来得到微服务架构有问题的结论,这个没有因果关系。试问,如果 Step Funciton 可以无限扩展,性能也很好,而且白菜价,那么 Prime Video 团队还会有动力改成单体吗?他们不会反过来吹爆 Serverless 吗?

4)Prime Video 跟 AWS 是两个独立核算的公司,就像 Amazon 的电商和 AWS 一样,也是两个公司。Amazon 的电商和 AWS 对服务化或是微服务架构的理解和运维,我个人认为这个世界上再也找不到另外一家公司了,包括 Google 或 Microsoft。你有空可以看看本站以前的这篇文章《Steve Yegg对Amazon和Google平台的吐槽》你会了解的更多。

5)Prime Video 这个案例本质上是“下云”,下了 AWS Serverless 的云。云上的成本就是高,一个是费用问题,另一个是被锁定的问题。Prime Video 团队应该很庆幸这个监控系统并不复杂,重写起来也很快,所以,可以很快使用一个更传统的“服务化”+“云计算”的分布式架构,不然,就得像 DHH 那样咬牙下云——《Why We’re Leaving the Cloud》(他们的 SRE 的这篇博文 Our Cloud Spend in 2022说明了下云的困难和节约了多少成本)

后记

最后让我做个我自己的广告。我在过去几年的创业中,帮助了很多公司解决了这些 分布式,微服务,云原生以及云计算成本的问题,如果你也有类似问题。欢迎,跟我联系:[email protected]

另外,我们今年发布了一个平台 MegaEase Cloud,就是想让用户在不失去云计算体验的同时,通过自建高可用基础架构的方式来获得更低的成本(至少降 50%的云计算成本)。目前可以降低成本的方式:

  1. 基础软件:通过开源软件自建,
  2. 内容分发:MinIO + Cloudflare 的免费 CDN,
  3. 马上准备发布的直接与底层IDC合作的廉价GPU计算资源…

欢迎大家试用。

如何访问

注:这两个区完全独立,帐号不互通。因为网络的不可抗力,千万不要跨区使用。

产品演示

介绍文章

 

小T导读:京能集团在储能安全管理平台中采用 TDengine TSDB 作为底层时序数据库。依托 TDengine 企业版的零代码数据写入平台,来自全国 28 家电化学储能电站的数据能够按照统一编码规则高效接入 TDengine 时序数据库中,实现了稳定、高性能的数据采集与管理。在此基础上,借助 TDengine TSDB Flink Connector,系统可快速、稳定地从数据库中读取海量数据,开展实时分析与智能处理,充分释放数据的潜在价值。本文将结合该项目的实践过程,为大家带来深入分享与参考。

项目背景

京能集团储能安全管理平台共接入全国 28 家电化学储能电站,累计测点达 270 万个,由四个平台公司分别负责数据传输与汇聚。系统需要支撑大规模的数据统计分析、事件报警与安全预警,对底层数据库的性能与稳定性提出了极高要求。

鉴于电化学储能项目采集点数量庞大(270 万点)、锂电池热失控的超前预警技术复杂等因素,传统关系型数据库已无法满足高并发写入与海量数据存储的需求。由于这些数据具备时间序列写入、格式固定、写入量巨大等典型特征,我们最终选择采用时序数据库作为系统核心数据底座。

应用实际落地

在充分调研国内多款时序数据库产品后,我们发现,从国内目前的实际情况分析,TDengine TSDB 已成为众多企业在海量数据高速存储、处理与调用场景中的首选方案。基于其成熟的技术体系与稳定的性能表现,我们最终选定 TDengine TSDB 作为平台的底层时序数据库,并结合 Kafka 与 Flink 构建了完整的数据流处理体系,实现了数据的高效传输与实时计算,顺利达成项目预期目标。以下是架构简图:

TDengine TSDB 支持多种写入方式

  1. SQL 语言写入 :https://docs.taosdata.com/basic/insert/
  2. 无模式写入:https://docs.taosdata.com/develop/schemaless/
  3. 参数绑定方式:https://docs.taosdata.com/develop/stmt/
  4. 企业版的零代码数据写入— taosExplorer 数据接入功能:https://docs.taosdata.com/advanced/data-in/

项目中涉及多个 Kafka 集群、数十个需要接入的 topic。我们重点采用了 TDengine 企业版的零代码数据写入能力,实现了从 Kafka 到 TDengine TSDB 的高效对接。该功能支持灵活配置类似 ETL 的复杂自定义选项,极大简化了数据接入流程和时间,而且数据接入性能完全达到了项目要求。

为了保证数据的合理性,我们出台了《京能集团电化学储能电站安全管理平台和储能电站设备标识编码规则》,通过标准的 kks 编码在 taosX 对 Kafka 数据进行了有效过滤和清理,最终写入 TDengine TSDB。kks 部分编码实例如下:

下图为数据过滤、转换等规则设置:

此外,taosX 数据接入还支持多节点高可用配置。只需在多台 taosX 上部署相同的 Kafka 数据接入任务,并设置相同的 groupId,即可自动实现任务高可用,确保数据接入的连续性与稳定性。

同时,TDengine 还提供完善的 taosX 任务监控机制,可直接通过 Grafana 一键配置,快速生成可视化监控图表:

超级表 + 子表的使用

TDengine TSDB 结合“一个数据采集点一张表”的设计理念,引入了具有创新性的“超级表”机制,从根本上解决了大规模时序数据结构不统一、聚合困难、运维复杂等问题。每个采集点的数据独立存储,天然具备写入无锁、数据顺序追加、块状连续存储等优势。这种设计方式不仅提升了写入与查询性能,还带来了极高的数据压缩效率。

TDengine TSDB 支持对超级表标签进行动态的添加、修改与删除操作,满足设备属性变更、系统扩展等业务需求。

计算、分析处理

在 Flink 计算平台上,我们借助 TDengine TSDB 企业版提供的 Flink 连接器——TDengine TSDB Flink Connector(https://docs.taosdata.com/advanced/data-publisher/Flink/),实现了与 TDengine TSDB 的无缝集成。该连接器可高效、稳定地从 TDengine TSDB 中读取海量时序数据,并在此基础上进行全面、深入的分析处理,充分挖掘数据的潜在价值,极大地提升数据处理的效率和质量。

Flink CDC 主要用于提供数据订阅功能,能实时监控 TDengine TSDB 数据库的数据变化,并将这些变更以数据流形式传输到 Flink 中进行处理,同时确保数据的一致性和完整性。

落地效果

  1. 数据接入便利性:目前我们已接入 20 多个 kafka 数据,后期还会继续增加。得益于 TDengine 企业版零代码数据接入能力,新增任务仅需复制并做少量参数调整即可完成,操作简便高效,整体接入过程较传统方式节省约 90% 的时间成本
  2. 数据查询性能高:开启数据库缓存功能后,能够实时获取每个设备点位最新值,毫秒级别即可返回结果
  3. 数据存储成本低:TDengine TSDB 具备出色的数据压缩能力,其二级压缩技术将数据视作无差别的二进制块进行再次压缩。与一级压缩相比,二级压缩的侧重点在于消除数据块之间的信息冗余。目前我们提供的服务器存储远远满足我们项目规划的 5 年数据存储,存储成本估算节省至少 60-70%
  4. 实时订阅:通过 TDengine 提供的 Flink CDC 实时订阅功能,能方便、高效的进行分析、告警等处理,给我们后期分析带来了极大的便利性。

后期规划

目前,我们正在对京能集团储能安全管理平台已经接入的 28 场站数据进行分析和优化,提高数据采集的可靠性和鲁棒性。未来我们会针对 TDengine TSDB 新版本和新功能进行持续跟踪,进一步开发 TDengine TSDB 的内在潜力和各种有效的功能。

近期我们关注到 TDengine 发布了新产品 TDengine IDMP,通过经典的树状层次结构组织传感器、设备采集的数据,建立数据目录,对数据提供情境化、标准化的处理,并提供实时分析、可视化等功能,接下来我们会进一步了解此产品在我们业务中的使用可能。

关于京能集团

北京能源集团有限责任公司是北京市人民政府出资设立的国有独资公司,肩负着保障首都北京能源安全可靠供应的重任。京能集团成立于 2004 年,由原北京国际电力开发投资公司和原北京市综合投资公司合并而成,2011 年、2014 年先后又与北京市热力集团有限责任公司、北京京煤集团有限责任公司实施合并重组,实现了产业链条融合互补。经过多年的资源整合,集团由单一能源产业发展为热力、电力、煤炭、健康文旅等多业态产业格局。2024 年在中国企业 500 强排名第 247 位,中国服务企业 500 强排名第 87 位。

作者:张海增

在 Go 业务开发中,我们经常遇到这样的场景:

  • 环境切换:本地开发用 NATS 或 RabbitMQ 贪图轻快,线上却要接入 Kafka 或 AWS SQS 。
  • 代码耦合:业务逻辑被底层 MQ 的 SDK 对象(如 *rocketmq.Producer)绑定,一旦想换驱动,几乎要重写整个消息发送逻辑。
  • 配置坑多:每个 MQ 的参数设置五花八门,一不小心传错了参数,程序却静默运行,等到上线出事才发现配置没生效。

为了解决这些痛点,我发起了 Unified MQ Broker for Go 项目。它就像是 MQ 领域的 "DBAL"(类似于 SQL 领域的 GORM 或数据库驱动层),让你通过一套 API 就能无缝切换多种消息中间件。

🚀 v0.2.0 重磅更新

经过一段时间的打磨,我们刚刚发布了 v0.2.0 版本。这次更新不只是增加了驱动,更是在“健壮性”和“性能”上做了深度优化:

1. 🛡️ 独创“选项追踪” (Option Tracking)

  • 痛点:如果你给 Kafka 传了一个 SQS 的 DeduplicationID,大部分 SDK 会选择静默忽略。
  • 方案:v0.2.0 引入了审计机制。如果底层适配器没有读取你传入的某个配置项,系统会在连接或发布时发出显式警告。彻底告别因拼写错误或参数误用导致的配置无效。

2. ⚡ 高性能“智能序列化” (Smart Serialization)

  • 优化:针对原始 []bytestring 数据实现了零拷贝路径,跳过冗余的 json.Marshal
  • 战果:压测显示,在高吞吐场景下,序列化性能提升了 5 倍以上(单次操作仅需 ~16ns )。

3. 🏗️ 延迟绑定 (Late Binding)

  • NewBroker 现在仅做静态配置。
  • 真正的网络 IO 、TCP 建连和 SDK 初始化全部推迟到 Connect() 时执行,方便与依赖注入框架(如 Wire )集成。

4. 🌐 全平台支持

目前已完美支持:RocketMQ, Kafka, RabbitMQ, NATS, AWS SQS, GCP Pub/Sub


💻 核心代码预览

无论底层是哪种 MQ ,你的业务代码只需要关心这一套统一逻辑:

import "github.com/qvcloud/broker"

// 切换驱动只需要换一行初始化,业务代码 0 改动
b := rabbitmq.NewBroker(broker.Addrs("amqp://..."))
b.Connect()

// 注入统一的中间件(如 OpenTelemetry 链路追踪)
b.Init(broker.Middleware(otel.Middleware))

// 统一的订阅 API
b.Subscribe("orders.created", func(ctx context.Context, event broker.Event) error {
    fmt.Println("收到订单:", string(event.Message().Body))
    return nil // 返回 nil 自动 Ack ,返回 error 自动 Nack/Retry
})

// 统一的发布 API
b.Publish(context.Background(), "orders.created", &broker.Message{
    Body: []byte(`{"id": 1001}`),
})

传送门

GitHub: https://github.com/qvcloud/broker

  • 核心理念: 接口驱动、高性能、原生支持 OpenTelemetry 。

如果你也深受 MQ 适配之苦,或者想为你的分布式系统寻找一个更规范的通信抽象,欢迎来试用、吐槽或贡献代码!如果你觉得不错,给个 Star 就是最大的支持。 🌟