标签 Flink 下的文章

在新零售不断演进的今天,用户走进瑞幸,要的不只是咖啡,更是一种“被真正理解”的体验——口味、习惯、场景,甚至那一刻的心情。作为国内领先的连锁咖啡品牌,瑞幸咖啡正从数字化迈入智能化新阶段,以人工智能技术驱动“人、货、场”核心业务平台的智能化重构,构建面向未来的智慧商业决策体系。

为此,瑞幸与阿里云深度共创,基于阿里云人工智能平台PAI,共同打造了一套真正“以用户为中心”的端到端智能推荐系统。它不再依赖静态规则,而是通过理解用户的偏好和需求,为用户提供贴心的咖啡选择建议——无论是清晨提神的美式,还是周末慵懒的生椰拿铁,当你唤醒AI Lucky,“为你而选”的新品、优惠与搭配,就会送到你眼前,让每一次选择都更轻松、更安心。

过去依赖人工规则的推荐方式,难以动态响应用户变化;如今,借助双方联合打磨的数据链路、算法模型与运营机制,瑞幸不仅实现了推荐精准度的跃升,也让用户获得更流畅、更贴心的服务体验。

这次合作,是瑞幸AI能力体系进化的重要一步,更是与阿里云“一起设计、一起验证、一起交付、一起沉淀”的技术共创典范——不是单方面交付产品,而是共同构建面向未来的智能零售能力。

从规则驱动到 AI 驱动:赋能增长新范式

截至2025年第三季度,瑞幸咖啡已建立起覆盖全国超2.9万家门店的庞大网络。随着用户规模持续扩大,瑞幸咖啡也在不断探索新技术增加对于客户的理解,为客户提供更加灵活的服务,促进增长的发展。

为探索AI赋能增长新范式,瑞幸致力于构建一套具备高精度、可迭代、可扩展能力的AI推荐系统。经过多轮技术评估与方案论证,瑞幸最终与阿里云大数据AI平台合作共创,采用MaxCompute+DataWorks+Flink+Hologres+PAI技术架构,以PAI-Rec作为其新一代推荐引擎,依托其强大的大数据、算法能力与全链路服务支持,开启提升用户推荐体验的智能化升级。

与客户共创,助力客户能力沉淀

此次合作不仅是技术产品的落地,更是一次深度的“技术共创”实践。阿里云技术团队与瑞幸技术团队紧密协作,全程参与需求分析、POC验证、系统上线与效果优化。

在项目推进过程中,阿里云技术团队与瑞幸团队高效协同,高质量完成多组对比实验,并组织多次技术交流,协助瑞幸系统性地沉淀了数据处理规范、特征工程方法、模型调优策略及测试体系,为其后续自主迭代与业务扩展打下坚实基础。

展望未来:AI 驱动“更懂你的咖啡”

本次合作不仅为瑞幸带来了显著的业务升级,也为阿里云人工智能平台PAI在零售行业树立了标杆案例。

“本次与阿里云合作的AI智能推荐场景,提供的不仅是一个工具、一个解决方案,更是一次双方共创合作经验的落地。”瑞幸技术负责人表示,“从POC到全量上线,阿里云团队展现了极强的技术实力与服务意识。我们相信,AI将成为瑞幸持续领跑行业的重要引擎。”

一杯咖啡的背后,是海量数据的流转与AI模型的精准计算。随着推荐系统的持续优化,瑞幸咖啡的运营模式实现“更智能、更个性、更高效”。未来,双方还将探索大模型在用户意图理解、生成式推荐、跨场景联动等方向的创新应用,进一步释放AI在消费场景中的潜力。

瑞幸咖啡 x 阿里云大数据AI平台的合作,不仅是一次技术升级,更是AI赋能实体经济的生动实践。在智能化浪潮中,AI将成为您的专属咖啡助手——从海量风味中,AI推荐为您探索意想不到的惊喜之选,让咖啡更懂你。

阿里云 AI 推荐方案:打造端到端智能推荐引擎

下面将重点介绍阿里云AI推荐方案在该场景中的技术亮点与应用优势。

阿里云AI推荐方案是面向企业级场景的全托管推荐算法服务平台,深度融合阿里巴巴在电商、本地生活等高并发、高实时性场景下的推荐实践经验,提供从数据处理、特征工程、模型训练、测试验证到在线服务的一站式解决方案。

在本次合作中,阿里云为瑞幸咖啡量身打造了覆盖“数据 → 模型 → 服务 → 迭代”的完整推荐链路:

  • 端到端系统搭建:基于全托管架构的阿里云大数据AI平台,搭建实时推荐全链路,快速构建从数据采集、实时特征计算、深度学习模型训练到在线推理的全流程系统,实现毫秒级响应的AI推荐服务。
  • 精准转化率提升:通过引入深度CTR/CVR预估模型、多目标优化(MMOE)及序列建模(如DIEN),显著提升推荐内容的相关性与转化效率。经测试验证,最终转化率较原有规则系统提升明显
  • 全托管运维,释放技术负担:依托人工智能平台PAI的自动化运维与弹性伸缩能力,瑞幸团队得以从繁重的系统维护中解放,聚焦核心业务创新,大幅降低AI落地门槛。

阿里云智能推荐系统解决方案

阿里云为企业开发者提供全链路深度定制的推荐系统解决方案。方案涵盖了离线处理、在线服务、实时数据流和工程架构等多个维度,包括召回、排序、过滤和重排等功能模块,提供多种数据诊断分析、推荐结果调试和引擎发布管理等工具,通过A/B testing服务和实验报表平台提升推荐系统的迭代效率。

搭建一套智能推荐系统,主要分为四个步骤:数据准备、离线训练、在线服务以及算法迭代。
b2c90afc7fb4401ea91455467a5526dc.png

1. 数据准备

  • 基础埋点与采集:首先需完成用户行为数据的埋点采集,包括曝光、点击、加购、收藏及下单等核心行为。 
  • 基础表构建:进行数据ETL,产出三张核心基础表:用户表(包含属性及偏好标签)、物品表(包含类目、价格等属性)及行为表(记录用户与物品的交互时间及类型)。 
  • 数据智能诊断:对原始数据进行潜在问题分析,评估特征的可用性与覆盖率,确保模型训练的质量。

2. 离线训练

  • 算法定制开发:对召回(如Etrec协同过滤)、粗排、精排(如DBMTL多目标训练)等算法的深度定制。 
  • 特征与样本准备:通过离线调度任务,完成特征抽取与正负样本构造。统一管理离线特征,确保离在线特征的一致性。 
  • 模型训练与调优:模型训练,并利用AutoML进行自动调参,提升模型性能。

3. 在线服务

  • 推荐引擎部署:部署召回和排序模型,处理在线推理请求。
  • 特征实时读取:在线推理时,推荐引擎高性能存储中读取用户和物品特征,并传递给PAI-EAS打分。
  • 联调与测试:上线前进行全链路联调,验证特征一致性,并观察推荐结果是否符合预期业务逻辑。

4. 算法迭代

  • AB实验监控:通过配置AB实验报表实时观察AB实验效果。在实验结束后,进行数据诊断任务以深入分析实验表现。
  • 闭环优化:根据实验结果调整特征和样本,或者调整模型架构后重新训练。
  • 特征自动挖掘:引入 AutoFE(自动特征工程) 技术,利用算法自动挖掘新特征,进一步提升推荐的精准度。

搭建一套智能推荐系统方案,主要依赖的云产品,包括:PAI-Rec、PAI、FeatureStore、MaxCompute+Dataworks等。
99f82c99fb984380a2e17d23d9210218.png

PAI-Rec使用EasyRec训练召回和排序模型,使用PAI-Rec引擎搭建推荐系统;通过 DataWorks 编辑和调度特征工程、样本和模型训练的代码;使用特征数据库FeatureDB存储用户特征、i2i相关物品和向量库;使用PAI-EAS 提供可弹性扩缩容的打分服务。

具体说明如下:

  • 人工智能平台PAI:面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务。
  • EasyRec算法框架:内置业界先进的深度学习模型,支持多种Tensorflow版本(>=1.12, <=2.4, PAI-TF)和 PyTorch 版本,覆盖了推荐全链路的需求,包括召回、粗排、排序、重排、多目标和冷启动等。开发者可基于EasyRec算法框架加速迭代推荐全链路需求。
  • 大数据开发治理平台DataWorks/云原生大数据计算服务MaxCompute:基于云原生的大数据服务,可搭配使用,针对推荐系统中特征处理、样本生成、画像管理、模型调度、数据更新等环节,提供了易用的开发工具和稳定的数据环境。
  • 特征平台管理工具FeatureStore:用于存储和管理离线和在线服务中的特征数据,确保了从离线到在线的特征统一与高效复用。同时,整合了阿里云上DataHub、Flink、Hologres和Tablestore等产品,并且自研了搜索推荐专用的特征数据库FeatureDB,提供特征管理功能。

这套“MaxCompute+DataWorks+Flink+Hologres+PAI”深度融合的技术架构,是面向零售、金融、出行等多行业场景的通用型智能数据中台范本。无论是构建AI驱动的推荐系统,还是实现全域数据资产的价值释放,阿里云Data+AI系列产品都能为企业提供从“数据到智能”的全栈赋能。

未来已来,智能不止于推荐。让每一次交互更懂用户,让每一份数据创造价值——阿里云大数据与AI产品组合,助力企业驶入智能化快车道。

一、背景与痛点

业务场景

在实时大数据处理场景中,Flink + ClickHouse 的组合被广泛应用于:

  • 日志处理: 海量应用日志实时写入分析库。
  • 监控分析: 业务指标、APM 数据的实时聚合。

这些场景的共同特点:

  • 数据量大:百万级 TPS,峰值可达千万级。
  • 写入延迟敏感: 需要秒级可见。
  • 数据准确性要求高:不允许数据丢失。
  • 多表写入: 不同数据根据分表策略写入不同的表。

开源 Flink ClickHouse Sink 的痛点

Flink 官方提供的 ClickHouse Sink(flink-connector-jdbc)在生产环境中存在以下严重问题:

痛点一:缺乏基于数据量的攒批机制

问题表现:

// Flink 官方 JDBC Sink 的实现
public class JdbcSink<T> extends RichSinkFunction<T> {
    private final int batchSize;  // 固定批次大小
    @Override
    public void invoke(T value, Context context) {
        bufferedValues.add(value);
        if (bufferedValues.size() >= batchSize) {
            // 只能基于记录数攒批,无法基于数据量
            flush();
        }
    }

带来的问题:

  1. 内存占用不可控: 100 条 1KB 的日志和 100 条 10MB 的日志占用内存差距 100 倍。
  2. OOM 风险高: 大日志记录(如堆栈转储)会迅速撑爆内存。
  3. 写入性能差: 无法根据记录大小动态调整批次,导致小记录批次过大浪费网络开销。

痛点二:无法支持动态表结构

问题表现:

// Flink 官方 Sink 只能写入固定表
public class JdbcSink {
    private final String sql;  // 固定的 INSERT SQL
    public JdbcSink(String jdbcUrl, String sql, ...) {
        this.sql = sql;  // 硬编码的表结构
    }
}

带来的问题:

  1. 多应用无法隔离: 所有应用的数据写入同一张表,通过特定分表策略区分。
  2. 扩展性差: 新增应用需要手动建表,无法动态路由。
  3. 性能瓶颈: 单表数据量过大(百亿级),查询和写入性能急剧下降。

痛点三:分布式表写入性能问题

问题表现:

// 大多数生产实现直接写入分布式表
INSERT INTO distributed_table_all VALUES (...)

ClickHouse 分布式表的工作原理:

带来的问题:

  1. 网络开销大: 数据需要经过分布式表层转发,延迟增加。
  2. 写入性能差: 分布式表增加了路由和转发逻辑,吞吐量降低。
  3. 热点问题: 所有数据先到分布式表节点,再转发,造成单点瓶颈。

生产级方案的核心改进

针对以上痛点,本方案提供了以下核心改进:

改进一:基于数据量的攒批机制

public class ClickHouseSinkCounter {
    private Long metaSize;  // 累计数据量(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize();  // 累加数据量
    }
}
// 触发条件
private boolean flushCondition(String application) {
    return checkMetaSize(application)  // metaSize >= 10000 字节
        || checkTime(application);     // 或超时 30 秒
}

优势:

  • 内存可控: 根据数据量而非记录数攒批。
  • 精确控制: 1KB 的记录攒 10000 条 = 10MB,1MB 的记录攒 10 条 = 10MB。
  • 避免OOM: 大日志记录不会撑爆内存。

改进二:动态表结构与分片策略

public abstract class ClickHouseShardStrategy<T> {
    public abstract String getTableName(T data);
}
//日志侧实现为应用级分表
public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 动态路由:order-service → tb_logs_order_service
        return String.format("tb_logs_%s", application);
    }
}

优势:

  • 应用隔离: 日志侧内置应用级分表,每个应用独立分表。
  • 动态路由: 根据 application 自动路由到目标表。
  • 扩展性强: 新增应用无需手动建表(配合 ClickHouse 自动建表)。

改进三:本地表写入 + 动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 直接写本地表,避免分布式表转发
    private final ConcurrentMap<String, HikariDataSource> dataSourceMap;
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1. 动态获取集群节点列表
        List<String> healthyHosts = getHealthyHosts(exceptionHosts);
        // 2. 随机选择健康节点
        return dataSourceMap.get(healthyHosts.get(random.nextInt(size)));
    }
}

优势:

  • 性能提升: 直接写本地表,避免网络转发。
  • 高可用: 动态节点发现 + 故障节点剔除。
  • 负载均衡: 随机选择 + Shuffle 初始化。

技术方案概览

基于以上改进,本方案提供了以下核心能力:

  1. 本地表/分布式表写入: 性能优化与高可用平衡。
  2. 分片策略: 按应用维度路由与隔离。
  3. 攒批与内存控制: 双触发机制(数据量 + 超时)。
  4. 流量控制与限流: 有界队列 + 连接池。
  5. 健壮的重试机制: 递归重试 + 故障节点剔除。
  6. Checkpoint 语义保证: At-Least-Once 数据一致性。

二、核心架构设计

架构图

核心组件

核心流程

三、本地表 vs 分布式表写入

ClickHouse 表结构说明

ClickHouse 推荐直接写本地表,原因:

  1. 写入性能: 避免分布式表的网络分发。
  2. 数据一致性: 直接写入目标节点,减少中间环节故障点,比分布式表写入更安全,利于工程化。
  3. 负载均衡: 客户端路由实现负载分散。
-- 本地表(实际存储数据)
CREATE TABLE tb_logs_local ON CLUSTER 'default' (
    application String,
    environment String,
    message String,
    log_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(log_time)
ORDER BY (application, log_time);
-- 分布式表(逻辑视图,不存储数据)
CREATE TABLE tb_logs_all ON CLUSTER 'default' AS tb_logs_local
ENGINE = Distributed('default', dw_log, tb_logs_local, cityHash64(application));

HikariCP 连接池配置

// HikariCP 连接池配置
public class ClickHouseDataSourceUtils {
    private static HikariConfig getHikariConfig(DataSourceImpl dataSource) {
        HikariConfig config = new HikariConfig();
        config.setConnectionTimeout(30000L);    // 连接超时 30s
        config.setMaximumPoolSize(20);          // 最大连接数 20
        config.setMinimumIdle(2);               // 最小空闲 2
        config.setDataSource(dataSource);
        return config;
    }
    private static Properties getClickHouseProperties(ClickHouseSinkCommonParams params) {
        Properties props = new Properties();
        props.setProperty("user", params.getUser());
        props.setProperty("password", params.getPassword());
        props.setProperty("database", params.getDatabase());
        props.setProperty("socket_timeout", "180000");      // Socket 超时 3 分钟
        props.setProperty("socket_keepalive", "true");      // 保持连接
        props.setProperty("http_connection_provider", "APACHE_HTTP_CLIENT");
        return props;
    }
}

配置说明:

  • maxPoolSize=20:每个 ClickHouse 节点最多 20 个连接。
  • minIdle=2:保持 2 个空闲连接,避免频繁创建。
  • socket_timeout=180s:Socket 超时 3 分钟,防止长时间查询阻塞。

ClickHouseLocalWriter:动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 本地节点缓存,按 IP 维护
    private final ConcurrentMap<String, HikariDataSource> dataSourceMap;
    // 动态获取集群本地表节点
    private final ClusterIpsUtils clusterIpsUtils;
    // IP 变更标志(CAS 锁,避免并发更新)
    private static final AtomicBoolean IP_CHANGING = new AtomicBoolean(false);
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1️⃣ 检测集群节点变化(通过 CAS 避免并发更新)
        if (clusterIpsChanged() && IP_CHANGING.compareAndSet(false, true)) {
            try {
                ipChanged(); // 动态更新 dataSourceMap
            } finally {
                IP_CHANGING.set(false);
            }
        }
        // 2️⃣ 获取异常节点列表(从 Redis + APM 实时查询)
        Set<String> exceptIps = clusterIpsUtils.getExceptIps();
        exceptIps.addAll(exceptionHosts);
        // 3️⃣ 过滤健康节点,随机选择
        List<String> healthyHosts = dataSourceMap.keySet().stream()
            .filter(host -> !exceptIps.contains(host))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            throw new RuntimeException("Can't get datasource from local cache");
        }
        return dataSourceMap.get(healthyHosts.get(random.nextInt(healthyHosts.size())));
    }
    private void ipChanged() {
        List<String> clusterIps = clusterIpsUtils.getClusterIps();
        // 新增节点:自动创建连接池
        clusterIps.forEach(ip ->
            dataSourceMap.computeIfAbsent(ip, v ->
                createHikariDataSource(ip, port)
            )
        );
        // 移除下线节点:关闭连接池
        dataSourceMap.forEach((ip, ds) -> {
            if (!clusterIps.contains(ip)) {
                dataSourceMap.remove(ip);
                ds.close();
            }
        });
    }
}

核心逻辑:

  1. 动态节点发现: 从 system.clusters 查询所有节点。
  2. 自动扩缩容: 节点上线自动加入,下线自动剔除。
  3. 故障节点剔除: 通过 APM 监控,自动剔除异常节点。
  4. 负载均衡: 随机选择健康节点,避免热点。

集群节点动态发现(ClusterIpsUtils)

public class ClusterIpsUtils {
    // 从 system.clusters 查询所有节点
    private static final String QUERY_CLUSTER_IPS =
        "select host_address from system.clusters where cluster = 'default'";
    // LoadingCache:定时刷新节点列表(1 小时)
    private final LoadingCache<String, List<String>> clusterIpsCache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.HOURS)
            .refreshAfterWrite(1, TimeUnit.HOURS)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public List<String> load(String dbName) {
                    return queryClusterIps();  // 定时刷新节点列表
                }
            }));
    // 异常节点缓存(1 分钟刷新)
    private final LoadingCache<String, FlinkExceptIpModel> exceptIpsCache =
        CacheBuilder.newBuilder()
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public FlinkExceptIpModel load(String dbName) {
                    return queryExceptIp();  // 从 Redis + APM 查询异常节点
                }
            }));
}

异常节点监控策略:

  • 磁盘使用率 >= 90%: 从 APM 查询 Prometheus 指标,自动加入黑名单。
  • HTTP 连接数 >= 50: 连接数过多说明节点压力大,自动加入黑名单。
  • 人工配置: 通过 Redis 配置手动剔除节点

数据来源:

  1. ClickHouse system.clusters 表: 获取所有集群节点。
  2. APM Prometheus 接口: 监控节点健康状态。
  3. Redis 缓存: 人工配置的异常节点。

负载均衡优化

public class ClickHouseWriter {
    public <T> ClickHouseWriter(...) {
        // Shuffle:随机打乱数据源顺序
        Collections.shuffle(clickHouseDataSources);
        this.clickHouseDataSources = clickHouseDataSources;
    }
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 轮询 + 随机选择(已 shuffle,避免热点)
        int current = this.currentRandom.getAndIncrement();
        if (current >= clickHouseDataSources.size()) {
            this.currentRandom.set(0);
        }
        return clickHouseDataSources.get(currentRandom.get());
    }
}

优势:

  • 初始化时 shuffle,避免所有 writer 同时从第一个节点开始。
  • 轮询 + 随机选择,负载分散更均匀。
  • 故障节点自动剔除。

四、支持分表策略

分片策略抽象

public abstract class ClickHouseShardStrategy<T> {
    private String tableName;      // 表名模板,如 "tb_log_%s"
    private Integer tableCount;    // 分表数量
    // 根据数据决定目标表名
    public abstract String getTableName(T data);
}

日志分片实现

public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 表名格式:tb_log_{application}
        // 例如:application = "order-service" -> table = "tb_log_order_service"
        return String.format(
            this.getTableName(),
            application.replace("-", "_").toLowerCase()
        );
    }
}

按表(应用)维度的缓冲区

日志侧维度降级为应用名称维度缓冲区,实则因为按照应用分表,

业务方可使用自身分表策略携带表名元数据,进行表维度缓冲。

public class ClickHouseShardSinkBuffer {
    // 按 application 分组的缓冲区(ConcurrentHashMap 保证并发安全)
    private final ConcurrentHashMap<String, ClickHouseSinkCounter> localValues;
    public void put(LogModel value) {
        String application = value.getApplication();
        // 1️⃣ 检查是否需要 flush
        if (flushCondition(application)) {
            addToQueue(application); // 触发写入
        }
        // 2️⃣ 添加到缓冲区(线程安全的 compute 操作)
        localValues.compute(application, (k, v) -> {
            if (v == null) v = new ClickHouseSinkCounter();
            v.add(value);
            return v;
        });
    }
    private void addToQueue(String application) {
        localValues.computeIfPresent(application, (k, v) -> {
            // 深拷贝并清空(避免并发修改异常)
            List<LogModel> deepCopy = v.copyValuesAndClear();
            // 构造请求 Blank:application + targetTable + values
            String targetTable = shardStrategy.getTableName(application);
            ClickHouseRequestBlank blank = new ClickHouseRequestBlank(deepCopy, application, targetTable);
            // 放入队列
            writer.put(blank);
            return v;
        });
    }
}

核心设计:

  • 应用隔离: 每个表(应用)独立的 buffer,互不影响。
  • 线程安全: 使用 ConcurrentHashMap.compute()保证并发安全。
  • 深拷贝: List.copyOf() 创建不可变副本,避免并发修改。
  • 批量清空: 一次性取出所有数据,清空计数器。

五、攒批与内存控制

双触发机制

public class ClickHouseShardSinkBuffer {
    private final int maxFlushBufferSize;  // 最大批次大小(如 10000)
    private final long timeoutMillis;      // 超时时间(如 30s)
    // 触发条件检查(满足任一即触发)
    private boolean flushCondition(String application) {
        return localValues.get(application) != null
            && (checkMetaSize(application) || checkTime(application));
    }
    // 条件1:达到批次大小
    private boolean checkMetaSize(String application) {
        return localValues.get(application).getMetaSize() >= maxFlushBufferSize;
    }
    // 条件2:超时
    private boolean checkTime(String application) {
        long current = System.currentTimeMillis();
        return current - localValues.get(application).getInsertTime() > timeoutMillis;
    }
}

批次大小计算

public class ClickHouseSinkCounter {
    private final List<LogModel> values;
    private Long metaSize; // 累计的 metaSize(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize(); // 累加 metaSize
    }
    public List<LogModel> copyValuesAndClear() {
        List<LogModel> logModels = List.copyOf(this.values); // 深拷贝(不可变)
        this.values.clear();
        this.metaSize = 0L;
        this.insertTime = System.currentTimeMillis();
        return logModels;
    }
}

关键点:

  • 使用 metaSize(字节数)而非记录数控制批次,内存控制更精确。
  • List.copyOf() 创建不可变副本,避免并发修改。
  • 清空后重置 insertTime,保证超时触发准确性。

带随机抖动的超时

private final long timeoutMillis;
public ClickHouseShardSinkBuffer(..., int timeoutSec, ...) {
    // 基础超时 + 10% 随机抖动(避免惊群效应)
    this.timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSec)
                      + new SecureRandom().nextInt((int) (timeoutSec * 0.1 * 1000));
}

目的: 避免多个TM 同时触发 flush,造成写入流量峰值。

配置示例

ClickHouseShardSinkBuffer.Builder
    .aClickHouseSinkBuffer()
    .withTargetTable("single_table")  //单表时,可直接使用指定表名
    .withMaxFlushBufferSize(10000)  // 对应字节数
    .withTimeoutSec(30)              // 30 秒超时
    .withClickHouseShardStrategy(new LogClickHouseShardStrategy("table_prefix_%s", 8))  //分表策略时,使用
    // 分表策略可根据业务实际情况进行扩展
    .build(clickHouseWriter);

六、写入限流与流量控制

有界队列设计

public class ClickHouseWriter {
    // 有界阻塞队列
    private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
    public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, ...) {
        // 队列最大容量配置(默认 10)
        this.commonQueue = new LinkedBlockingQueue<>(sinkParams.getQueueMaxCapacity());
    }
    public void put(ClickHouseRequestBlank params) {
        unProcessedCounter.incrementAndGet();
        // put() 方法在队列满时会阻塞,实现背压
        commonQueue.put(params);
    }
}

背压传导:

线程池并发控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

WriterTask 消费逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

配置参数

七、重试机制与超时控制

Future 超时控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

超时策略:

  • Future 超时: 3 分钟(orTimeout)。
  • Socket 超时: 3 分钟(socket_timeout=180000)。
  • 连接超时: 30 秒(connectionTimeout=30000)。

重试逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

重试控制逻辑

private void handleUnsuccessfulResponse(..., Set<String> exceptHosts) {
    // 检查 Future 是否已完成(避免重复完成)
    if (future.isDone()) {
        return;
    }
    if (attemptCounter >= maxRetries) {
        // 达到最大重试次数,标记失败
        future.completeExceptionally(new RuntimeException("Max retries exceeded"));
    } else {
        // 递归重试
        requestBlank.incrementCounter();
        send(requestBlank, future, exceptHosts); // 递归调用,排除失败节点
    }
}

重试策略:

  • 递归重试: 失败后递归调用,直到成功或达到最大次数。
  • 异常节点隔离: 每次重试时排除失败的节点(exceptHosts)。
  • 超时控制: Future 超时(3 分钟)防止永久阻塞。

为什么递归重试是更好的选择

递归重试(当前实现)

队列重试(假设方案)

保证一致性

  // ClickHouseWriter.java:139-158
  while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
      CompletableFuture<Void> future = FutureUtil.allOf(futures);
      future.get(3, TimeUnit.MINUTES);  // 阻塞直到全部完成
  }
  • Checkpoint 时所有数据要么全部成功,要么全部失败。
  • 重启后不会有部分数据重复的问题。

简单可靠

  • 代码逻辑清晰。
  • 对于队列重试且不重复,需要复杂的二阶段提交(这里暂不展开),大幅增加代码复杂度。

性能可接受

class WriterTask implements Runnable {
    @Override
    public void run() {
        while (isWorking || !queue.isEmpty()) {
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置 3 分钟超时
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES); // 防止永久阻塞
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Timeout"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}
  • 虽然阻塞,但有超时保护。
  • ClickHouse 写入通常很快(秒级)。
  • 网络故障时重试也合理。

避开故障节点

  // ClickHouseWriter.java:259-260
  HikariDataSource dataSource = getNextDataSource(exceptHosts);
  • 递归时可以传递 exceptHosts。
  • 自动避开失败的节点。
  • 提高成功率。

异常节点剔除

// 特殊错误码列表(自动加入黑名单)
private final List<Integer> ignoreHostCodes = Arrays.asList(210, 1002);
public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
    if (CollectionUtils.isNotEmpty(exceptionHosts)) {
        // 过滤异常节点
        List<HikariDataSource> healthyHosts = clickHouseDataSources.stream()
            .filter(ds -> !exceptionHosts.contains(getHostFromUrl(ds)))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            return null; // 所有节点都异常
        }
        return healthyHosts.get(random.nextInt(healthyHosts.size()));
    }
    // 正常轮询(已 shuffle,避免热点)
    return clickHouseDataSources.get(currentRandom.getAndIncrement() % size);
}

故障节点剔除策略:

  1. 错误码 210(网络异常): 自动加入黑名单。
  2. 错误码 1002(连接池异常): 自动加入黑名单。
  3. APM 监控: 磁盘 >= 90%、HTTP 连接 >= 50 的节点。
  4. 手动配置: 通过 Redis 配置剔除。

恢复机制:

  • LoadingCache 定时刷新(1 分钟)。
  • 节点恢复健康后自动从黑名单移除。

重试流程图

八、异常处理模式

两种 Sink 模式

public Sink buildSink(String targetTable, String targetCount, int maxBufferSize) {
    IClickHouseSinkBuffer buffer = ClickHouseShardSinkBuffer.Builder
        .aClickHouseSinkBuffer()
        .withTargetTable(targetTable)
        .withMaxFlushBufferSize(maxBufferSize)
        .withClickHouseShardStrategy(new LogClickHouseShardStrategy(targetTable, count))
        .build(clickHouseWriter);
    // 根据配置选择模式
    if (ignoringClickHouseSendingExceptionEnabled) {
        return new UnexceptionableSink(buffer);  // 忽略异常
    } else {
        return new ExceptionsThrowableSink(buffer); // 抛出异常
    }
}

UnexceptionableSink(忽略异常 - At-Most-Once)

public class UnexceptionableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) {
        buffer.put(message);  // 不检查 Future 状态
    }
    @Override
    public void flush() {
        buffer.flush();
    }
}

适用场景:

  • 允许部分数据丢失。
  • 不希望因写入异常导致任务失败。
  • 对数据准确性要求不高(如日志统计)。

语义保证:At-Most-Once(最多一次)

ExceptionsThrowableSink(抛出异常 - At-Least-Once)

public class ExceptionsThrowableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) throws ExecutionException, InterruptedException {
        buffer.put(message);
        // 每次写入都检查 Future 状态
        buffer.assertFuturesNotFailedYet();
    }
    @Override
    public void flush() throws ExecutionException, InterruptedException {
        buffer.flush();
    }
}

Future 状态检查:

public void assertFuturesNotFailedYet() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = FutureUtil.allOf(futures);
    // 非阻塞检查
    if (future.isCompletedExceptionally()) {
        logger.error("There is something wrong with the future. exist sink now");
        future.get(); // 抛出异常,导致 Flink 任务失败
    }
}

适用场景:

  • 数据准确性要求高。
  • 需要保证所有数据写入成功。
  • 异常时希望 Flink 任务失败并重启。

语义保证:At-Least-Once(至少一次)

Future 清理策略与并发控制

定时检查器

public class ClickHouseSinkScheduledCheckerAndCleaner {
    private final ScheduledExecutorService scheduledExecutorService;
    private final List<CompletableFuture<Boolean>> futures;
    // ⚠️ volatile 保证多线程可见性(关键并发控制点)
    private volatile boolean isFlushing = false;
    public ClickHouseSinkScheduledCheckerAndCleaner(...) {
        // 单线程定时执行器
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
        // 定时执行清理任务(每隔 checkTimeout 秒,默认 30 秒)
        scheduledExecutorService.scheduleWithFixedDelay(getTask(), ...);
    }
    private Runnable getTask() {
        return () -> {
            synchronized (this) {
                //  关键:检查是否正在 flush,避免并发冲突
                if (isFlushing) {
                    return; // Checkpoint 期间暂停清理
                }
                // 1️⃣ 清理已完成的 Future
                futures.removeIf(filter);
                // 2️⃣ 触发所有 Buffer 的 flush(检查是否需要写入)
                clickHouseSinkBuffers.forEach(IClickHouseSinkBuffer::tryAddToQueue);
            }
        };
    }
    // Checkpoint flush 前调用(暂停 cleaner)
    public synchronized void beforeFlush() {
        isFlushing = true;
    }
    // Checkpoint flush 后调用(恢复 cleaner)
    public synchronized void afterFlush() {
        isFlushing = false;
    }
}

核心设计:

  • volatile boolean isFlushing: 标志位,协调 cleaner 与 checkpoint 线程。
  • synchronized (this): 保证原子性,避免并发冲突。
  • 单线程执行器: 避免 cleaner 内部并发问题。

并发控制机制

问题场景:

时间轴冲突:
T1: Cleaner 线程正在执行 tryAddToQueue()
T2: Checkpoint 触发,调用 sink.flush()
T3: Cleaner 同时也在执行 tryAddToQueue()
    ├─ 可能导致:数据重复写入
    ├─ 可能导致:Buffer 清空顺序混乱
    └─ 可能导致:Future 状态不一致

解决方案:

// ClickHouseSinkManager.flush()
public void flush() {
    // 1️⃣ 暂停定时清理任务(设置标志)
    clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
    try {
        // 2️⃣ 执行 flush(此时 cleaner 线程会跳过执行)
        clickHouseWriter.waitUntilAllFuturesDone(false, false);
    } finally {
        // 3️⃣ 恢复定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
    }
}

并发控制流程:

关键设计点:

  1. volatile 保证可见性: isFlushing 使用 volatile,确保多线程间的可见性。
  2. synchronized 保证原子性: getTask() 整个方法体使用 synchronized (this)。
  3. 标志位协调: 通过 isFlushing 标志实现两个线程间的协调。
  4. finally 确保恢复: 即使 waitUntilAllFuturesDone() 异常,也会在 finally 中恢复 cleaner。

避免的并发问题:

  • 数据重复写入: Cleaner 和 Checkpoint 同时 flush。
  • Buffer 状态不一致: 一边清空一边写入。
  • Future 清理冲突: 正在使用的 Future 被清理。

性能影响:

  • Checkpoint flush 期间,cleaner 暂停执行(通常 1-3 秒)。
  • Cleaner 跳过的周期会在下次正常执行时补偿。
  • 对整体吞吐影响极小(cleaner 间隔通常 30 秒)。

九、Checkpoint 语义保证

为什么 Checkpoint 时必须 Flush?

不 Flush 的后果

不Flush导致数据永久丢失

正确做法

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    logger.info("start doing snapshot. flush sink to ck");
    // 1. 先 flush buffer(将内存数据写入 ClickHouse)
    if (sink != null) {
        sink.flush();
    }
    // 2. 等待所有写入完成
    if (sinkManager != null && !sinkManager.isClosed()) {
        sinkManager.flush();
    }
    // 此时 Checkpoint 才能标记为成功
    logger.info("doing snapshot. flush sink to ck");
}

Flush 实现与并发协调

public class ClickHouseSinkManager {
    public void flush() {
        //  步骤1:暂停定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
        try {
            //  步骤2:执行 buffer flush + 等待所有写入完成
            clickHouseWriter.waitUntilAllFuturesDone(false, false);
        } finally {
            //  步骤3:恢复定时清理任务(finally 确保执行)
            clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
        }
    }
}

并发协调详解:

// cleaner 线程执行流程
synchronized (this) {
    if (isFlushing) {
        return; // Checkpoint 期间跳过本次执行
    }
    // 正常执行:清理已完成的 Future + 触发 Buffer flush
    futures.removeIf(filter);
    buffers.forEach(Buffer::tryAddToQueue);
}

关键点:

  • volatile 可见性: isFlushing 使用 volatile 确保 cleaner 线程立即看到状态变化。
  • synchronized互斥: getTask()方法体使用 synchronized (this) 确保原子性。
  • 标志位协调: 通过 beforeFlush() / afterFlush() 管理标志位。
  • finally 保证恢复: 即使 flush 异常,也会在 finally 中恢复 cleaner。

等待所有 Future 完成

public synchronized void waitUntilAllFuturesDone(boolean stopWriters, boolean clearFutures) {
    try {
        // 循环等待:直到所有 Future 完成 + 队列清空
        while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
            CompletableFuture<Void> all = FutureUtil.allOf(futures);
            // 最多等待 3 分钟(与 Future 超时一致)
            all.get(3, TimeUnit.MINUTES);
            // 移除已完成的 Future(非异常)
            futures.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
            // 检查是否有异常 Future
            if (anyFutureFailed()) {
                break; // 有异常则退出
            }
        }
    } finally {
        if (stopWriters) stopWriters();
        if (clearFutures) futures.clear();
    }
}

关键逻辑:

  • 循环等待直到所有 Future 完成 + 队列清空。
  • 超时 3 分钟(与 Future 超时一致)。
  • 移除已完成的非异常 Future。
  • 有异常时退出循环。

三种 Flush 触发方式对比

Checkpoint 参数配置

// Checkpoint 配置建议
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint(间隔 1 分钟)
env.enableCheckpointing(60000);
// Checkpoint 超时(必须大于 Future 超时 + 重试时间)
// 建议:CheckpointTimeout > FutureTimeout * MaxRetries
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
// 一致性模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔(避免过于频繁)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 秒
// 最大并发 Checkpoint 数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

语义保证

推荐配置:

生产环境: 使用 ExceptionsThrowableSink + Checkpoint。

允许部分丢失: 使用 UnexceptionableSink。

十、最佳实践与调优

生产配置

// ========== ClickHouse 连接参数 ==========
clickhouse.sink.target-table = tb_logs_local
clickhouse.sink.max-buffer-size = 104857600        // 批次大小
clickhouse.sink.table-count = 0                // 0 表示不分表
// ========== 写入性能参数 ==========
clickhouse.sink.num-writers = 10               // 写入线程数
clickhouse.sink.queue-max-capacity = 10        // 队列容量
clickhouse.sink.timeout-sec = 30               // flush 超时
clickhouse.sink.retries = 10                   // 最大重试次数
clickhouse.sink.check.timeout-sec = 30         // 定时检查间隔
// ========== 异常处理参数 ==========
clickhouse.sink.ignoring-clickhouse-sending-exception-enabled = false
clickhouse.sink.local-address-enabled = true   // 启用本地表写入
// ========== ClickHouse 集群配置 ==========
clickhouse.access.hosts = 192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123
clickhouse.access.user = default
clickhouse.access.password = ***
clickhouse.access.database = dw_xx_xx
clickhouse.access.cluster = default
// ========== HikariCP 连接池配置 ==========
connectionTimeout = 30000                      // 连接超时 30s
maximumPoolSize = 20                           // 最大连接数 20
minimumIdle = 2                                // 最小空闲 2
socket_timeout = 180000                        // Socket 超时 3mi

性能调优

故障排查

十一、总结

本文深入分析了 Flink ClickHouse Sink 的实现方案,核心亮点包括:

技术亮点

  • 连接池选型: 使用 HikariCP,性能优异,连接管理可靠。
  • Future 超时控制: orTimeout(3min) 防止永久阻塞。
  • 显式资源管理: Connection 和 PreparedStatement 显式关闭,防止连接泄漏。
  • 负载均衡优化: Shuffle 初始化 + 轮询选择,避免热点。
  • 异常处理增强: future.isDone() 检查,避免重复完成。
  • 本地表写入: 动态节点发现 + 故障剔除,写入性能提升。
  • 分片策略: 按表(应用)维度路由,独立缓冲和隔离。
  • 攒批优化: 双触发机制(大小 + 超时)+ 随机抖动。
  • 流量控制: 有界队列 + 线程池,实现背压。
  • 健壮重试: 递归重试 + 异常节点剔除 + 最大重试限制。

Checkpoint 语义

  • At-Least-Once: ExceptionsThrowableSink + Checkpoint。
  • At-Most-Once: UnexceptionableSink。
  • Exactly-Once: 需要配合 ClickHouse 事务(未实现)。

生产建议

  1. 必须: Checkpoint 时 flush,否则会丢数据。
  2. 推荐: 使用 HikariCP + 本地表写入。
  3. 推荐: 配置合理的超时(Future < Socket < Checkpoint)。
  4. 推荐: 监控队列大小、Future 失败率、重试次数。

该方案已在生产环境大规模验证,能够稳定支撑百万级 TPS 的日志写入场景。

往期回顾

1.服务拆分之旅:测试过程全揭秘|得物技术

2.大模型网关:大模型时代的智能交通枢纽|得物技术

3.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

4.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

5.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

文 /虚白

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

流式计算任务通常需要 7x24 小时长期运行,面对网络抖动、机器故障或代码 Bug,如何保证任务不挂?或者挂了之后能自动恢复且数据不丢、不重?这正是 Flink 引以为傲的资本:强大的状态管理基于 Checkpoint 的容错机制

本文将带你深入理解 Flink 是如何“记忆”数据的,以及它是如何在故障发生时“时光倒流”恢复现场的。

一、什么是状态(State)

在流计算中,数据是一条条流过的。如果处理一条数据时,需要依赖之前的数据(例如:计算过去一小时的总和、去重、模式匹配),那么这些“之前的数据”或“中间计算结果”就是状态

1. 状态的分类

Flink 的状态分为两大类:Managed State(托管状态)Raw State(原生状态)。我们日常开发 99% 使用的是托管状态,由 Flink 运行时自动管理内存、序列化和故障恢复。

Managed State 又细分为:

  • Keyed State(键控状态)

    • 只能在 KeyedStream(即 keyBy 之后)上使用。
    • 状态是跟 Key 绑定的。Flink 为每个 Key 维护一份独立的状态实例。
    • 常用类型:ValueStateListStateMapStateReducingStateAggregatingState
  • Operator State(算子状态)

    • 绑定到算子并行实例(SubTask),与 Key 无关。
    • 常用于 Source Connector(记录读取的 Offset)或 Sink Connector(事务控制)。
    • 常用接口:ListStateUnionListStateBroadcastState

二、状态后端(State Backends)

状态存在哪里?是内存还是磁盘?这由 State Backend 决定。在 Flink 1.13 之后,配置方式简化为以下两种主要模式:

1. HashMapStateBackend (基于内存)

  • 存储位置:Java 堆内存(Heap)。
  • 特点:读写速度极快(对象直接访问,无序列化开销)。
  • 适用场景:状态较小(例如仅仅是简单的 Count 或去重),对延迟极其敏感的场景。
  • 缺点:受限于 JVM 堆大小,容易 GC;状态过大时可能 OOM。

2. EmbeddedRocksDBStateBackend (基于磁盘)

  • 存储位置:TaskManager 本地磁盘(基于 RocksDB 数据库),内存中只作为缓存(Off-heap)。
  • 特点:支持超大状态(TB 级别),不受 JVM 堆限制。
  • 适用场景:超大窗口、超长周期的聚合、海量 Key 的去重。
  • 缺点:需要序列化/反序列化,读写性能略低于内存版;需要调优 RocksDB 参数。

3. 配置示例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置状态后端为 RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());

// 配合 Checkpoint 存储路径(存储在本地文件系统)
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoints");

三、容错核心:Checkpoint

Checkpoint(检查点)是 Flink 容错机制的灵魂。它是一个全局一致性快照,定期将所有算子的状态持久化到远程存储(如 HDFS)。

1. 核心原理:Barrier 对齐

Flink 使用 Chandy-Lamport 算法 的变体。

  1. Barrier 注入:JobManager 向 Source 发送 Checkpoint Barrier。
  2. Barrier 流动:Barrier 像普通数据一样在流中传输。
  3. 对齐(Alignment):当算子有多个输入流时,必须等待所有流的 Barrier 到齐,才能进行 Snapshot。这保证了状态的一致性(即 Exactly-Once)。
  4. 异步快照:算子将状态写入远程存储(异步过程),不阻塞数据处理。
  5. 确认完成:所有算子都完成快照后,JobManager 确认 Checkpoint 成功。

2. Checkpoint 配置实战

默认情况下 Checkpoint 是关闭的,生产环境必须开启

// 1. 开启 Checkpoint,每 5000ms 触发一次
env.enableCheckpointing(5000);

// 2. 设置 Checkpoint 模式(默认 EXACTLY_ONCE,也可以设为 AT_LEAST_ONCE)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 3. 设置两次 Checkpoint 之间的最小间隔(防止频繁 Checkpoint 导致性能下降)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

// 4. Checkpoint 超时时间(默认 10分钟)
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 5. 允许同时进行的 Checkpoint 数量(通常设为 1)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 6. 开启作业取消时保留 Checkpoint(非常重要!否则 Cancel 任务会删除 Checkpoint)
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// 7. 容忍 Checkpoint 失败次数(默认 0,即 Checkpoint 失败会导致任务重启)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

四、Savepoint:手动的超级 Checkpoint

虽然 Checkpoint 和 Savepoint 看起来很像(都是快照),但它们的定位完全不同:

特性CheckpointSavepoint
触发方式Flink 定时自动触发用户手动命令触发
主要目的故障恢复(Failover)运维操作(升级、扩容、迁移)
存储格式增量存储(依赖 StateBackend 优化)标准格式,全量存储(可跨版本)
生命周期随作业生命周期管理(除非设置保留)用户自行管理(删除需手动)

常用命令

# 触发 Savepoint
bin/flink savepoint <jobId> [targetDirectory]

# 从 Savepoint 重启作业 (或者 Checkpoint)
bin/flink run -s <savepointPath> ...

五、重启策略(Restart Strategies)

当任务发生故障(Exception)时,Flink 会尝试根据配置的策略自动重启。

// 1. 固定延迟重启(尝试 3 次,每次间隔 10秒)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, 
    Duration.ofSeconds(10)
));

// 2. 失败率重启(在 5 分钟内失败超过 3 次则停止,否则每次间隔 10秒重启)
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    3, 
    Duration.ofMinutes(5), 
    Duration.ofSeconds(10)
));

// 3. 无重启(直接失败)
env.setRestartStrategy(RestartStrategies.noRestart());

六、总结

  • State 是 Flink 实现复杂逻辑的记忆。
  • State Backend 决定了记忆存哪里(内存快但小,RocksDB 大但需序列化)。
  • Checkpoint 是自动化的定期备份,保证故障恢复后的数据一致性。
  • Savepoint 是手动的高级备份,用于版本升级和应用迁移。

掌握了状态与容错,你的 Flink 任务才算真正具备了“生产级”的健壮性。下一篇,我们将探讨 Flink SQL,看看如何用 SQL 解决 80% 的流计算需求。


原文来自:http://blog.daimajiangxin.com.cn

源码地址:https://gitee.com/daimajiangxin/flink-learning

摘要:本文整理自阿里采集分析平台工程技术负责人 吴宝国 老师,在 Flink Forward Asia 2025 城市巡回深圳站中的分享。

Tips:关注「公众号」回复 FFA 2025 查看会后资料~

大家好,我是来自阿里集团平台技术部数据技术与产品部的吴宝国。今天非常荣幸能在这里跟大家分享我们在阿里内部大规模落地 Fluss 的一些实践经验。

首先简单介绍一下我们团队。我们团队主要负责集团内部统一的用户行为采集与分析平台,也就是大家常说的 A+ 平台。我们的核心职责是为手淘、钉钉、高德、饿了么等众多集团内应用提供端到端的用户行为数据采集、处理、分析及服务能力。

1.png

在底层,我们构建了覆盖 Web、小程序、APP(包括 Android、iOS、PC、IOT、鸿蒙、VR 等)以及服务端的全场景采集 SDK 矩阵。在此之上,我们不仅采集用户的行为日志(比如点击、曝光、滑动等),还会融合业务数据(如用户标签、商品信息、订单数据等),构建服务于整个集团的流量域数据公共层。最终,我们通过分析产品帮助业务团队洞察用户行为,驱动运营和产品决策,例如提升广告效果、优化用户体验等。

为了支撑这一庞大体系的实时性需求,我们引入了开源流存储系统 Fluss 作为核心的日志数据实时采集通道。接下来,我将从为什么选择 Fluss、如何保障大规模落地稳定性、具体业务实践案例以及未来规划四个方面展开分享。

一、为什么选择 Fluss?——解决两大核心痛点

在引入 Fluss 之前,我们的实时数据架构长期面临两个根本性挑战。

(1)成本高昂:行式消息队列导致资源浪费严重

2.png

我们过去主要依赖阿里内部的行式消息队列 TT(TimeTunnel)。以手淘的实时流量公共层为例,这张表包含了首页、闪购、搜索等多个业务的数据。每个下游业务(比如推荐系统)都需要一个独立的 Flink 作业来消费这张全量表,然后在作业内进行过滤,只保留自己关心的部分。

这种模式带来了三重成本问题:

  • 存储与流量成本倍增:计费通常基于读写流量。即使每个业务只关心 1% 的数据,也需要为 100% 的全量数据付费。如果有 N 个业务,就要支付 N 倍的费用。
  • Flink CU 资源浪费:Flink 作业需要消耗大量计算单元(CU)来读取、反序列化并丢弃无用的数据。很多时候,作业空跑不做任何逻辑处理,但依然产生高昂开销。
  • 字段冗余读取:一张表可能包含数百个字段,但单个业务往往只需要其中几个。行式存储迫使消费者读取整行数据,造成巨大的 IO 和网络带宽浪费。

Fluss 通过其三大核心能力完美解决了上述问题:

  • 多级分区(Multi-level Partitioning):支持按业务、按场景等维度对数据进行精细划分。
  • 过滤下推(Filter Pushdown):消费者可以在订阅时声明过滤条件,数据在源头即可被精确过滤,避免全量拉取。
  • 列式存储(Columnar Storage):允许消费者只读取所需的字段,极大降低数据消费量和 Flink CU 消耗。

(2)湖流割裂:Lambda 架构的运维与一致性困境

3.png

业界经典的 Lambda 架构虽然能同时提供实时和离线视图,但维护两套独立的批处理和流处理链路,带来了开发、运维成本高企以及数据统计口径不一致等问题。

随着数据湖技术(如 Paimon、Hudi)的发展,湖仓一体架构成为主流,但它通常只能提供分钟级的数据新鲜度。对于搜索、推荐等要求秒级延迟的核心场景,我们仍需引入 Kafka 这类流式中间件,这实际上又回到了 Lambda 架构的老路,导致“湖”与“流”的割裂。

4.png

Fluss 的出现为我们提供了一个统一的解决方案:它既能作为高性能的流存储提供秒级数据新鲜度,又能通过其内置的分层存储(Tiering)能力无缝对接数据湖(如阿里内部的 Alake),真正实现了“湖流一体”,消除了双架构的痛点。

二、首次双11落地情况:大规模生产验证

2025 年的双 11 是 Fluss 在阿里集团的首次大促实战。目前,Fluss 已稳定服务于淘天(含通天塔、阿里妈妈等)、集团数据公共层、饿了么、淘宝闪购、高德、阿里影业等多个核心业务,核心场景主要集中在搜索、推荐、流量等。

5.png

在本次双十一期间,Fluss 展现了强大的承载能力:

  • 数据量:4 PB/天
  • TPS峰值:1 亿
  • BPS峰值:100 GiB/s

这些数据充分证明了 Fluss 在大规模、高并发场景下的稳定性和可靠性。

三、集群部署架构

阿里集团内部的业务特点与云上有所不同,因此我们的部署架构也进行了针对性设计。

我们采用了“大集群 + 区域化部署”的模式。不同地域(如张北、上海)拥有独立的 Fluss 集群,而同一地域内的不同业务(如高德、钉钉、淘天)则通过数据库(DB)级别进行逻辑隔离。数据持久化在阿里自研的分布式文件系统 盘古 上,并通过 Tiering Service 同步至内部数据湖 Alake

6.png

此架构的优势在于:

  • 资源复用:多个业务共享一个大集群,提高资源利用率。
  • 版本收敛:集群数量少,便于统一升级和管理。
  • 运维集约:减少运维复杂度。

但也带来挑战:

  • 运维压力:单一集群机器数量庞大,运维难度增加。
  • 资源隔离:需要额外机制保障不同业务间的资源隔离。

为此,我们开发了独立的 Fluss Manager 来管理账号权限和集群配置,并在 VVP(Fluss 专有空间)中独立部署 Tiering Service(Flink Job),确保其稳定运行。

为了保障如此大规模集群的稳定运行,我们在多个方面进行了深度建设。

(1) 机架感知(Rack Awareness)

为防止物理机或机架故障导致数据丢失,我们实现了严格的副本放置策略。

7.png

  • 机架感知前:三个副本可能分配在同一台物理机上的三个 Pod 上。一旦该物理机故障,将导致三副本数据丢失!
  • 机架感知后:三副本规避策略,不允许分配在同机房-同机架-同物理机上。即使一台物理机故障,仍有两副本工作,保障数据安全。

(2) 监控告警体系

8.png

我们建立了覆盖全栈的立体化监控告警体系:

  • 基础设施监控:包括物理机性能(磁盘容量、读写IO、网络流量、CPU、内存)和 Pod 性能。
  • 服务端监控:监控 CoordinatorServer、Tablet Server 等核心组件的 Metrics 和日志。
  • 远程存储监控:监控 Remote Storage (OSS/Pangu/HDFS) 的 QPS、读写延迟、带宽和容量。
  • 数据湖监控:监控 Alake 的水位、读写情况,防止因数据灌入过载而影响湖仓。
  • 告警服务:基于 Prometheus + SLS 的监控系统,实现及时告警。

四、稳定性建设

(1) 集群扩缩容(Rebalance Feature)

9.png

随着业务增长,集群需要动态扩容。我们实现了 Rebalance 功能:

  1. AdminClient 发起 RebalanceRequest
  2. CoordinatorServer 收到请求后,GoalOptimizer 生成 RebalancePlan
  3. RebalanceExecutor 执行计划,通知 Tablet Server 迁移 Bucket Leader 和 ISR。
  4. 新节点加入后,负载均衡,完成扩容。

(2) 表扩缩容(Bucket Rescale)

10.png

当单表流量增大时,可通过 ALTER TABLE 增加 Bucket 数量。

  1. Client 发起 ALTER TABLE 命令。
  2. Coordinator 计算新增 Bucket 的分布,并更新 Zookeeper 中的 TableAssignment
  3. Coordinator 通知所有 Tablet Server 创建新的 Bucket Replica。
  4. Tablet Server 创建 Replica 并开始接收数据。

注意:客户端需重启以感知新分区,期间消费任务可能有短暂波动。

(3) 无感升级(Controlled Shutdown)

11.png

为保障升级过程对在线作业无明显影响,我们实现了无感升级:

  1. 待下线 Tablet Server 发送 controlledShutdownRequest 给 Coordinator。
  2. Coordinator 执行 

    • 步骤1:重选 Leader(新 Leader 上线)。
    • 步骤2:下线 Follower。
    • 步骤3:关闭其他资源。
  3. 整个过程保证读写延迟波动小于 1 分钟,Leader 持续在线。
  • K8s 侧支持:支持灰度升级、滚动升级和原地升级(kill pod 并秒级拉起),提升升级效率。

(4) Coordinator HA

12.png

Coordinator 是集群的“大脑”。我们为其构建了高可用架构:

  • 主备选举:通过 Zookeeper 实现主备选举。
  • 状态同步:副节点持续监听 ZK 节点变化,保持 CoordinatorContext 一致。
  • 故障恢复:主节点宕机后,副节点自动选举为新主节点,并从 ZK 恢复上下文信息,确保元数据连续性。

(5) 压缩率与网络传输优化

13.png

为应对大规模集群的网络带宽瓶颈,我们集成了 ZSTD 列压缩算法。

  • 实测效果:在淘系数据上,开启 ZSTD 后,存储空间下降 6 倍(8.88TB → 1.52TB)。
  • 性能影响:写吞吐略有提升(3.33M/s → 3.51M/s),读吞吐基本持平(3.06M/s → 3.25M/s),CPU/内存开销可控。

(6) 上线前故障演练计划

14.png

上线前,我们执行了详尽的故障演练计划,模拟极端场景:

  • CoordinatorServer:随机宕机、反复切换 leader、大量建表和分区。
  • TableServer:随机宕机、Remote 存储堆积、Bucket 的 Replica 宕机。
  • Client:读写流量压测、一致性测试、冷数据追数据延迟测试。
  • 其他:网络拥塞、磁盘挂掉、Zookeeper 故障等。

通过这些演练,全面验证了系统的健壮性、容错能力和数据一致性。

五、湖流一体:统一架构的演进

15.png

在湖流一体这块,我们会直接从 Fluss Manager 发起“湖流一体表”的创建操作。创建完成后,会使用 Fluss 的生产账号(而不是业务自己的账号),在 Paimon 中为业务直接创建一张对应的 Paimon 表。

这张 Paimon 表与 Fluss 中的表在命名上完全一致,包括 Namespace 和 DB 名称都保持统一。这样一来,业务在 Paimon 侧可以给这张表打上“湖流一体表”的标记,在 Fluss 侧也能看到它是“湖流一体表”,对业务来说是一张“看起来统一”的表,但在底层实际上是两张独立的物理表。

数据同步方面,我们通过 Tailing Service 集群配合内部 Flink 集群,由生产账号将 Fluss 中的数据以分钟级或秒级的粒度同步到 Paimon。与此同时,在 Tailing Service 上做了一系列 Native 级别的优化,使得整体性能相较于通用的 Flink 接入方式(Flink Native)会更好一些。

六、业务实践案例与核心收益

Fluss 的落地为多个业务场景带来了显著收益,下面我将逐一介绍。

(1)淘宝数据平台:实时数仓重构

截屏2026-01-20 15.30.18.png

  • 原架构:依赖行式消息队列(TT)和离线数仓(MaxCompute/ODPS),数据新鲜度在小时级。
  • 新架构:采用 Fluss + Paimon 湖仓架构,数据新鲜度提升至秒级。
  • 收益

    • 替代行式消息队列,整体成本降低 40% 以上
    • 基于 Fluss 的列更新特性,离线/实时数据回刷时只需更新变更字段,回刷成本大幅降低
    • 简化了数据链路,下游 OLAP 引擎(如 StarRocks)可直接查询 Paimon 表。

(2)淘宝闪购:实时监控与加工

截屏2026-01-20 15.30.28.png

将流量实时 DWD 公共层写入 Fluss,并通过 Tiering Service 持久化到 Paimon。此架构既保障了秒级时效性,又支持高效的 OLAP 分析,真正实现了实时监控,产出效率远超旧版基于物化视图定时调度的方案。

(3)通天塔(AB实验平台):降本增效

截屏2026-01-20 15.30.35.png

  • 痛点:行式存储导致整行消费,资源消耗高(曝光表 44 个字段,平台仅需 13 个);数据探查困难;大 State 作业运维复杂、不稳定。
  • 方案:利用 Fluss 的列裁剪能力,结合 Paimon 存储和 StarRocks 查询。
  • 收益:读 Fluss 的 Flink 作业 CPU 占用减少 59%,内存占用减少 73%,IO 减少 20%。同时,通过 KV 表的 Merge 引擎和 Delta Join 技术,解耦了作业与状态,提升了灵活性。

(4)A+ 采集分析平台:全链路优化

截屏2026-01-20 15.30.42.png

在流量公共层应用 Fluss 的多级分区能力,显著降低了下游消费的数据量,使得下游 Flink CU 消耗降低约 35%,全链路成本降低约 70%

七、未来规划

展望未来,我们将从以下方向持续投入:

截屏2026-01-20 15.31.01.png

  1. 扩大服务规模:将 Fluss 服务推广至更多集团业务,巩固其作为统一实时数据通道的地位。
  2. 全面推进湖流一体:深化 Fluss 与 Paimon/Alake 的集成,打造更成熟、易用的湖流一体解决方案。
  3. 追求更高性能:持续优化 Fluss 内核,在吞吐、延迟、资源利用率等方面达到业界领先水平。
  4. 探索新场景:构建业界领先的 Agent 采集与评测一体化平台,为 AI Agent 在代码、电商、数据等场景的效果评估与优化提供数据基石。

🔥 阿里云流存储 Fluss 于 2026 年 1 月 13 日 正式开启免费公测

基于 Apache Fluss 打造的高性能列式流存储系统,具备毫秒级读写响应、实时数据更新及部分字段更新能力,可替换 Kafka 构建 面向分析的流式存储,结合 DLF(Paimon)等数据湖产品构建 湖流一体架构

🎁 公测活动: 公测期间单用户可 免费使用2个集群,单个集群上限80 Core,如果您在使用过程中向我们提出改进建议或评测报告,我们将依据反馈内容的深度与质量,向优质测评者 赠送定制Fluss周边礼品

流存储Fluss版公测说明:https://help.aliyun.com/zh/flink/realtime-fluss/product-overv...

复制链接或扫描下方二维码:https://survey.aliyun.com/apps/zhiliao/G-2wQFAuV

image

image


更多内容


活动推荐

复制下方链接或者扫描左边二维码

即可免费试用阿里云 Serverless Flink,体验新一代实时计算平台的强大能力!

了解试用详情:https://free.aliyun.com/?productCode=sc

在近期的 Streaming Lakehouse Meetup · Online EP.2|Paimon × StarRocks 共话实时湖仓 直播中,Apache Paimon PMC 成员/阿里云数据湖资深工程师叶俊豪带来了关于 Paimon 多模态数据湖的深度技术分享。

随着大模型训练对数据规模与多样性的要求不断提升,传统以批处理为中心的数据湖架构已难以满足 AI 工作负载对实时性、灵活性和成本效率的综合需求。特别是在推荐系统、AIGC 等典型场景中,工程师既要高频迭代结构化特征,又要高效管理图像、音频、视频等非结构化数据。面对这一挑战,Paimon 作为新一代流式数据湖存储引擎,正通过一系列底层创新,构建面向 AI 原生时代的统一数据基础设施。

一、结构化场景下的“列变更”困境

在推荐、广告等 AI 应用中,特征工程是一个持续演进的过程。例如,电商团队可能今天新增“用户近7日点击品类分布”,明天又加入“跨端行为一致性评分”。这种动态列变更导致“列爆炸”问题:表结构频繁扩展,而历史数据需与新特征对齐。

image.png

然而,已知的解决方案在此场景下仍然存在一些问题:

  • 主键表 partial-update:虽支持按主键更新部分列,但其基于 LSM 树的实现会在写入频繁时产生大量小文件,查询性能急剧下降;Compaction 虽可合并文件,却带来数倍的临时存储开销。
  • odps 存新特征值 + Join 拼接方案:将新特征写入独立表,查询时通过主键 Join 合并。看似避免了重写,但 Join 操作本身在 PB 级数据上开销巨大,且难以优化。
  • Append 表 + MERGE INTO:SQL 语法简洁,但底层仍需重写整个数据文件。对于每天增量达 PB 级的训练集,全量重写不仅成本高昂,还显著拖慢特征上线周期。

这些方案本质上都未能解耦“列”的物理存储,导致灵活性与效率不可兼得。

二、Paimon 的列分离架构:以全局 Row ID 为核心

Paimon 提出了 列分离存储架构,其核心是引入 全局唯一且连续的 Row ID。每行数据在首次写入时被分配一个在整个表生命周期内不变的 ID,且每个数据文件内的 Row ID 是连续的,元数据会记录该文件的起始 Row ID。

image.png

这一设计带来两个关键能力:

  1. 精准定位任意行:通过 Row ID 可直接定位到具体文件及偏移;
  2. 跨文件自动关联:当查询涉及多个列时,系统能根据 Row ID 范围自动将分散在不同文件中的列数据在存储层合并。

例如,当新增“用户兴趣标签”列时,Paimon 仅需写入一个包含该列与对应 Row ID 的新文件,无需修改原始特征文件。查询时,引擎透明地将两组文件按 Row ID 对齐合并,无需 SQL 层 Join,也无需重写历史数据。这种机制将列变更的存储成本从 O(N) 降至 O(ΔN),极大提升了特征迭代效率,同时节省了数十倍的存储空间。

三、迈向多模态:Blob 数据类型的三大突破

AI 训练不再局限于结构化特征。AIGC、多模态大模型等场景要求数据湖能高效处理图像、短视频、长音频等非结构化数据。这类数据具有两大特点:体积差异大(几 MB 到数十 GB)、访问稀疏(训练时通常只读取片段)。

传统列式格式(如 Parquet)将多模态数据与结构化字段混存,导致即使只查用户 ID,也需加载整个含视频的大文件,I/O 效率极低。

image.png

Paimon 引入 Blob 数据类型,实现三大突破:

  1. 物理分离存储:Blob 列独立成文件,与结构化数据完全解耦。查询结构化字段时,Blob 文件完全不参与 I/O,避免资源浪费。
  2. 多引擎统一抽象:无论使用 Spark、Flink、Java SDK 还是 Python 客户端,均可通过标准的 BYTES 或 BINARY 或 BLOB 类型定义 Blob 字段,接口一致,降低接入成本。
  3. blob-as-descriptor 机制:针对超大非结构化数据(如十几GB的视频/日志文件),传统计算引擎(如Flink/Spark)无法将其全量加载到内存中处理。为此,系统引入了 blob-as-descriptor 机制——它是一种协议,通过记录数据在外部存储(如OSS)中的位置、文件路径、起始偏移和长度等元信息,将实际数据读取任务交给下游系统按需流式加载。这样避免了内存溢出,实现了大文件高效入湖。

四、生产验证与未来演进

当前,Paimon Blob 已在淘宝、天猫等核心业务中实现大规模落地,每天有近 10PB 的多模态数据(如视频、音频、图像)通过 Blob Descriptor 协议高效写入 Paimon 湖,避免了 Flink 或 Spark 将大文件全量加载到内存的问题。然而,在实际使用中仍面临三大关键挑战:

  • 数据重复与删除问题,用户常因多次上传相同内容导致大量冗余(预估约 1PB/天的重复数据),亟需高效的去重与删除机制;
  • 小文件碎片化问题,频繁的小规模写入产生海量微小 Blob 文件,严重影响读取性能和存储效率;
  • 点查召回延迟高,缺乏对主键(如 UID)或向量特征的快速索引支持,难以满足毫秒级实时查询需求。

针对上述问题,团队已规划清晰的演进路径。

  • 点查性能优化方面,推进热 ID 下推能力,并构建统一的全局索引框架,同时支持标量索引(如字符串、数值)和向量索引(用于 AI 召回),其中基础版标量索引预计本月在开源 Master 分支可用。
  • 多模态数据管理方面,启动两项核心功能:

    • 一是基于 Deletion Vector + 占位符 的逻辑删除方案,在 Compaction 阶段安全清理重复或无效数据;
    • 二是开发 Blob Compaction 机制,自动合并小文件以提升读性能和存储密度。

此外,团队还前瞻性地提出跨表 Blob 复用的构想——多个表引用同一视频时仅存储一份物理数据,虽因涉及多表状态同步与一致性保障而技术难度较高,但已列入长期优化方向。整体目标是打造一个高效、紧凑、可快速检索的多模态数据湖底座,支撑未来 AIGC 与智能推荐等场景的规模化应用。

结语

Paimon 的技术演进,从结构化场景的列分离,到多模态数据的 Blob 抽象,每一项创新都源于真实业务痛点,并反哺于工程效率的提升。它不再只是“存储数据的地方”,而是成为 AI 原生时代的数据操作系统——高效、灵活、智能。

Paimon 将长期、持续且大力投入全模态数据湖建设,全面支持图像、音视频等非结构化数据的高效入湖、去重、合并与毫秒级点查。通过 Deletion Vector、Compaction 优化和全局索引等能力,Paimon 正构建面向 AI 时代的统一数据底座。作为开放湖表格式。

阿里云DLF 在云上提供全托管的Paimon存储服务,支持Paimon的智能存储优化与冷热分层。同时,DLF提供安全、开放、支持全模态数据的一体化Lakehouse管理平台,深度融入兼容其他例如 Iceberg、Lance 等主流格式,无缝对接 Flink、Spark 等计算引擎,,为 AIGC 与多模态智能应用提供高性能、低成本、易治理的数据基础设施。

阿里云DLF提供商业版Paimon服务,新用户免费试用100GB存储,1000CUH,点击领取https://free.aliyun.com/?productCode=dlf

image.png

在数据驱动的 AI 时代,基础设施的价值,最终要体现在对业务效率的实质性推动上。 Paimon 的实践,正为整个行业提供一条通往高效、统一、智能数据湖的新路径。


阿里云DLF提供商业版Paimon服务,新用户免费试用100GB存储,1000CUH,点击领取https://free.aliyun.com/?productCode=dlf

EMR Serverless StarRocks:2025年9月登顶全球TPC-H 10TB 性能和性价比榜单,性能比传统 OLAP 引擎提升 3-5 倍,100%兼容开源StarRocks,欢迎免费测试 >> https://free.aliyun.com/?searchKey=StarRocks

前往阿里云EMR官网开通 Serverless StarRocks试用并分享体验反馈,晒图可以领取精美礼品:https://x.sm.cn/EDWpX6I


更多内容


活动推荐

复制下方链接或者扫描左边二维码

即可免费试用阿里云 Serverless Flink,体验新一代实时计算平台的强大能力!

了解试用详情:https://free.aliyun.com/?productCode=sc

在近期的 Streaming Lakehouse Meetup · Online EP.2|Paimon × StarRocks 共话实时湖仓 直播中,Apache Paimon PMC 成员/阿里云数据湖资深工程师叶俊豪带来了关于 Paimon 多模态数据湖的深度技术分享。

随着大模型训练对数据规模与多样性的要求不断提升,传统以批处理为中心的数据湖架构已难以满足 AI 工作负载对实时性、灵活性和成本效率的综合需求。特别是在推荐系统、AIGC 等典型场景中,工程师既要高频迭代结构化特征,又要高效管理图像、音频、视频等非结构化数据。面对这一挑战,Paimon 作为新一代流式数据湖存储引擎,正通过一系列底层创新,构建面向 AI 原生时代的统一数据基础设施。

一、结构化场景下的“列变更”困境

在推荐、广告等 AI 应用中,特征工程是一个持续演进的过程。例如,电商团队可能今天新增“用户近7日点击品类分布”,明天又加入“跨端行为一致性评分”。这种动态列变更导致“列爆炸”问题:表结构频繁扩展,而历史数据需与新特征对齐。

image.png

然而,已知的解决方案在此场景下仍然存在一些问题:

  • 主键表 partial-update:虽支持按主键更新部分列,但其基于 LSM 树的实现会在写入频繁时产生大量小文件,查询性能急剧下降;Compaction 虽可合并文件,却带来数倍的临时存储开销。
  • odps 存新特征值 + Join 拼接方案:将新特征写入独立表,查询时通过主键 Join 合并。看似避免了重写,但 Join 操作本身在 PB 级数据上开销巨大,且难以优化。
  • Append 表 + MERGE INTO:SQL 语法简洁,但底层仍需重写整个数据文件。对于每天增量达 PB 级的训练集,全量重写不仅成本高昂,还显著拖慢特征上线周期。

这些方案本质上都未能解耦“列”的物理存储,导致灵活性与效率不可兼得。

二、Paimon 的列分离架构:以全局 Row ID 为核心

Paimon 提出了 列分离存储架构,其核心是引入 全局唯一且连续的 Row ID。每行数据在首次写入时被分配一个在整个表生命周期内不变的 ID,且每个数据文件内的 Row ID 是连续的,元数据会记录该文件的起始 Row ID。

image.png

这一设计带来两个关键能力:

  1. 精准定位任意行:通过 Row ID 可直接定位到具体文件及偏移;
  2. 跨文件自动关联:当查询涉及多个列时,系统能根据 Row ID 范围自动将分散在不同文件中的列数据在存储层合并。

例如,当新增“用户兴趣标签”列时,Paimon 仅需写入一个包含该列与对应 Row ID 的新文件,无需修改原始特征文件。查询时,引擎透明地将两组文件按 Row ID 对齐合并,无需 SQL 层 Join,也无需重写历史数据。这种机制将列变更的存储成本从 O(N) 降至 O(ΔN),极大提升了特征迭代效率,同时节省了数十倍的存储空间。

三、迈向多模态:Blob 数据类型的三大突破

AI 训练不再局限于结构化特征。AIGC、多模态大模型等场景要求数据湖能高效处理图像、短视频、长音频等非结构化数据。这类数据具有两大特点:体积差异大(几 MB 到数十 GB)、访问稀疏(训练时通常只读取片段)。

传统列式格式(如 Parquet)将多模态数据与结构化字段混存,导致即使只查用户 ID,也需加载整个含视频的大文件,I/O 效率极低。

image.png

Paimon 引入 Blob 数据类型,实现三大突破:

  1. 物理分离存储:Blob 列独立成文件,与结构化数据完全解耦。查询结构化字段时,Blob 文件完全不参与 I/O,避免资源浪费。
  2. 多引擎统一抽象:无论使用 Spark、Flink、Java SDK 还是 Python 客户端,均可通过标准的 BYTES 或 BINARY 或 BLOB 类型定义 Blob 字段,接口一致,降低接入成本。
  3. blob-as-descriptor 机制:针对超大非结构化数据(如十几GB的视频/日志文件),传统计算引擎(如Flink/Spark)无法将其全量加载到内存中处理。为此,系统引入了 blob-as-descriptor 机制——它是一种协议,通过记录数据在外部存储(如OSS)中的位置、文件路径、起始偏移和长度等元信息,将实际数据读取任务交给下游系统按需流式加载。这样避免了内存溢出,实现了大文件高效入湖。

四、生产验证与未来演进

当前,Paimon Blob 已在淘宝、天猫等核心业务中实现大规模落地,每天有近 10PB 的多模态数据(如视频、音频、图像)通过 Blob Descriptor 协议高效写入 Paimon 湖,避免了 Flink 或 Spark 将大文件全量加载到内存的问题。然而,在实际使用中仍面临三大关键挑战:

  • 数据重复与删除问题,用户常因多次上传相同内容导致大量冗余(预估约 1PB/天的重复数据),亟需高效的去重与删除机制;
  • 小文件碎片化问题,频繁的小规模写入产生海量微小 Blob 文件,严重影响读取性能和存储效率;
  • 点查召回延迟高,缺乏对主键(如 UID)或向量特征的快速索引支持,难以满足毫秒级实时查询需求。

针对上述问题,团队已规划清晰的演进路径。

  • 点查性能优化方面,推进热 ID 下推能力,并构建统一的全局索引框架,同时支持标量索引(如字符串、数值)和向量索引(用于 AI 召回),其中基础版标量索引预计本月在开源 Master 分支可用。
  • 多模态数据管理方面,启动两项核心功能:

    • 一是基于 Deletion Vector + 占位符 的逻辑删除方案,在 Compaction 阶段安全清理重复或无效数据;
    • 二是开发 Blob Compaction 机制,自动合并小文件以提升读性能和存储密度。

此外,团队还前瞻性地提出跨表 Blob 复用的构想——多个表引用同一视频时仅存储一份物理数据,虽因涉及多表状态同步与一致性保障而技术难度较高,但已列入长期优化方向。整体目标是打造一个高效、紧凑、可快速检索的多模态数据湖底座,支撑未来 AIGC 与智能推荐等场景的规模化应用。

结语

Paimon 的技术演进,从结构化场景的列分离,到多模态数据的 Blob 抽象,每一项创新都源于真实业务痛点,并反哺于工程效率的提升。它不再只是“存储数据的地方”,而是成为 AI 原生时代的数据操作系统——高效、灵活、智能。

Paimon 将长期、持续且大力投入全模态数据湖建设,全面支持图像、音视频等非结构化数据的高效入湖、去重、合并与毫秒级点查。通过 Deletion Vector、Compaction 优化和全局索引等能力,Paimon 正构建面向 AI 时代的统一数据底座。作为开放湖表格式。

阿里云DLF 在云上提供全托管的Paimon存储服务,支持Paimon的智能存储优化与冷热分层。同时,DLF提供安全、开放、支持全模态数据的一体化Lakehouse管理平台,深度融入兼容其他例如 Iceberg、Lance 等主流格式,无缝对接 Flink、Spark 等计算引擎,,为 AIGC 与多模态智能应用提供高性能、低成本、易治理的数据基础设施。

阿里云DLF提供商业版Paimon服务,新用户免费试用100GB存储,1000CUH,点击领取https://free.aliyun.com/?productCode=dlf

image.png

在数据驱动的 AI 时代,基础设施的价值,最终要体现在对业务效率的实质性推动上。 Paimon 的实践,正为整个行业提供一条通往高效、统一、智能数据湖的新路径。


阿里云DLF提供商业版Paimon服务,新用户免费试用100GB存储,1000CUH,点击领取https://free.aliyun.com/?productCode=dlf

EMR Serverless StarRocks:2025年9月登顶全球TPC-H 10TB 性能和性价比榜单,性能比传统 OLAP 引擎提升 3-5 倍,100%兼容开源StarRocks,欢迎免费测试 >> https://free.aliyun.com/?searchKey=StarRocks

前往阿里云EMR官网开通 Serverless StarRocks试用并分享体验反馈,晒图可以领取精美礼品:https://x.sm.cn/EDWpX6I


更多内容


活动推荐

复制下方链接或者扫描左边二维码

即可免费试用阿里云 Serverless Flink,体验新一代实时计算平台的强大能力!

了解试用详情:https://free.aliyun.com/?productCode=sc

已经在虚拟机部署好Apache DolphinScheduler了,想尝试下在Flink新建一个Flink节点,然后用Flink消费Kafka数据。

Apache DolphinScheduler用的是单机部署,具体操作可以参考官方文档:DolphinScheduler | 文档中心(https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/in...).

  • 前置条件:已经安装Java 11、DolphinScheduler 3.3.2、Flink 1.18.1、Kafka 3.6.0,Zookeeper用Kafka内置的。建议这些安装都下载二进制的安装包到虚拟机安装,用命令安装的不可控,我下载的二进制包如下:

配置好Flink的环境变量

1、编辑环境变量:

sudo vim ~/.bashrc

增加Flink的路径

2、使环境变量生效:

#使环境变量生效
source ~/.bashrc
#查看环境变量
echo $Flink_HOME

修改Kafka、Flink以及DolphinScheduler的配置文件

因为用的是虚拟机,为了让外面的主机能够访问到虚拟机的网络,需要修改下配置文件

  1. 修改Kafka配置:找到Kafka安装包下的config文件夹,修改config下的server.properties文件,修改listeners是为了外面的主机能够访问到虚拟机的Kafka,还有把advertised.listeners改成虚拟机地址,写样例的时候能连上虚拟机的Kafka地址,不然默认连localhost
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
#192.168.146.132修改成虚拟机ip
advertised.listeners=PLAINTEXT://192.168.146.132:9092

  1. 修改Flink配置:找到Flink安装包下的conf文件夹,修改conf下的Flink-conf.yaml文件,把里面所有的localhost地址全部改成0.0.0.0,以便主机能访问到虚拟机的Flink。还有增加jobmanager和taskmanager的内存
jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1

  1. 修改Apache DolphinScheduler的配置文件,从Apache DolphinScheduler的启动脚本文件dolphinscheduler-daemon.sh可以看出,配置环境变量用的是bin/env文件夹下的dolphinscheduler_env.sh

查看dolphinscheduler-daemon.sh文件:

修改dolphinscheduler_env.sh文件,新增JAVA、Flink路径:

#修改成自己的JAVA、Flink路径
export JAVA_HOME=/data/jdk-11.0.29
export Flink_HOME=/data/Flink-1.18.1

关闭防火墙,启动应用

启动应用,包括Zookeeper、Kafka、Flink以及Apache DolphinScheduler。

#关闭防火墙
sudo systemctl stop firewalld
 
# 在 Flink 根目录下,执行以下命令启动 Flink 集群
bin/start-cluster.sh
 
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
 
# 启动 Kafka 服务器
bin/Kafka-server-start.sh config/server.properties &
 
#创建 Kafka 主题
bin/Kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
 
#使用命令行生产者发送消息
bin/Kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
 
#消费
bin/Kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# 启动 Standalone Server 服务
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

测试

测试Flink、Apache DolphinScheduler是否能访问成功。

  1. Flink访问地址:http://localhost:8081/,localhost改成自己虚拟机地址

  1. Apache DolphinScheduler访问地址:http://localhost:12345/dolphinscheduler/ui ,localhost改成自己虚拟机地址即可登录系统 UI。默认的用户名和密码是 admin/dolphinscheduler123

编写样例

用Flink消费Kafka数据,然后打包上传到Apache DolphinScheduler,启动Flink任务:

  1. 编写样例:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>Flink-Kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <Flink.version>1.18.1</Flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <Kafka.version>3.6.0</Kafka.version>
    </properties>
 
    <dependencies>
        <!-- Flink核心依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-streaming-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-clients</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- 连接器基础依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-base</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- Kafka连接器(关键修改点) -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-Kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Kafka</groupId>
            <artifactId>Kafka-clients</artifactId>
            <version>${Kafka.version}</version>
        </dependency>
 
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
 
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.Flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkKafkaConsumerExample.java

import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.java.tuple.Tuple2;
import org.apache.Flink.api.java.utils.ParameterTool;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.datastream.DataStream;
import org.apache.Flink.streaming.api.functions.ProcessFunction;
import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.Flink.util.Collector;
import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer;
import org.apache.Flink.api.common.serialization.SimpleStringSchema;
import org.apache.Kafka.clients.consumer.ConsumerConfig;
import org.apache.Kafka.common.serialization.StringDeserializer;
 
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
 
 
public class FlinkKafkaConsumerExample {
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // Kafka 配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        KafkaConsumer.setStartFromEarliest(); // 从最早的消息开始消费
        DataStream<String> stream = env.addSource(KafkaConsumer);
 
        // 处理数据:分词和计数
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
 
 
        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value);
                messageCount++;
 
                // 检查是否达到停止条件
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 messages, stopping job.");
                    shouldStop = true; // 设置标志位,表示应该停止
                }
            }
        });
 
        // 执行作业并获取 JobClient
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // 启动作业并获取 JobClient
                org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());
 
                // 监测条件并取消作业
                while (!shouldStop) {
                    Thread.sleep(100); // 每100毫秒检查一次
                }
 
                // 达到停止条件时取消作业
                if (shouldStop) {
                    System.out.println("Cancelling the job...");
                    jobClient.cancel().get(); // 取消作业
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
 
        // 在主线程中等待作业结束
        future.join(); // 等待作业完成
    }
 
    // Tokenizer 类用于将输入字符串转化为单词
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
 
}
  1. 打包上传到Apache DolphinScheduler

  1. 新建Flink节点,并启动

在Apache DolphinScheduler的任务实例看启动日志:

在虚拟机启动生产者,输出字符串,然后可以在Flink查看输出Kafka生产的消息:

原文链接:https://blog.csdn.net/Analyze_ing/article/details/156940553

Apache Gravitino Introduction.png

Apache Gravitino 概要介绍

作者: shaofeng shi
最后更新: [2025-12-29]

背景

在大数据时代,企业往往需要管理来自多云多域、异构数据源的元数据,如 Apache Hive、MySQL、PostgreSQL、Iceberg、Lance、S3、GCS 等; 此外,随着 AI 模型训练和推理的大量应用,海量的多模态数据、模型元数据等也需要一种方案进行管理。传统的做法是为每个数据源单独管理元数据,这不仅增加了运维复杂度,还容易造成数据孤岛。Apache Gravitino 作为一个高性能、支持地理分布式的联邦元数据湖,为我们提供了统一管理多源元数据的解决方案。

Gravitino 最初是由 Datastrato 公司发起并创立,在2023年开源,2024年捐赠给 Apache 孵化器,在2025年5月从 Apache 孵化器毕业,成为 Apache Top Level Project。目前已经在小米、腾讯、知乎、Uber、Pinterest 等企业落地生产环境。

什么是 Apache Gravitino?

Apache Gravitino 是一个高性能、地理分布式、联邦化的元数据湖管理系统,为用户提供统一的数据和AI资产管理平台,它能够:

  • 统一元数据管理:为不同类型的数据源提供统一的元数据模型和API
  • 直接元数据管理:直接管理底层系统,变更会实时反映到源系统
  • 多引擎支持:支持Trino、Spark、Flink等多种查询引擎
  • 地理分布式部署:支持跨区域、跨云的部署架构
  • AI资产管理:不仅管理数据资产,还支持AI/ML模型的元数据管理

核心概念包括:

  • Metalake:元数据的容器/租户,通常一个组织对应一个metalake
  • Catalog:来自特定元数据源的元数据集合
  • Schema:第二级命名空间,对应数据库中的schema概念
  • Table:最底层的对象,表示具体的数据表

Gravitino 整体架构

Apache Gravitino 核心特性概述

统一元数据管理

Gravitino 提供了一个统一的元数据管理层,支持多种数据源的集成:

支持的数据源类型:

  • 关系型数据库:MySQL、PostgreSQL、OceanBase、Apache Doris、StarRocks 等
  • 大数据存储:Apache Hive、Apache Iceberg、Apache Hudi、Apache Paimon、Delta Lake(开发中)
  • 消息队列:Apache Kafka
  • 文件系统:HDFS、S3、GCS、Azure Blob Storage、阿里云 OSS
  • AI/ML 数据格式:Lance(专为AI/ML工作负载设计的列式数据格式)

REST API 服务

Gravitino 提供了丰富的 REST API 服务,支持不同数据格式的标准化访问:

Gravitino 核心 REST API

  • 完整的元数据管理 RESTful API 接口
  • 支持 Metalake、Catalog、Schema、Table 等所有元数据对象的 CRUD 操作
  • 支持用户、组、角色和权限管理的完整 API
  • 提供标签、策略、模型等高级功能的 API 接口
  • 支持多种认证方式(Simple、OAuth2、Kerberos)

Iceberg REST 服务

  • 遵循 Apache Iceberg REST API 规范
  • 支持多种后端存储(Hive、JDBC、自定义后端)
  • 提供完整的表管理和查询能力
  • 支持多种存储系统(S3、HDFS、GCS、Azure等)

Lance REST 服务

  • 实现 Lance REST API 规范
  • 专为 AI/ML 工作负载优化
  • 支持高效的向量数据存储和检索
  • 提供命名空间和表管理功能

元数据实时获取和修改

Gravitino 采用直接元数据管理模式,确保数据的实时性和一致性:

  • 实时同步:对元数据的变更会立即反映到底层数据源
  • 双向同步:支持从 Gravitino 到数据源,以及从数据源到 Gravitino 的元数据同步
  • 事务支持:保证元数据操作的原子性和一致性
  • 版本管理:支持元数据的版本控制和历史追踪

统一访问控制

Gravitino 实现了跨多数据源的统一权限管理:

核心特性:

  • 基于角色的访问控制(RBAC):支持用户、组、角色的灵活权限管理
  • 所有权模型:每个元数据对象都有明确的所有者
  • 权限继承:支持层次化的权限继承机制
  • 细粒度控制:从 Metalake 到具体表的多层级权限控制

支持的权限类型:

  • 用户和组管理权限
  • 目录和模式创建权限
  • 表、topic、fileset的读写权限
  • 模型注册和版本管理权限
  • 标签和策略应用权限

统一数据血缘

基于 OpenLineage 标准,Gravitino 提供了完整的数据血缘追踪能力:

  • 自动血缘收集:通过 Spark 插件自动收集数据血缘信息
  • 统一标识符:将不同数据源的标识符转换为 Gravitino 统一标识符
  • 多数据源支持:支持 Hive、Iceberg、JDBC、文件系统等多种数据源的血缘追踪

高可用性和扩展性

部署模式:

  • 单机部署:适合开发和测试环境
  • 集群部署:支持高可用和负载均衡
  • Kubernetes 部署:支持容器化部署和自动扩缩容
  • Docker 支持:提供官方 Docker 镜像

存储后端:

  • 支持多种元数据存储后端(MySQL、PostgreSQL等)
  • 支持分布式存储系统

安全特性

认证方式:

  • Simple 认证(用户名/密码)
  • OAuth2 认证
  • Kerberos 认证(针对 Hive 后端)

凭证管理:

  • 支持云存储凭证代理(S3、GCS、Azure等)
  • 动态凭证刷新
  • 安全的凭证传递机制

Apache Gravitino 的集成能力

Gravitino 与主流计算引擎和数据处理框架深度集成,为用户提供统一的数据访问体验。

计算引擎集成

Apache Spark

  • 通过 Gravitino Spark Connector 实现无缝集成
  • 支持 Spark SQL 和 DataFrame API
  • 自动数据血缘收集和追踪
  • 支持多种数据源的统一访问

Trino

  • 通过 Gravitino Trino Connector 服务集成
  • 支持跨数据源的联邦查询
  • 高性能的分析查询能力

Apache Flink

  • 通过 Gravitino Flink Connector 服务集成
  • 支持流批一体化数据处理
  • 实时数据处理和分析

Python 生态集成

PyIceberg

  • 支持 Python 环境下的 Iceberg 表访问
  • 与 Gravitino Iceberg REST 服务集成
  • 支持数据科学和机器学习工作流
  • 提供 Pandas 兼容的数据接口

Daft

  • 现代化的分布式数据处理框架
  • 专为 AI/ML 工作负载优化
  • 支持多模态数据处理
  • 与 Gravitino 元数据管理集成

云原生集成

Kubernetes

  • 支持 Kubernetes 原生部署
  • 提供 Helm Charts 和 Operator
  • 支持自动扩缩容和故障恢复
  • 集成云原生监控和日志系统

API 和 SDK

REST API

  • 完整的 RESTful API 接口
  • 支持所有元数据管理操作
  • 标准化的 HTTP 接口
  • 支持多种认证方式

Java SDK

  • 原生 Java 客户端库
  • 类型安全的 API 接口
  • 支持连接池和重试机制
  • 完整的异常处理

Python SDK

  • Python 客户端库
  • 支持异步操作
  • 与 Jupyter Notebook 集成
  • 支持数据科学工作流

这些集成能力使得 Gravitino 能够无缝融入现有的数据基础设施,为用户提供统一、高效的数据管理体验。后续文章将详细介绍 Gravitino 的各项能力、各个集成组件的配置和使用方法,敬请关注。

下一步


Apache Gravitino正在快速发展中,本文基于最新版本编写。如遇到问题,建议查阅官方文档或在GitHub上提交issue。

作者:朱奥 /淘天集团高级数据工程师

导读:双 11 等大促场景会在短时间内集中爆发:运营与业务 BI 在开卖后的窗口期密集访问数据产品,瞬时请求量陡增,对查询引擎的稳定性、成本与治理体系提出极高要求。与此同时,业务对近实时数据产品的诉求持续增强,传统“多存储、多链路、依赖回刷”的模式在研发效率、回刷成本与响应速度上逐步暴露瓶颈。

本文围绕 Paimon 与 StarRocks 的组合实践,梳理淘天在大规模 OLAP 查询场景下的架构演进与双 11 保障体系:通过实时与离线统一入湖,消除数据同步链路与多份存储成本;基于稳定中间层叠加在线现算与维表实时关联,将高消耗回刷转化为秒级查询,核心场景回刷效率提升约 80%,年化节省成本接近 1000 万;同时结合 StarRocks + RoaringBitmap 低成本解决跨天交叉实时 UV 计算难题,满足大促近实时决策需求。

1 淘天集团营销活动 OLAP 查询的探索背景与核心策略

1.1 当前数据架构

首先,简要介绍当前的数据架构与数据流转方式。

从 DWD 层开始,我们的数据分为实时与离线两条主链路:

  • 实时数据主要存储在 TT中,在业界可类比为 Kafka 一类的消息队列;
  • 离线数据主要存储在 ODPS 中。

在数据加工与写入层面,我们会启动 Flink 流批一体任务:

  • 实时侧持续消费 TT 中的数据;
  • 离线侧消费 ODPS 中的数据;
  • 在计算过程中,任务会关联多类 ODPS 维表,例如类目维表、商家分层等维度信息。
  • 计算完成后,结果统一写入 ADS 层的Holo 表中,并在数据服务层对外透出。

在纯离线场景下,我们会通过 ODPS 任务读取 ODPS 数据,同时写入 ADS 层对应的 ODPS 表。这里既包含历史天级数据,也包含历史小时级数据。当存在查询加速需求时,我们还会将 ODPS 数据进一步导入到 Holo 中。

在数据服务层,我们主要通过 Holo 或 MC 对外提供数据服务。我们会根据查询时延要求选择不同的服务路径:当业务对响应速度要求更高、需要达到毫秒级时,通常通过 Holo 提供查询服务;当时延要求相对宽松,例如百毫秒级或秒级,则更多通过 MC 来承载查询请求。

1.2 业务诉求与核心痛点

随着业务发展,我们当前面临的诉求主要来自两个方向:一是业务侧希望获得更多实时数据产品;二是业务 BI 的实时分析需求持续增长。这对数据研发提出了新的挑战:进一步提升研发效率。

回到现有架构,其核心痛点主要体现在两方面:

  • 流批存储不统一:实时数据存储在 TT 中,离线数据存储在 ODPS 中;当存在查询加速需求时,部分数据还需要进一步落到 Holo 表中。
  • 整体开发架构较为复杂,数据需要在多个存储介质之间流转,导致端到端链路拉长。

在查询特性上,Holo 在点查场景具备更突出的性能优势,且整体稳定性较强,在淘天历年大促期间的表现也相对稳定。

但在更常见的 Shuffle 场景下,整体查询性能相对一般。尤其当 OLAP 查询负载更重、需要进行更复杂的计算,或需要关联规模较大的维表时,Shuffle 相关的执行效率会成为瓶颈,导致查询耗时明显拉长。

数据更新与维护上也存在较高成本。以 ODPS 中的维表为例(如类目维表、商家分层维表等),当维表发生业务变更时,往往需要触发 ADS 层任务的回刷,从而带来额外的回刷开销。

业务对“近实时”的诉求在部分场景下出现了被动降级。例如在跨天实时 UV 等场景中,由于 state 规模较大、成本较高等原因,方案不得不从实时级别降级到小时级别。从业务视角看,近实时能力仍然是明确存在的需求。

1.3 核心策略

1)架构简化提效

  • 架构上实现 存储介质的统一:将实时与离线数据统一沉淀到 Paimon 的湖存储中。在此基础上, StarRocks 可以直接面向湖存储进行高性能分析查询,从而能够消除数据同步链路以及多份存储带来的成本。
  • 降低使用门槛,让数据更容易被上层分析与 BI 使用。以实时链路为例,原本实时数据存储在 TT 中,而 TT 的数据形态具有明显特征:每行数据是一个字符串、缺少 schema。在这种形态下,数据虽然可以被消费,但如果要面向 BI 分析使用,往往还需要额外进行反序列化与解析,这会带来不可忽视的工程与使用成本。

在统一存储之后,Paimon 将实时与离线数据沉淀在同一张表中,并提供明确的 schema。这意味着,上层使用方可以直接面向结构化数据开展分析:即使分析师的数据开发能力不强,也可以基于 Paimon 的近实时中间层,通过 StarRocks 自助完成对近实时数据的分析。

在这种模式下,过去一些相对简单的取数与分析需求,可以由 BI 或分析师通过自助方式直接完成,不再必须提交给数据研发排期处理,从而在一定程度上减少数据开发侧的需求量与交付压力。

2) 业务难点攻坚

通过稳定的中间层 Paimon,以及“OLAP 实时关联易变维度”的模式,将原本高消耗的 ADS 回刷任务转化为秒级查询来完成。在后续内容中,我会进一步展开这一改造如何将高消耗回刷去掉,并带来显著的成本收益——每年可节省近千万元级别的回刷成本。

同时,我们通过 StarRocks + RoaringBitmap 的方案,高性能解决了跨天交叉实时 UV 的计算难题,以更低成本的方式满足大促期间对近实时能力的诉求。

1.4 新数据架构

在秒级数据链路上,我们通过实时 Flink 任务消费 DWD 层的 Fluss(秒级实时数据),并将结果写入 ADS 层的 Fluss。

Fluss 提供“湖流一体”的同步开关。开启后,Fluss 中的数据会按配置周期自动同步到 Paimon 表中,默认周期可以是每 3 分钟,且该时间间隔支持用户自定义配置。同步完成后,Paimon 表中会形成当天分钟级数据(t 当天)以及 t−n 的历史数据。

在此基础上,我们会启动 Flink 流批一体任务,同时消费 DWD 层 Paimon 的 t 当天数据与 t−n 历史数据,并将加工结果写入 Paimon 表的 ADS 层与 DWD 层,分别沉淀 t 当天与历史数据。

此外,基于 Paimon 的 partial-update能力,我们也可以构建离线大宽表,用于承载同一业务对象的多状态聚合。以订单为例,订单存在支付、确收、退款等多种状态,可以构建一张以 order\_id 为主键的 Paimon 大宽表,将这些状态写入同一行记录。这样在使用侧只需读取对应 order\_id 的一条记录,即可获取该订单的多种状态信息,使用成本与分析便利性都会更高。

在 ADS 层,我们沉淀的计算结果主要面向“叶子粒度”的维度:例如类目侧以叶子类目为主;若涉及商家分层维表,则对应叶子商家分层。在数据服务层,我们通过 StarRocks 对外提供数据服务。具体而言,在 StarRocks 层既可以直接读取 ADS 层数据进行点查,也可以直接读取 DWD 层的中间层数据进行在线计算。后一种方式的查询负载通常更重、数据量更大,但在当前实践中,StarRocks 仍然能够将查询时延控制在秒级范围内,在查询量较大的情况下保持较快响应。在查询过程中,我们也可以进一步关联 Paimon 维表,最终将查询结果在数据产品端进行展示与交付。

在我们 的业务场景中,一般来说,DWD 层的中间层事实数据相对稳定;真正“易变”的往往是维度侧的数据,例如 Paimon 维表(类目维表、商家分层维表等)。当业务规则或口径发生调整时,通常只需要更新维表即可。相较于回刷大规模中间层数据,维表更新的成本更低、执行也更快。

更关键的是,我们在查询侧采用现算方式:维表更新后,查询会在读取中间层数据的基础上实时关联最新维表,因此中间层数据无需随业务变更反复回刷。由于中间层计算量较重,如果依赖回刷来响应业务调整,整体周期往往较长——快则一到两天,慢则可能需要一周。通过“更新维表 + 查询现算”的方式,业务变更后可以更快在数据产品侧看到最新结果。

在数据服务层,我们进一步利用 StarRocks 的 Warehouse 机制,对读集群进行隔离与分级保障,避免不同业务互相影响。我们按照业务重要性划分为三类:

  • 默认 Warehouse:保障级别相对一般;
  • 重保 Warehouse:承载最核心业务,保障级别最高;
  • 业务 BI 专用 Warehouse:面向业务 BI 或其他业务的专用资源池,保障级别相对一般。

2 Paimon+StarRocks 在双11大规模 OLAP 查询场景下的实践与优化

2.1 业务背景

在日常情况下,运营和业务 BI 往往在不同时间访问数据产品,因此 StarRocks的瞬时请求量(RPS)整体较低,压力相对平稳。

但在大促期间情况会明显不同。以开卖时段为例,运营和业务 BI 通常会在接下来的一小时内集中访问数据产品,导致 StarRocks 的瞬时请求 RPS 急剧升高,对 StarRocks 集群带来显著挑战。

因此,本部分的实践与优化工作主要围绕“大促场景稳定运行”这一目标展开。

2.2 集群侧保障

1)在应用层面推广数据集缓存策略:目前配置 180 秒的查询缓存窗口。也就是说,同一条查询在 180 秒内被多次触发时,实际下发到 StarRocks 执行的仅为首次请求;后续请求直接复用首次查询结果。通过该策略,可以有效降低大促高峰期 StarRocks 集群的瞬时压力。

2)集群层面的保护机制:集群侧设置了 30 秒的全局超时:如果一条 SQL 在 30 秒内仍未执行完成,会被自动终止。该机制属于 StarRocks 的集群保护能力,当查询执行时间超过 30 秒,即可判定该 SQL 需要进一步优化,不适合直接上线,需要回退并完成优化后再进入生产环境。对于少量确有必要、且在 30 秒内无法完成的特殊 SQL,也支持为单条 SQL 配置更长的超时时间。但此类 SQL 数量通常极少,上线评估也会更加严格,以确保不会对整体集群稳定性产生影响。整体目标是避免单条慢 SQL 拖垮集群。

3)架构层隔离:按业务重要性划分只读实例。基于业务重要性对只读查询资源进行分层,将不同业务的读请求隔离到不同的只读实例上,避免相互干扰。

4)集群初始化配置

在新的 StarRocks 集群初始化时,比较推荐先设置一套基础参数,如下:

  • set global cbo\_cte\_reuse_rate=0;

当 CTE 被多处引用时,可能触发同一数据源的重复读取。例如,一个表在 select 中读取三次,那么 StarRocks会对同一张 Paimont 表执行三次读取,读 I/O 开销相当于被放大为 3 倍。将该参数设置为 0 后,可使同一张表在同一条查询中只读取一次。

•set global query_timeout=30;

设置 30 秒的集群全局查询超时 避免单条慢 SQL 拖垮集群。

•set global new\_planner\_optimize_timeout=10000;

适当调大执行图优化器的超时时间。如果该参数设置过小,SQL 在调度过程中更容易直接失败;适当增大后,可降低 SQL 失败的频率。

•set global pipeline_dop=8;

调整 pipeline 的 DOP,用于控制每台机器上拉起的 driver 数量。压测结果显示,在大促场景中 SQL 请求高度集中,若 DOP 设置过大(例如 64),单条 SQL 在每台机器上会拉起大量 driver,带来调度开销飙升,甚至可能打满 driver 阻塞队列,导致 CPU 利用率反而上不去,集群进入不可用状态。

在我们StarRocks集群的双 11 压测中,DOP 调整到 8 时整体查询表现最优,因此给出 DOP=8 作为建议值。需要强调的是,该值是经验建议,最终仍应以各自集群的压测结果为准进行配置。

•set global scan\_paimon\_partition\_num\_limit=100; --限制scan paimon外表的最大分区,杜绝扫描全表的情况

限制 scan paimon外表的最大分区,用于杜绝因条件缺失或下推失败导致的全表/超大范围扫描。

2.3 核心指标监控

通过观察 StarRocks 核心指标的水位变化,可以持续评估实例健康状况。常用的核心指标如图。

2.4 报警规则

建立 StarRocks 实例的异常报警机制非常关键,它能够帮助及时发现实例异常并快速介入处理。报警项的设置通常围绕“资源水位、节点可用性、调度拥塞、查询失败与时延”几类核心信号展开,其中有一部分阈值来自大促压测与实战探索,具有较强参考价值:

  • BE/CN 的 CPU 与内存使用率设置阈值,例如当使用率持续高于 70% 时触发告警;
  • FE 的 CPU 与内存使用率同样设置 70% 的告警阈值;
  • 在可用性方面,可以监控 BE/CN 或 FE 的可用率是否低于 100%,一旦出现低于 100% 的情况,通常意味着有节点不可用或发生故障。
  • 当 BE 阻塞队列数超过 2000 时,StarRocks 集群的查询时延可能出现陡增;
  • 在查询侧,可以增加查询失败次数与查询时延分位数的告警,例如“查询失败次数大于 n”“查询延迟 TP99 大于 n”。其中 n 的取值需要结合业务特性与可接受的服务水平目标进行配置。

2.5 元数据监控

为更有效地治理 StarRocks的各类查询请求,可以实时获取审计日志,并基于审计日志构建元数据监控大盘,为后续的慢查询 SQL 治理提供数据支撑与定位依据。

select * from _starrocks_audit_db_.starrocks_audit_tbl;

审计日志相关数据落在 StarRocks 的内表中,对应信息可实时查询。也就是说,某条 SQL 执行完成后,可以立即在该内表中查到这条 SQL 的执行耗时等关键字段。基于这一基础能力,如果需要进一步做更细的源数据与查询行为监控,也可以围绕审计日志中记录的 SQL 信息进行扩展。

在监控大盘的组织方式上,支持按 Warehouse 维度拆分(例如划分为多个 Warehouse),同时也可以按数据集进行过滤。在筛选完成后,重点关注的数据字段通常包括:数据集名称、总 CPU 消耗、总查询大小、查询次数、查询行数、失败率与失败次数、单次查询的 CU 消耗、查询时间以及查询发起人等。这些指标支持排序与聚合,便于在优化过程中选取特定时间窗口,对总 CPU 消耗、总查询大小、总查询行数等维度进行 Top SQL 排查与治理。通过优先治理这些“高消耗/高影响”的 SQL,往往能够显著改善集群整体健康状况,因为在许多情况下,集群不稳定的根因来自少量高风险的“坏 SQL”。

2.6 大促保障

大促保障的目标,是把不确定性尽量前置消化,确保开卖高峰期间查询链路稳定可控。

  • 在资源侧,会结合历史数据与业务预测,在大促开始前对 StarRocks 集群进行主动扩容,并在大促结束后主动缩容。
  • 在需求侧,提前与业务负责人对齐本次大促的核心变更点,重点关注改造或新增页面,并将核心页面的 QPS 进行量化,为全链路压测与容量评估做准备。
  • 针对重保页面,我们还会建立一套智能应急机制,分为实例级与查询级两层。实例级故障切换方面,当 StarRocks 主实例不可用时,可通过自动化预案工具(FBI)将重保页面的查询请求批量切换到备库 Warehouse,完成实例级容灾;查询级自动容错方面,当重保页面出现单次查询失败或超时,系统会将该查询自动路由到备库 Warehouse 重试,尽量做到用户无感,为关键 SQL 增加一次“二次机会”,提升整体稳定性。

2.7 大促压测

大促压测通常分为两层: 核心页面单压与全链路压测。

在核心页面单压阶段,会先梳理大促期间的核心页面及新增页面,并对这些页面进行单独压测。这样做的目的,是尽可能在活动前置暴露并解决单点问题导致的性能瓶颈,为后续上线留出精细化优化空间。

在全链路压测阶段,会模拟“所有页面同时达到流量峰值”的极限场景,用以验证 StarRocks 集群在峰值冲击下的整体资源水位与关键性能指标是否符合预期。重点关注的资源水位通常包括 CPU、内存与 I/O,同时结合查询时延等指标,评估集群在极端并发与高负载下的稳定性与承载边界。

2.8 压测发现的问题和优化方案

1)分区裁剪失效或缺少分区过滤,导致扫全表

压测中发现,部分 SQL 因分区裁剪失效或未配置分区过滤条件,出现扫描范围过大甚至扫全表的风险。针对该类问题,治理原则是必须启用分区过滤并确保分区裁剪生效,不允许存在扫全表 SQL 在线运行。

分区裁剪生效的常见写法包括:对分区字段进行日期传参,直接基于分区字段触发裁剪;或使用日期函数触发裁剪,例如 date\_format、date\_add 等函数也可以触发分区裁剪。

分区裁剪失效的典型场景是分区字段与子查询结果进行比较,例如将分区字段与子查询返回的最小活动时间进行对比时,分区裁剪会失效。原因在于分区裁剪发生在 FE 阶段,而子查询需要到 BE 执行,FE 在规划阶段无法获得子查询结果,从而无法生成有效的分区裁剪信息。

2)读取 Paimon 生表时小文件过多,导致读取数据块数过大

压测还发现,读取 Paimon 表时存在小文件过多的问题。

定位方法:在 StarRocks 执行 SQL 时可开启 profile(通过 hint:/+ SET\_VAR(enable\_profile = true) /)生成 profile 文件;在 profile 中搜索 “metadata”,其中 nativeReaderReadNum 表示读取的数据块数,nativeReaderReadBytes 表示读取的字节数。实践中,当单个分区的 nativeReaderReadNum 大于 200 时,通常建议考虑对表进行排序治理。

优化方案:在构建流批排序Paimon表时,建议采用分支表模式:离线分支将 bucket 设为 -1,实时分支按需设置 bucket。离线分支表通过 clustering columns 指定排序字段,可支持指定多个字段(如 f1、f2),一般选择 OLAP 查询中最常用的过滤字段,以提升过滤命中与读取效率。该能力仅支持 Flink 批写入,不支持 ODPS 写入;写入表时需要使用 hint: /*+ OPTIONS('sink.parallelism' = '64') */。对于 ODPS 写入的 Paimon 表,则需要在任务下挂一个单独的 compact 排序任务。

为何有效:在双 11 场景下,活动周期往往持续数十天。当天数据属于实时增量,而从活动开始到昨天的历史数据占比更大;因此对离线数据进行表排序收益显著。压测实测显示,排序后读取的数据块数约为排序前的 1/1000。 离线分支完成排序后,活动开始到昨天(占比最大的历史数据)基本都处于“已排序、数据块读取量很小”的状态;实时分支由于无法排序,读取的数据块会相对多一些,但实时数据通常只存在于当天,整体占比小,因此对整条 SQL 的查询时延影响相对有限。

3)检查是否命中 MapJoin:小维表建议显式 broadcast

当 SQL 需要 join 小表(例如小于 10MB 的维表)时,建议在维表前显式加 broadcast,以触发类似离线 MapJoin 的执行策略。实测显示,引入 broadcast 后查询时延可显著下降,典型场景下可从十几秒优化到约 3 秒,整体查询时延约为原先的 1/3。

SELECT xxx FROM table_a t0 LEFT JOIN [broadcast] dim_table_b t1 ON t0.cate_id = t1.slr_main_cate_id AND t1.ds = 'xxx'

4)检查跨地域访问:计算与存储尽量同地域部署

还需要确认 StarRocks 实例与所读取的 Paimon 表是否处于同一地域。若不在同一地域,查询时延会明显增加。建议将 StarRocks 的部署地域与 Paimon 表存储地域保持一致。

5)主键表建议开启 deletion vectors:减少无效数据读取

对于 Paimon 主键表,建议开启 'deletion-vectors.enabled' = 'true'参数。该能力会在写入阶段记录哪些主键数据已被删除;读取时可跳过已删除数据,减少无效扫描,从而提升查询性能。非主键表不需要开启该参数。

3 阶段成果与未来规划

3.1 阶段成果

整体来看,该方案带来了四方面阶段性成果。

  • 数据链路得到简化:通过统一存储与统一查询面,消除了数据同步链路,并降低了多份存储带来的成本与复杂度。
  • 数据使用门槛显著降低:基于 Paimon 的实时/离线中间层,不仅数据开发人员可以使用,业务分析师也可以通过 StarRocks 自助消费近实时数据,从而减少部分简单需求对数据研发排期的依赖。
  • 回刷开销得到明显削减核心场景的回刷效率提升约 80%,年化节省成本接近 1000 万。其关键在于查询可以直接读取 Paimon 公共层并关联 Paimon 维表,业务变更时只需刷新维表,无需回刷与该维表相关的整条数据链路。
  • 在高性能实时分析方面,低成本解决了跨天交叉维度实时 UV 的计算难题,满足大促期间近实时决策需求。具体做法是将可累加指标(如订单数、订单支付金额等)与不可累加指标(如 user\_id)分开处理:可累加指标在查询侧直接聚合;不可累加指标则将 user\_id 做 RB 化后存入中间层,StarRocks 读取 Paimon 表时通过 RB 相关函数计算 UV。

3.2 未来规划

面向下一阶段,规划主要集中在四个方向。

第一, 希望 StarRocks 具备更强的自动物化能力:针对用户高频查询的 SQL 自动生成物化结果,并在后续查询中自动完成改写,直接命中物化表。由于物化表往往已经完成聚合,其数据量相较直接查询中间层可以小很多个量级,从而显著降低扫描与计算开销,进一步提升查询速度与稳定性。

第二,计划进一步 丰富 StarRocks 的元数据能力

第三, 优化 StarRocks 的调度策略,重点是调度层面的 CPU 负载均衡能力。

第四,希望 StarRocks 具备直接读取 Fluss 的能力,从而支持秒级查询场景。目前 Paimon 仍以分钟级链路为主,如果能够在读取侧进一步下探到 Fluss,将更好覆盖对秒级实时性有明确诉求的业务场景。

从Kafka到AutoMQ:爱奇艺实时流数据架构演进

概述

本文详细介绍了爱奇艺在处理大规模实时流数据时,从传统Kafka架构向AutoMQ演进的技术历程。为了解决私有云环境下集群扩缩容难、资源利用率低以及运维成本高等挑战,爱奇艺开发了Stream平台与Stream-SDK,实现了业务与底层存储的彻底解耦。随后,公司引入公有云服务并最终切换至基于存算分离架构的AutoMQ,利用其单副本存储和秒级弹性的特性,显著提升了系统的灵活性。这一系列的架构升级不仅优化了数据治理体系,还成功将运营成本降低了70%以上。目前,爱奇艺正持续扩大AutoMQ的应用规模,以进一步实现降本增效的长期目标。

背景

Kafka因其高吞吐、低延时、可扩展的特性,在出现之后迅速成为流数据存储的标准组件,广泛应用于实时大数据场景。爱奇艺的流数据服务也主要基于Kafka构建,随着实时大数据应用越来越广泛,Kafka集群数量、规模越来越大,面临扩缩容繁琐、成本高、难治理等诸多问题与挑战。为解决这些问题,我们进行了Kafka服务化、上云、迁移AutoMQ等一系列探索。

本文将介绍爱奇艺Kafka从私有云迈向公有云、从Kafka到AutoMQ的探索与实践。

流数据在爱奇艺的应用

图1 数据通路

在爱奇艺,流数据的存储组件使用的是Kafka,计算组件主要使用的是Flink,流数据相关的典型数据通路如图1所示,主要包括如下环节:

  • 数据集成:Pingback(端上投递日志)、后端日志、数据库binlog、指标等持续产生的流数据,实时写入数据总线Kafka。
  • 数据仓库:由Flink程序将数据引入到实时(流式)、离线(批式)数仓。在实时数仓中,数据仍然以流数据形态存储在Kafka中,并通过Flink构建实时数仓各层数据。在离线数仓中,流数据将会聚集成批数据存储在Iceberg中,再由 Flink增量消费Iceberg构建离线数仓各层数据。实时数仓具备秒级延时,离线数仓具备分钟级以上延时。
  • 数据开发:数仓的数据通过数据开发平台应用到各业务场景。在实时计算中Kafka也会作为中间流数据的存储用于任务之间的解耦。
  • 数据应用:数据广泛应用到爱奇艺的推荐、搜索、广告、报表等等场景中。数据的价值随着延时增大快速衰减,为了数据价值最大化,近几年主要应用场景都已切换到流数据。

Kafka作为流数据的存储承担数据集成到大数据体系的数据总线、实时数仓存储、实时任务之间解耦等角色。

流数据存储服务:从管集群到管数据

爱奇艺的流数据服务最初以Kafka集群为核心构建,提供集群生命周期管理、Topic管理、消费监控等基础能力。随着业务规模扩大、集群数量和数据量持续增长,逐渐暴露出以下问题:

  1. 业务与集群强耦合:业务代码直接依赖Kafka地址访问集群,一旦需要迁移或调整集群,必须修改业务代码并重新上线,不灵活。同时也无法从平台侧统一识别和监控各业务的读写行为。
  2. 缺乏统一的数据与schema管理:平台没有管理数据描述、schema、数据归属等元数据信息,无法提供数据查找功能,不利于跨团队的数据理解、复用与治理。
  3. 主备数据管理缺失:对重要数据,业务侧通常配置主备链路,但平台侧缺乏对主备关系的统一管理,难以做到一致性保障与故障切换治理。

为了解决上述问题,我们将流数据存储服务升级到了如图2所示的架构,由Stream平台、Stream-SDK、存储组件三部分构成。

图2 流数据服务架构

先介绍下Stream平台,Stream-SDK和存储组件后面介绍。Stream平台由“集群管理”和“数据管理”两大模块组成。集群管理负责集群生命周期与底层资源的统一管理,侧重运维侧能力。数据管理是平台的核心,以“数据为中心”构建,面向数据开发人员提供统一的数据视图和管理能力,核心功能如下:

  1. 逻辑队列:原先“集群+Topic”定位数据的方式,升级为基于“项目+队列(Topic)”的逻辑命名方式,集群仅作为队列的一个属性,消除业务对具体集群的依赖。逻辑队列还支持同时绑定主备两个集群,结合Stream-SDK可实现主备链路的一键切换。
  2. Schema管理:支持为队列配置schema,并自动同步至大数据元数据中心,使队列能够在数据开发平台中自动映射为逻辑表,使用SQL直接处理流数据。
  3. 数据地图:提供队列的多维度查询与检索能力,支持在线申请和授权使用队列,简化跨团队的数据查找和复用流程。
  4. 数据血缘:基于Stream-SDK自动上报的读写端信息,构建应用级的读写血缘链路,帮助快速定位上下游数据关系及影响范围。

Stream-SDK:统一的流数据读写客户端

Stream-SDK是平台提供的统一数据访问客户端,封装了底层原生客户端,兼容Kafka协议和RocketMQ。业务仅需配置“项目+队列”,即可完成数据读写,无需关注具体集群地址或认证方式,从而实现业务代码与底层集群的彻底解耦。

图 3 Stream SDK 读写数据过程

Stream-SDK的数据读写流程如图3所示,主要包括两个阶段:

  1. 配置获取与上报

基于业务提供的项目、队列和Token(用于鉴权),SDK调用Stream平台的配置API,获取队列对应的集群信息、Topic、认证参数等配置,并使用原生客户端执行读写。同时,SDK会通过该API上报客户端IP、消费组、应用名称等信息,平台据此实时构建读写血缘。

  1. 集群变更感知与自动切换

在运行期间,SDK每分钟与Stream平台进行心跳交互,实时感知队列关联的集群是否发生变更。一旦检测到变化,SDK会自动将读写流量切换至新集群,实现无感迁移。

借助Stream-SDK,集群的迁移成本大幅降低,也为后续从私有云迈向公有云、从Kafka切换到AutoMQ的架构演变做好了准备。

Kafka混合多云建设

早期爱奇艺Kafka集群部署在私有云IDC,受制于IDC资源供给模式及Kafka架构固有特性,资源利用率难以保持在合理区间。自2023年起,平台逐步引入多家公有云Kafka,形成混合云架构,在资源弹性、运维效率和成本优化方面取得了显著成效。下文将介绍下上云过程。

私有云Kafka

![
图4 Kafka 架构](https://image.automq.com/20260126bot/atub35.png)

Kafka架构如图4所示,是经典的多副本容错分布式架构,由Broker和Zookeeper两类角色组成:Broker负责数据存储与客户端读写,Zookeeper负责管理集群的元数据与协作状态。在私有云中,Kafka部署在爱奇艺各IDC,其中Zookeeper通常以虚机部署,Broker则根据场景选择虚机或物理机。

私有云模式支撑了公司流数据规模的快速增长,但随着业务体量持续扩大,也逐渐暴露出以下问题:

  1. 集群弹性差:Kafka的Shared Nothing架构虽然简单可靠,但每个Broker上都存储大量数据,导致扩容或缩容时必须在Broker间进行大规模数据迁移。迁移过程耗时长且会影响业务任务的读写性能,使得集群难以实现平滑弹性伸缩。
  2. 资源弹性不足:私有云的物理资源从采购到报废周期较长,难以随业务流量动态变化而快速调整,导致集群资源利用率长期处于“过高或过低”的状态。同时,对于寒暑假、重点直播等短时流量高峰,也难以做到按需扩缩,影响系统整体资源效率与成本优化。

从私有云Kafka到公有云Kafka

为实现降本增效并提升流数据存储的灵活性,我们引入并上线了公有云Kafka产品。

公有云Kafka产品遵循Kafka协议,通过在Stream平台与Stream-SDK中进行统一适配,为业务侧提供一致、无差异的使用体验,实现了私有云与公有云之间统一接入和平滑切换。

借助公有云庞大的资源池和按需创建集群的能力,解决了私有云环境下资源弹性不足的问题,取得20%以上的降本效果。

从Kafka到AutoMQ

公有云Kafka虽然解决了资源弹性不足的问题,但是依然有集群弹性差的问题。新出现的AutoMQ支持秒级弹性吸引了我们的注意。

图 5 AutoMQ 架构

AutoMQ采用存算分离架构,如图所示,具备如下特性:

  1. 共享存储:数据统一存储在对象存储中,Broker不再持有本地数据。为解决对象存储延迟高、IOPS较低的问题AutoMQ引入块存储作为WAL(Write-Ahead Log),数据先写入WAL再进行批量落盘到对象存储。
  2. 单副本存储:云端的块存储和对象存储本身具备多副本特性,已在存储层保证了高可用,因此AutoMQ内部的Topic均采用单副本策略,避免传统Kafka中Broker之间的副本同步开销,大幅降低成本与数据复制压力。
  3. 兼容Kafka协议:AutoMQ基于开源Kafka改造,保留计算层逻辑,替换底层存储实现,完全兼容Kafka协议。
  4. 快速弹性:由于Broker不再存储数据,节点可快速启动或销毁,实现分钟级弹性;同时对象存储按量计费,使资源规模能够与业务流量保持高度匹配,避免资源浪费。

在完成相关性能与稳定性验证后,我们在公有云环境部署了AutoMQ,并将其纳入流数据服务存储体系。通过Stream平台逐步将私有云Kafka、公有云Kafka迁移至AutoMQ,成本进一步降低70%以上。

总结及规划

流数据因其低延时特性,已成为爱奇艺的重要数据通路。随着规模增长,传统私有云Kafka在弹性、成本与治理上逐渐遇到瓶颈,因此,流数据存储架构从“管集群”转向“管数据”,并通过Stream平台与Stream-SDK实现解耦与统一治理。随后引入公有云Kafka和AutoMQ,使系统在弹性、运维效率和成本上都实现了显著提升。

爱奇艺目前约40%的流量已迁移到公有云Kafka或AutoMQ,其中一半是AutoMQ,下一步将继续扩大AutoMQ的使用规模,并探索AutoMQ的自适应自动弹性机制,持续降本。

小T导读:京能集团在储能安全管理平台中采用 TDengine TSDB 作为底层时序数据库。依托 TDengine 企业版的零代码数据写入平台,来自全国 28 家电化学储能电站的数据能够按照统一编码规则高效接入 TDengine 时序数据库中,实现了稳定、高性能的数据采集与管理。在此基础上,借助 TDengine TSDB Flink Connector,系统可快速、稳定地从数据库中读取海量数据,开展实时分析与智能处理,充分释放数据的潜在价值。本文将结合该项目的实践过程,为大家带来深入分享与参考。

项目背景

京能集团储能安全管理平台共接入全国 28 家电化学储能电站,累计测点达 270 万个,由四个平台公司分别负责数据传输与汇聚。系统需要支撑大规模的数据统计分析、事件报警与安全预警,对底层数据库的性能与稳定性提出了极高要求。

鉴于电化学储能项目采集点数量庞大(270 万点)、锂电池热失控的超前预警技术复杂等因素,传统关系型数据库已无法满足高并发写入与海量数据存储的需求。由于这些数据具备时间序列写入、格式固定、写入量巨大等典型特征,我们最终选择采用时序数据库作为系统核心数据底座。

应用实际落地

在充分调研国内多款时序数据库产品后,我们发现,从国内目前的实际情况分析,TDengine TSDB 已成为众多企业在海量数据高速存储、处理与调用场景中的首选方案。基于其成熟的技术体系与稳定的性能表现,我们最终选定 TDengine TSDB 作为平台的底层时序数据库,并结合 Kafka 与 Flink 构建了完整的数据流处理体系,实现了数据的高效传输与实时计算,顺利达成项目预期目标。以下是架构简图:

TDengine TSDB 支持多种写入方式

  1. SQL 语言写入 :https://docs.taosdata.com/basic/insert/
  2. 无模式写入:https://docs.taosdata.com/develop/schemaless/
  3. 参数绑定方式:https://docs.taosdata.com/develop/stmt/
  4. 企业版的零代码数据写入— taosExplorer 数据接入功能:https://docs.taosdata.com/advanced/data-in/

项目中涉及多个 Kafka 集群、数十个需要接入的 topic。我们重点采用了 TDengine 企业版的零代码数据写入能力,实现了从 Kafka 到 TDengine TSDB 的高效对接。该功能支持灵活配置类似 ETL 的复杂自定义选项,极大简化了数据接入流程和时间,而且数据接入性能完全达到了项目要求。

为了保证数据的合理性,我们出台了《京能集团电化学储能电站安全管理平台和储能电站设备标识编码规则》,通过标准的 kks 编码在 taosX 对 Kafka 数据进行了有效过滤和清理,最终写入 TDengine TSDB。kks 部分编码实例如下:

下图为数据过滤、转换等规则设置:

此外,taosX 数据接入还支持多节点高可用配置。只需在多台 taosX 上部署相同的 Kafka 数据接入任务,并设置相同的 groupId,即可自动实现任务高可用,确保数据接入的连续性与稳定性。

同时,TDengine 还提供完善的 taosX 任务监控机制,可直接通过 Grafana 一键配置,快速生成可视化监控图表:

超级表 + 子表的使用

TDengine TSDB 结合“一个数据采集点一张表”的设计理念,引入了具有创新性的“超级表”机制,从根本上解决了大规模时序数据结构不统一、聚合困难、运维复杂等问题。每个采集点的数据独立存储,天然具备写入无锁、数据顺序追加、块状连续存储等优势。这种设计方式不仅提升了写入与查询性能,还带来了极高的数据压缩效率。

TDengine TSDB 支持对超级表标签进行动态的添加、修改与删除操作,满足设备属性变更、系统扩展等业务需求。

计算、分析处理

在 Flink 计算平台上,我们借助 TDengine TSDB 企业版提供的 Flink 连接器——TDengine TSDB Flink Connector(https://docs.taosdata.com/advanced/data-publisher/Flink/),实现了与 TDengine TSDB 的无缝集成。该连接器可高效、稳定地从 TDengine TSDB 中读取海量时序数据,并在此基础上进行全面、深入的分析处理,充分挖掘数据的潜在价值,极大地提升数据处理的效率和质量。

Flink CDC 主要用于提供数据订阅功能,能实时监控 TDengine TSDB 数据库的数据变化,并将这些变更以数据流形式传输到 Flink 中进行处理,同时确保数据的一致性和完整性。

落地效果

  1. 数据接入便利性:目前我们已接入 20 多个 kafka 数据,后期还会继续增加。得益于 TDengine 企业版零代码数据接入能力,新增任务仅需复制并做少量参数调整即可完成,操作简便高效,整体接入过程较传统方式节省约 90% 的时间成本
  2. 数据查询性能高:开启数据库缓存功能后,能够实时获取每个设备点位最新值,毫秒级别即可返回结果
  3. 数据存储成本低:TDengine TSDB 具备出色的数据压缩能力,其二级压缩技术将数据视作无差别的二进制块进行再次压缩。与一级压缩相比,二级压缩的侧重点在于消除数据块之间的信息冗余。目前我们提供的服务器存储远远满足我们项目规划的 5 年数据存储,存储成本估算节省至少 60-70%
  4. 实时订阅:通过 TDengine 提供的 Flink CDC 实时订阅功能,能方便、高效的进行分析、告警等处理,给我们后期分析带来了极大的便利性。

后期规划

目前,我们正在对京能集团储能安全管理平台已经接入的 28 场站数据进行分析和优化,提高数据采集的可靠性和鲁棒性。未来我们会针对 TDengine TSDB 新版本和新功能进行持续跟踪,进一步开发 TDengine TSDB 的内在潜力和各种有效的功能。

近期我们关注到 TDengine 发布了新产品 TDengine IDMP,通过经典的树状层次结构组织传感器、设备采集的数据,建立数据目录,对数据提供情境化、标准化的处理,并提供实时分析、可视化等功能,接下来我们会进一步了解此产品在我们业务中的使用可能。

关于京能集团

北京能源集团有限责任公司是北京市人民政府出资设立的国有独资公司,肩负着保障首都北京能源安全可靠供应的重任。京能集团成立于 2004 年,由原北京国际电力开发投资公司和原北京市综合投资公司合并而成,2011 年、2014 年先后又与北京市热力集团有限责任公司、北京京煤集团有限责任公司实施合并重组,实现了产业链条融合互补。经过多年的资源整合,集团由单一能源产业发展为热力、电力、煤炭、健康文旅等多业态产业格局。2024 年在中国企业 500 强排名第 247 位,中国服务企业 500 强排名第 87 位。

作者:张海增

数字公告板提供商 Pinterest 发布了一篇文章,解释了其新平台Moka在大规模数据处理方面的未来蓝图。该公司正在将核心工作负载从老化的 Hadoop 基础设施迁移到基于 Kubernetes 的系统上,该系统运行在亚马逊 EKS 上,以 Apache Spark 作为主要引擎,并即将支持其他框架。

 

在一个包含两篇文章的博客系列中,Soam Acharya、Rainie Li、William Tom 和 Ang Zhang 描述了 Pinterest 大数据平台团队如何考虑下一代大规模数据处理平台的替代方案,因为现有的基于 Hadoop 的系统(内部称为 Monarch)的局限性变得越来越明显。他们将 Moka 作为搜索的结果,以及基于 EKS 的云原生数据处理平台,该平台现在运行的生产负载达到了 Pinterest 的规模。该系列的第一部分关注整体设计和应用层。相比之下,第二部分转向作者所说的“Moka 的基础设施重点方面,包括经验和未来方向”。

 

文章从实际角度描述了向 Kubernetes 的转变。它展示了一个全行业的转变,即大型技术公司现在将 Kubernetes 视为数据的控制平面,而不仅仅是无状态的服务平台。在大数据社区日益增长的受欢迎程度和越来越多的采用的鼓励下,团队探索了基于 Kubernetes 的系统,作为 Hadoop 2.x 最有可能的替代品。任何候选平台都必须满足可扩展性、安全性、成本以及托管多个处理引擎的精确标准。Moka 是如何在不放弃现有 Spark 投资的情况下现代化 Hadoop 时代的数据平台的一个例子。

 

第二篇文章的核心主题是如何在 Kubernetes 上以非常大的规模运行 Spark。作者解释了他们如何围绕 Moka 添加日志、指标和作业历史服务,以便工程师可以在不了解底层集群拓扑的情况下调试和调整作业。他们使用 Fluent Bit 对日志集合进行标准化,并使用 OpenTelemetry 和 Prometheus 兼容的端点发布统一指标。这为基础设施和应用程序团队提供了系统健康的一致视图。

 

Pinterest 还投资于通过基础设施即代码的方式使平台可重复使用。在文章中,团队概述了他们如何使用 Terraform 和 Helm 创建 EKS 集群、配置网络和安全以及部署支持组件,如 Spark 历史服务器。

 

Pinterest 的工程师还讨论了处理不同的硬件架构。他们描述了他们如何构建多架构镜像,以便他们的数据工作负载在 Intel 和基于 ARM 的实例上运行良好,包括 AWS Graviton,并将此与集群规模的成本和效率目标联系起来。InfoQ 编辑 Eran Stiller 在 LinkedIn上对该项目中的总结指出,Moka“提供了容器级别的隔离、ARM 支持、YuniKorn 调度,并通过整合工作负载和跨实例类型的自动扩展实现了显著的成本节省”。这些细节将工作置于云用户寻求在不牺牲性能的情况下削减基础设施成本的更大趋势之中。

 

关于处理引擎的更广泛的行业对话为 Pinterest 的故事增添了细微差别。在另一篇LinkedIn帖子中,Acharya 写道:“虽然 Spark 是我们的主要主力,但 Moka 的成功意味着 Pinterest 的其他用例也在效仿:Flink Batch 已经投入生产,Apache Ray 紧随其后,Flink Streaming 也将在今年晚些时候推出”。通过对 Spark 和 Flink 技术的深入探讨,我们可以了解到这一点的重要性。强调 Spark 仍然非常适合大型批处理和交互式分析工作负载,而 Flink 是“为实时、有状态的流处理而构建的”,具有严格的逐事件处理。团队将 Moka 呈现为一个灵活的基础,可以根据特定工作负载的需求添加不同的引擎,而不是一个只支持 spark 的平台。

 

外部观察者从 Pinterest 案例中吸取了教训。ML工程师通讯将 Moka 文章描述为“在 Kubernetes 上部署 EKS 集群、Fluent Bit 日志、OTEL 指标管道、镜像管理和 Spark 的自定义 Moka UI”的例子,将其与其他现代数据基础设施案例研究并列。这些反应表明,Moka 被视为一类云原生数据系统的参考架构。

 

然而,团队确实将他们的迁移工作呈现为一个正在进行的旅程,而不是一个已经完成的项目。在博客和进一步的LinkedIn帖子中,Pinterest 作者讨论了“经验和未来的方向”,并描述了早期概念验证如何导致随着对新堆栈的信心增长而逐步远离 Hadoop 的迁移。Acharya 指出,“最好的问题出现在规模上”,构建平台涉及“解决难题”,因为团队转移了实际工作负载。对于其他组织来说,这种经验可能是最重要的教训。复制围绕 Kubernetes、EKS 和 Spark 的技术选择相对简单,但从遗留系统中解耦并投资于可观测性、自动化和多引擎支持的过程可能是未来真正的工作。

 

原文链接:

https://www.infoq.com/news/2026/01/pinterest-kubernetes-bigdata/