标签 ClickHouse 下的文章

一、背景与痛点

业务场景

在实时大数据处理场景中,Flink + ClickHouse 的组合被广泛应用于:

  • 日志处理: 海量应用日志实时写入分析库。
  • 监控分析: 业务指标、APM 数据的实时聚合。

这些场景的共同特点:

  • 数据量大:百万级 TPS,峰值可达千万级。
  • 写入延迟敏感: 需要秒级可见。
  • 数据准确性要求高:不允许数据丢失。
  • 多表写入: 不同数据根据分表策略写入不同的表。

开源 Flink ClickHouse Sink 的痛点

Flink 官方提供的 ClickHouse Sink(flink-connector-jdbc)在生产环境中存在以下严重问题:

痛点一:缺乏基于数据量的攒批机制

问题表现:

// Flink 官方 JDBC Sink 的实现
public class JdbcSink<T> extends RichSinkFunction<T> {
    private final int batchSize;  // 固定批次大小
    @Override
    public void invoke(T value, Context context) {
        bufferedValues.add(value);
        if (bufferedValues.size() >= batchSize) {
            // 只能基于记录数攒批,无法基于数据量
            flush();
        }
    }

带来的问题:

  1. 内存占用不可控: 100 条 1KB 的日志和 100 条 10MB 的日志占用内存差距 100 倍。
  2. OOM 风险高: 大日志记录(如堆栈转储)会迅速撑爆内存。
  3. 写入性能差: 无法根据记录大小动态调整批次,导致小记录批次过大浪费网络开销。

痛点二:无法支持动态表结构

问题表现:

// Flink 官方 Sink 只能写入固定表
public class JdbcSink {
    private final String sql;  // 固定的 INSERT SQL
    public JdbcSink(String jdbcUrl, String sql, ...) {
        this.sql = sql;  // 硬编码的表结构
    }
}

带来的问题:

  1. 多应用无法隔离: 所有应用的数据写入同一张表,通过特定分表策略区分。
  2. 扩展性差: 新增应用需要手动建表,无法动态路由。
  3. 性能瓶颈: 单表数据量过大(百亿级),查询和写入性能急剧下降。

痛点三:分布式表写入性能问题

问题表现:

// 大多数生产实现直接写入分布式表
INSERT INTO distributed_table_all VALUES (...)

ClickHouse 分布式表的工作原理:

带来的问题:

  1. 网络开销大: 数据需要经过分布式表层转发,延迟增加。
  2. 写入性能差: 分布式表增加了路由和转发逻辑,吞吐量降低。
  3. 热点问题: 所有数据先到分布式表节点,再转发,造成单点瓶颈。

生产级方案的核心改进

针对以上痛点,本方案提供了以下核心改进:

改进一:基于数据量的攒批机制

public class ClickHouseSinkCounter {
    private Long metaSize;  // 累计数据量(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize();  // 累加数据量
    }
}
// 触发条件
private boolean flushCondition(String application) {
    return checkMetaSize(application)  // metaSize >= 10000 字节
        || checkTime(application);     // 或超时 30 秒
}

优势:

  • 内存可控: 根据数据量而非记录数攒批。
  • 精确控制: 1KB 的记录攒 10000 条 = 10MB,1MB 的记录攒 10 条 = 10MB。
  • 避免OOM: 大日志记录不会撑爆内存。

改进二:动态表结构与分片策略

public abstract class ClickHouseShardStrategy<T> {
    public abstract String getTableName(T data);
}
//日志侧实现为应用级分表
public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 动态路由:order-service → tb_logs_order_service
        return String.format("tb_logs_%s", application);
    }
}

优势:

  • 应用隔离: 日志侧内置应用级分表,每个应用独立分表。
  • 动态路由: 根据 application 自动路由到目标表。
  • 扩展性强: 新增应用无需手动建表(配合 ClickHouse 自动建表)。

改进三:本地表写入 + 动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 直接写本地表,避免分布式表转发
    private final ConcurrentMap<String, HikariDataSource> dataSourceMap;
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1. 动态获取集群节点列表
        List<String> healthyHosts = getHealthyHosts(exceptionHosts);
        // 2. 随机选择健康节点
        return dataSourceMap.get(healthyHosts.get(random.nextInt(size)));
    }
}

优势:

  • 性能提升: 直接写本地表,避免网络转发。
  • 高可用: 动态节点发现 + 故障节点剔除。
  • 负载均衡: 随机选择 + Shuffle 初始化。

技术方案概览

基于以上改进,本方案提供了以下核心能力:

  1. 本地表/分布式表写入: 性能优化与高可用平衡。
  2. 分片策略: 按应用维度路由与隔离。
  3. 攒批与内存控制: 双触发机制(数据量 + 超时)。
  4. 流量控制与限流: 有界队列 + 连接池。
  5. 健壮的重试机制: 递归重试 + 故障节点剔除。
  6. Checkpoint 语义保证: At-Least-Once 数据一致性。

二、核心架构设计

架构图

核心组件

核心流程

三、本地表 vs 分布式表写入

ClickHouse 表结构说明

ClickHouse 推荐直接写本地表,原因:

  1. 写入性能: 避免分布式表的网络分发。
  2. 数据一致性: 直接写入目标节点,减少中间环节故障点,比分布式表写入更安全,利于工程化。
  3. 负载均衡: 客户端路由实现负载分散。
-- 本地表(实际存储数据)
CREATE TABLE tb_logs_local ON CLUSTER 'default' (
    application String,
    environment String,
    message String,
    log_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(log_time)
ORDER BY (application, log_time);
-- 分布式表(逻辑视图,不存储数据)
CREATE TABLE tb_logs_all ON CLUSTER 'default' AS tb_logs_local
ENGINE = Distributed('default', dw_log, tb_logs_local, cityHash64(application));

HikariCP 连接池配置

// HikariCP 连接池配置
public class ClickHouseDataSourceUtils {
    private static HikariConfig getHikariConfig(DataSourceImpl dataSource) {
        HikariConfig config = new HikariConfig();
        config.setConnectionTimeout(30000L);    // 连接超时 30s
        config.setMaximumPoolSize(20);          // 最大连接数 20
        config.setMinimumIdle(2);               // 最小空闲 2
        config.setDataSource(dataSource);
        return config;
    }
    private static Properties getClickHouseProperties(ClickHouseSinkCommonParams params) {
        Properties props = new Properties();
        props.setProperty("user", params.getUser());
        props.setProperty("password", params.getPassword());
        props.setProperty("database", params.getDatabase());
        props.setProperty("socket_timeout", "180000");      // Socket 超时 3 分钟
        props.setProperty("socket_keepalive", "true");      // 保持连接
        props.setProperty("http_connection_provider", "APACHE_HTTP_CLIENT");
        return props;
    }
}

配置说明:

  • maxPoolSize=20:每个 ClickHouse 节点最多 20 个连接。
  • minIdle=2:保持 2 个空闲连接,避免频繁创建。
  • socket_timeout=180s:Socket 超时 3 分钟,防止长时间查询阻塞。

ClickHouseLocalWriter:动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 本地节点缓存,按 IP 维护
    private final ConcurrentMap<String, HikariDataSource> dataSourceMap;
    // 动态获取集群本地表节点
    private final ClusterIpsUtils clusterIpsUtils;
    // IP 变更标志(CAS 锁,避免并发更新)
    private static final AtomicBoolean IP_CHANGING = new AtomicBoolean(false);
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1️⃣ 检测集群节点变化(通过 CAS 避免并发更新)
        if (clusterIpsChanged() && IP_CHANGING.compareAndSet(false, true)) {
            try {
                ipChanged(); // 动态更新 dataSourceMap
            } finally {
                IP_CHANGING.set(false);
            }
        }
        // 2️⃣ 获取异常节点列表(从 Redis + APM 实时查询)
        Set<String> exceptIps = clusterIpsUtils.getExceptIps();
        exceptIps.addAll(exceptionHosts);
        // 3️⃣ 过滤健康节点,随机选择
        List<String> healthyHosts = dataSourceMap.keySet().stream()
            .filter(host -> !exceptIps.contains(host))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            throw new RuntimeException("Can't get datasource from local cache");
        }
        return dataSourceMap.get(healthyHosts.get(random.nextInt(healthyHosts.size())));
    }
    private void ipChanged() {
        List<String> clusterIps = clusterIpsUtils.getClusterIps();
        // 新增节点:自动创建连接池
        clusterIps.forEach(ip ->
            dataSourceMap.computeIfAbsent(ip, v ->
                createHikariDataSource(ip, port)
            )
        );
        // 移除下线节点:关闭连接池
        dataSourceMap.forEach((ip, ds) -> {
            if (!clusterIps.contains(ip)) {
                dataSourceMap.remove(ip);
                ds.close();
            }
        });
    }
}

核心逻辑:

  1. 动态节点发现: 从 system.clusters 查询所有节点。
  2. 自动扩缩容: 节点上线自动加入,下线自动剔除。
  3. 故障节点剔除: 通过 APM 监控,自动剔除异常节点。
  4. 负载均衡: 随机选择健康节点,避免热点。

集群节点动态发现(ClusterIpsUtils)

public class ClusterIpsUtils {
    // 从 system.clusters 查询所有节点
    private static final String QUERY_CLUSTER_IPS =
        "select host_address from system.clusters where cluster = 'default'";
    // LoadingCache:定时刷新节点列表(1 小时)
    private final LoadingCache<String, List<String>> clusterIpsCache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.HOURS)
            .refreshAfterWrite(1, TimeUnit.HOURS)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public List<String> load(String dbName) {
                    return queryClusterIps();  // 定时刷新节点列表
                }
            }));
    // 异常节点缓存(1 分钟刷新)
    private final LoadingCache<String, FlinkExceptIpModel> exceptIpsCache =
        CacheBuilder.newBuilder()
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public FlinkExceptIpModel load(String dbName) {
                    return queryExceptIp();  // 从 Redis + APM 查询异常节点
                }
            }));
}

异常节点监控策略:

  • 磁盘使用率 >= 90%: 从 APM 查询 Prometheus 指标,自动加入黑名单。
  • HTTP 连接数 >= 50: 连接数过多说明节点压力大,自动加入黑名单。
  • 人工配置: 通过 Redis 配置手动剔除节点

数据来源:

  1. ClickHouse system.clusters 表: 获取所有集群节点。
  2. APM Prometheus 接口: 监控节点健康状态。
  3. Redis 缓存: 人工配置的异常节点。

负载均衡优化

public class ClickHouseWriter {
    public <T> ClickHouseWriter(...) {
        // Shuffle:随机打乱数据源顺序
        Collections.shuffle(clickHouseDataSources);
        this.clickHouseDataSources = clickHouseDataSources;
    }
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 轮询 + 随机选择(已 shuffle,避免热点)
        int current = this.currentRandom.getAndIncrement();
        if (current >= clickHouseDataSources.size()) {
            this.currentRandom.set(0);
        }
        return clickHouseDataSources.get(currentRandom.get());
    }
}

优势:

  • 初始化时 shuffle,避免所有 writer 同时从第一个节点开始。
  • 轮询 + 随机选择,负载分散更均匀。
  • 故障节点自动剔除。

四、支持分表策略

分片策略抽象

public abstract class ClickHouseShardStrategy<T> {
    private String tableName;      // 表名模板,如 "tb_log_%s"
    private Integer tableCount;    // 分表数量
    // 根据数据决定目标表名
    public abstract String getTableName(T data);
}

日志分片实现

public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 表名格式:tb_log_{application}
        // 例如:application = "order-service" -> table = "tb_log_order_service"
        return String.format(
            this.getTableName(),
            application.replace("-", "_").toLowerCase()
        );
    }
}

按表(应用)维度的缓冲区

日志侧维度降级为应用名称维度缓冲区,实则因为按照应用分表,

业务方可使用自身分表策略携带表名元数据,进行表维度缓冲。

public class ClickHouseShardSinkBuffer {
    // 按 application 分组的缓冲区(ConcurrentHashMap 保证并发安全)
    private final ConcurrentHashMap<String, ClickHouseSinkCounter> localValues;
    public void put(LogModel value) {
        String application = value.getApplication();
        // 1️⃣ 检查是否需要 flush
        if (flushCondition(application)) {
            addToQueue(application); // 触发写入
        }
        // 2️⃣ 添加到缓冲区(线程安全的 compute 操作)
        localValues.compute(application, (k, v) -> {
            if (v == null) v = new ClickHouseSinkCounter();
            v.add(value);
            return v;
        });
    }
    private void addToQueue(String application) {
        localValues.computeIfPresent(application, (k, v) -> {
            // 深拷贝并清空(避免并发修改异常)
            List<LogModel> deepCopy = v.copyValuesAndClear();
            // 构造请求 Blank:application + targetTable + values
            String targetTable = shardStrategy.getTableName(application);
            ClickHouseRequestBlank blank = new ClickHouseRequestBlank(deepCopy, application, targetTable);
            // 放入队列
            writer.put(blank);
            return v;
        });
    }
}

核心设计:

  • 应用隔离: 每个表(应用)独立的 buffer,互不影响。
  • 线程安全: 使用 ConcurrentHashMap.compute()保证并发安全。
  • 深拷贝: List.copyOf() 创建不可变副本,避免并发修改。
  • 批量清空: 一次性取出所有数据,清空计数器。

五、攒批与内存控制

双触发机制

public class ClickHouseShardSinkBuffer {
    private final int maxFlushBufferSize;  // 最大批次大小(如 10000)
    private final long timeoutMillis;      // 超时时间(如 30s)
    // 触发条件检查(满足任一即触发)
    private boolean flushCondition(String application) {
        return localValues.get(application) != null
            && (checkMetaSize(application) || checkTime(application));
    }
    // 条件1:达到批次大小
    private boolean checkMetaSize(String application) {
        return localValues.get(application).getMetaSize() >= maxFlushBufferSize;
    }
    // 条件2:超时
    private boolean checkTime(String application) {
        long current = System.currentTimeMillis();
        return current - localValues.get(application).getInsertTime() > timeoutMillis;
    }
}

批次大小计算

public class ClickHouseSinkCounter {
    private final List<LogModel> values;
    private Long metaSize; // 累计的 metaSize(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize(); // 累加 metaSize
    }
    public List<LogModel> copyValuesAndClear() {
        List<LogModel> logModels = List.copyOf(this.values); // 深拷贝(不可变)
        this.values.clear();
        this.metaSize = 0L;
        this.insertTime = System.currentTimeMillis();
        return logModels;
    }
}

关键点:

  • 使用 metaSize(字节数)而非记录数控制批次,内存控制更精确。
  • List.copyOf() 创建不可变副本,避免并发修改。
  • 清空后重置 insertTime,保证超时触发准确性。

带随机抖动的超时

private final long timeoutMillis;
public ClickHouseShardSinkBuffer(..., int timeoutSec, ...) {
    // 基础超时 + 10% 随机抖动(避免惊群效应)
    this.timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSec)
                      + new SecureRandom().nextInt((int) (timeoutSec * 0.1 * 1000));
}

目的: 避免多个TM 同时触发 flush,造成写入流量峰值。

配置示例

ClickHouseShardSinkBuffer.Builder
    .aClickHouseSinkBuffer()
    .withTargetTable("single_table")  //单表时,可直接使用指定表名
    .withMaxFlushBufferSize(10000)  // 对应字节数
    .withTimeoutSec(30)              // 30 秒超时
    .withClickHouseShardStrategy(new LogClickHouseShardStrategy("table_prefix_%s", 8))  //分表策略时,使用
    // 分表策略可根据业务实际情况进行扩展
    .build(clickHouseWriter);

六、写入限流与流量控制

有界队列设计

public class ClickHouseWriter {
    // 有界阻塞队列
    private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
    public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, ...) {
        // 队列最大容量配置(默认 10)
        this.commonQueue = new LinkedBlockingQueue<>(sinkParams.getQueueMaxCapacity());
    }
    public void put(ClickHouseRequestBlank params) {
        unProcessedCounter.incrementAndGet();
        // put() 方法在队列满时会阻塞,实现背压
        commonQueue.put(params);
    }
}

背压传导:

线程池并发控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

WriterTask 消费逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

配置参数

七、重试机制与超时控制

Future 超时控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

超时策略:

  • Future 超时: 3 分钟(orTimeout)。
  • Socket 超时: 3 分钟(socket_timeout=180000)。
  • 连接超时: 30 秒(connectionTimeout=30000)。

重试逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

重试控制逻辑

private void handleUnsuccessfulResponse(..., Set<String> exceptHosts) {
    // 检查 Future 是否已完成(避免重复完成)
    if (future.isDone()) {
        return;
    }
    if (attemptCounter >= maxRetries) {
        // 达到最大重试次数,标记失败
        future.completeExceptionally(new RuntimeException("Max retries exceeded"));
    } else {
        // 递归重试
        requestBlank.incrementCounter();
        send(requestBlank, future, exceptHosts); // 递归调用,排除失败节点
    }
}

重试策略:

  • 递归重试: 失败后递归调用,直到成功或达到最大次数。
  • 异常节点隔离: 每次重试时排除失败的节点(exceptHosts)。
  • 超时控制: Future 超时(3 分钟)防止永久阻塞。

为什么递归重试是更好的选择

递归重试(当前实现)

队列重试(假设方案)

保证一致性

  // ClickHouseWriter.java:139-158
  while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
      CompletableFuture<Void> future = FutureUtil.allOf(futures);
      future.get(3, TimeUnit.MINUTES);  // 阻塞直到全部完成
  }
  • Checkpoint 时所有数据要么全部成功,要么全部失败。
  • 重启后不会有部分数据重复的问题。

简单可靠

  • 代码逻辑清晰。
  • 对于队列重试且不重复,需要复杂的二阶段提交(这里暂不展开),大幅增加代码复杂度。

性能可接受

class WriterTask implements Runnable {
    @Override
    public void run() {
        while (isWorking || !queue.isEmpty()) {
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置 3 分钟超时
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES); // 防止永久阻塞
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Timeout"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}
  • 虽然阻塞,但有超时保护。
  • ClickHouse 写入通常很快(秒级)。
  • 网络故障时重试也合理。

避开故障节点

  // ClickHouseWriter.java:259-260
  HikariDataSource dataSource = getNextDataSource(exceptHosts);
  • 递归时可以传递 exceptHosts。
  • 自动避开失败的节点。
  • 提高成功率。

异常节点剔除

// 特殊错误码列表(自动加入黑名单)
private final List<Integer> ignoreHostCodes = Arrays.asList(210, 1002);
public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
    if (CollectionUtils.isNotEmpty(exceptionHosts)) {
        // 过滤异常节点
        List<HikariDataSource> healthyHosts = clickHouseDataSources.stream()
            .filter(ds -> !exceptionHosts.contains(getHostFromUrl(ds)))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            return null; // 所有节点都异常
        }
        return healthyHosts.get(random.nextInt(healthyHosts.size()));
    }
    // 正常轮询(已 shuffle,避免热点)
    return clickHouseDataSources.get(currentRandom.getAndIncrement() % size);
}

故障节点剔除策略:

  1. 错误码 210(网络异常): 自动加入黑名单。
  2. 错误码 1002(连接池异常): 自动加入黑名单。
  3. APM 监控: 磁盘 >= 90%、HTTP 连接 >= 50 的节点。
  4. 手动配置: 通过 Redis 配置剔除。

恢复机制:

  • LoadingCache 定时刷新(1 分钟)。
  • 节点恢复健康后自动从黑名单移除。

重试流程图

八、异常处理模式

两种 Sink 模式

public Sink buildSink(String targetTable, String targetCount, int maxBufferSize) {
    IClickHouseSinkBuffer buffer = ClickHouseShardSinkBuffer.Builder
        .aClickHouseSinkBuffer()
        .withTargetTable(targetTable)
        .withMaxFlushBufferSize(maxBufferSize)
        .withClickHouseShardStrategy(new LogClickHouseShardStrategy(targetTable, count))
        .build(clickHouseWriter);
    // 根据配置选择模式
    if (ignoringClickHouseSendingExceptionEnabled) {
        return new UnexceptionableSink(buffer);  // 忽略异常
    } else {
        return new ExceptionsThrowableSink(buffer); // 抛出异常
    }
}

UnexceptionableSink(忽略异常 - At-Most-Once)

public class UnexceptionableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) {
        buffer.put(message);  // 不检查 Future 状态
    }
    @Override
    public void flush() {
        buffer.flush();
    }
}

适用场景:

  • 允许部分数据丢失。
  • 不希望因写入异常导致任务失败。
  • 对数据准确性要求不高(如日志统计)。

语义保证:At-Most-Once(最多一次)

ExceptionsThrowableSink(抛出异常 - At-Least-Once)

public class ExceptionsThrowableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) throws ExecutionException, InterruptedException {
        buffer.put(message);
        // 每次写入都检查 Future 状态
        buffer.assertFuturesNotFailedYet();
    }
    @Override
    public void flush() throws ExecutionException, InterruptedException {
        buffer.flush();
    }
}

Future 状态检查:

public void assertFuturesNotFailedYet() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = FutureUtil.allOf(futures);
    // 非阻塞检查
    if (future.isCompletedExceptionally()) {
        logger.error("There is something wrong with the future. exist sink now");
        future.get(); // 抛出异常,导致 Flink 任务失败
    }
}

适用场景:

  • 数据准确性要求高。
  • 需要保证所有数据写入成功。
  • 异常时希望 Flink 任务失败并重启。

语义保证:At-Least-Once(至少一次)

Future 清理策略与并发控制

定时检查器

public class ClickHouseSinkScheduledCheckerAndCleaner {
    private final ScheduledExecutorService scheduledExecutorService;
    private final List<CompletableFuture<Boolean>> futures;
    // ⚠️ volatile 保证多线程可见性(关键并发控制点)
    private volatile boolean isFlushing = false;
    public ClickHouseSinkScheduledCheckerAndCleaner(...) {
        // 单线程定时执行器
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
        // 定时执行清理任务(每隔 checkTimeout 秒,默认 30 秒)
        scheduledExecutorService.scheduleWithFixedDelay(getTask(), ...);
    }
    private Runnable getTask() {
        return () -> {
            synchronized (this) {
                //  关键:检查是否正在 flush,避免并发冲突
                if (isFlushing) {
                    return; // Checkpoint 期间暂停清理
                }
                // 1️⃣ 清理已完成的 Future
                futures.removeIf(filter);
                // 2️⃣ 触发所有 Buffer 的 flush(检查是否需要写入)
                clickHouseSinkBuffers.forEach(IClickHouseSinkBuffer::tryAddToQueue);
            }
        };
    }
    // Checkpoint flush 前调用(暂停 cleaner)
    public synchronized void beforeFlush() {
        isFlushing = true;
    }
    // Checkpoint flush 后调用(恢复 cleaner)
    public synchronized void afterFlush() {
        isFlushing = false;
    }
}

核心设计:

  • volatile boolean isFlushing: 标志位,协调 cleaner 与 checkpoint 线程。
  • synchronized (this): 保证原子性,避免并发冲突。
  • 单线程执行器: 避免 cleaner 内部并发问题。

并发控制机制

问题场景:

时间轴冲突:
T1: Cleaner 线程正在执行 tryAddToQueue()
T2: Checkpoint 触发,调用 sink.flush()
T3: Cleaner 同时也在执行 tryAddToQueue()
    ├─ 可能导致:数据重复写入
    ├─ 可能导致:Buffer 清空顺序混乱
    └─ 可能导致:Future 状态不一致

解决方案:

// ClickHouseSinkManager.flush()
public void flush() {
    // 1️⃣ 暂停定时清理任务(设置标志)
    clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
    try {
        // 2️⃣ 执行 flush(此时 cleaner 线程会跳过执行)
        clickHouseWriter.waitUntilAllFuturesDone(false, false);
    } finally {
        // 3️⃣ 恢复定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
    }
}

并发控制流程:

关键设计点:

  1. volatile 保证可见性: isFlushing 使用 volatile,确保多线程间的可见性。
  2. synchronized 保证原子性: getTask() 整个方法体使用 synchronized (this)。
  3. 标志位协调: 通过 isFlushing 标志实现两个线程间的协调。
  4. finally 确保恢复: 即使 waitUntilAllFuturesDone() 异常,也会在 finally 中恢复 cleaner。

避免的并发问题:

  • 数据重复写入: Cleaner 和 Checkpoint 同时 flush。
  • Buffer 状态不一致: 一边清空一边写入。
  • Future 清理冲突: 正在使用的 Future 被清理。

性能影响:

  • Checkpoint flush 期间,cleaner 暂停执行(通常 1-3 秒)。
  • Cleaner 跳过的周期会在下次正常执行时补偿。
  • 对整体吞吐影响极小(cleaner 间隔通常 30 秒)。

九、Checkpoint 语义保证

为什么 Checkpoint 时必须 Flush?

不 Flush 的后果

不Flush导致数据永久丢失

正确做法

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    logger.info("start doing snapshot. flush sink to ck");
    // 1. 先 flush buffer(将内存数据写入 ClickHouse)
    if (sink != null) {
        sink.flush();
    }
    // 2. 等待所有写入完成
    if (sinkManager != null && !sinkManager.isClosed()) {
        sinkManager.flush();
    }
    // 此时 Checkpoint 才能标记为成功
    logger.info("doing snapshot. flush sink to ck");
}

Flush 实现与并发协调

public class ClickHouseSinkManager {
    public void flush() {
        //  步骤1:暂停定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
        try {
            //  步骤2:执行 buffer flush + 等待所有写入完成
            clickHouseWriter.waitUntilAllFuturesDone(false, false);
        } finally {
            //  步骤3:恢复定时清理任务(finally 确保执行)
            clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
        }
    }
}

并发协调详解:

// cleaner 线程执行流程
synchronized (this) {
    if (isFlushing) {
        return; // Checkpoint 期间跳过本次执行
    }
    // 正常执行:清理已完成的 Future + 触发 Buffer flush
    futures.removeIf(filter);
    buffers.forEach(Buffer::tryAddToQueue);
}

关键点:

  • volatile 可见性: isFlushing 使用 volatile 确保 cleaner 线程立即看到状态变化。
  • synchronized互斥: getTask()方法体使用 synchronized (this) 确保原子性。
  • 标志位协调: 通过 beforeFlush() / afterFlush() 管理标志位。
  • finally 保证恢复: 即使 flush 异常,也会在 finally 中恢复 cleaner。

等待所有 Future 完成

public synchronized void waitUntilAllFuturesDone(boolean stopWriters, boolean clearFutures) {
    try {
        // 循环等待:直到所有 Future 完成 + 队列清空
        while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
            CompletableFuture<Void> all = FutureUtil.allOf(futures);
            // 最多等待 3 分钟(与 Future 超时一致)
            all.get(3, TimeUnit.MINUTES);
            // 移除已完成的 Future(非异常)
            futures.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
            // 检查是否有异常 Future
            if (anyFutureFailed()) {
                break; // 有异常则退出
            }
        }
    } finally {
        if (stopWriters) stopWriters();
        if (clearFutures) futures.clear();
    }
}

关键逻辑:

  • 循环等待直到所有 Future 完成 + 队列清空。
  • 超时 3 分钟(与 Future 超时一致)。
  • 移除已完成的非异常 Future。
  • 有异常时退出循环。

三种 Flush 触发方式对比

Checkpoint 参数配置

// Checkpoint 配置建议
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint(间隔 1 分钟)
env.enableCheckpointing(60000);
// Checkpoint 超时(必须大于 Future 超时 + 重试时间)
// 建议:CheckpointTimeout > FutureTimeout * MaxRetries
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
// 一致性模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔(避免过于频繁)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 秒
// 最大并发 Checkpoint 数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

语义保证

推荐配置:

生产环境: 使用 ExceptionsThrowableSink + Checkpoint。

允许部分丢失: 使用 UnexceptionableSink。

十、最佳实践与调优

生产配置

// ========== ClickHouse 连接参数 ==========
clickhouse.sink.target-table = tb_logs_local
clickhouse.sink.max-buffer-size = 104857600        // 批次大小
clickhouse.sink.table-count = 0                // 0 表示不分表
// ========== 写入性能参数 ==========
clickhouse.sink.num-writers = 10               // 写入线程数
clickhouse.sink.queue-max-capacity = 10        // 队列容量
clickhouse.sink.timeout-sec = 30               // flush 超时
clickhouse.sink.retries = 10                   // 最大重试次数
clickhouse.sink.check.timeout-sec = 30         // 定时检查间隔
// ========== 异常处理参数 ==========
clickhouse.sink.ignoring-clickhouse-sending-exception-enabled = false
clickhouse.sink.local-address-enabled = true   // 启用本地表写入
// ========== ClickHouse 集群配置 ==========
clickhouse.access.hosts = 192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123
clickhouse.access.user = default
clickhouse.access.password = ***
clickhouse.access.database = dw_xx_xx
clickhouse.access.cluster = default
// ========== HikariCP 连接池配置 ==========
connectionTimeout = 30000                      // 连接超时 30s
maximumPoolSize = 20                           // 最大连接数 20
minimumIdle = 2                                // 最小空闲 2
socket_timeout = 180000                        // Socket 超时 3mi

性能调优

故障排查

十一、总结

本文深入分析了 Flink ClickHouse Sink 的实现方案,核心亮点包括:

技术亮点

  • 连接池选型: 使用 HikariCP,性能优异,连接管理可靠。
  • Future 超时控制: orTimeout(3min) 防止永久阻塞。
  • 显式资源管理: Connection 和 PreparedStatement 显式关闭,防止连接泄漏。
  • 负载均衡优化: Shuffle 初始化 + 轮询选择,避免热点。
  • 异常处理增强: future.isDone() 检查,避免重复完成。
  • 本地表写入: 动态节点发现 + 故障剔除,写入性能提升。
  • 分片策略: 按表(应用)维度路由,独立缓冲和隔离。
  • 攒批优化: 双触发机制(大小 + 超时)+ 随机抖动。
  • 流量控制: 有界队列 + 线程池,实现背压。
  • 健壮重试: 递归重试 + 异常节点剔除 + 最大重试限制。

Checkpoint 语义

  • At-Least-Once: ExceptionsThrowableSink + Checkpoint。
  • At-Most-Once: UnexceptionableSink。
  • Exactly-Once: 需要配合 ClickHouse 事务(未实现)。

生产建议

  1. 必须: Checkpoint 时 flush,否则会丢数据。
  2. 推荐: 使用 HikariCP + 本地表写入。
  3. 推荐: 配置合理的超时(Future < Socket < Checkpoint)。
  4. 推荐: 监控队列大小、Future 失败率、重试次数。

该方案已在生产环境大规模验证,能够稳定支撑百万级 TPS 的日志写入场景。

往期回顾

1.服务拆分之旅:测试过程全揭秘|得物技术

2.大模型网关:大模型时代的智能交通枢纽|得物技术

3.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

4.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

5.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

文 /虚白

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

导读:面对万亿级广告数据存量、日均 3 亿行增量及数千个复杂查询模板的挑战,快手广告数据平台如何突破性能瓶颈、实现架构统一与体验跃升?本文系统介绍了快手广告团队从 ClickHouse on ES 混合架构,全面迁移至 Apache Doris 的统一分析实践,最终实现查询性能提升 20~90%,写入吞吐提升 3 倍,存储效率提升 60%。

本文整理自快手高级计算引擎研发工程师 周思闽 在 Doris Summit 2025 中的演讲内容,并以演讲者第一视角进行叙述。

快手是国内日活过亿的短视频平台,其广告投放平台是商业化外部广告主与快手电商商家进行广告投放的主要阵地,支持客户在平台上进行广告物料搭建、物料管理、策略变更、数据查看等操作,这对底层数据系统的存储、计算与查询性能提出了极高要求。

要支撑如此大规模的广告投放与实时分析,底层数据架构面临巨大挑战。当前,快手的广告数据包括:由投放系统产生的物料数据以及用于数据分析的效果数据,这些数据呈现出三个显著特征:

  • 数据存量巨大:广告物料累计已达千亿级别,且随业务发展正向万亿规模迈进,存储体量位居公司前列,对架构扩展性提出极高要求。
  • 数据增长迅猛:仅 2025 年第一季度,日均新增广告物料数据同比激增 3.5 倍,要求底层引擎具备强大的实时写入与弹性扩展能力。
  • 数据模型复杂:整个数据体系涵盖约 700 个核心字段,涉及物料、投放、用户、效果等多个维度;同时,为应对多样化分析场景,沉淀的查询模板已超 4000 个,对查询引擎的兼容性与性能均是严峻考验。

架构演进:从分散存储到统一分析

01 早期架构及挑战

早期存储架构中,物料数据由 MySQL、Elasticsearch 协同存储;效果数据主要存储与 Clickhouse 中。

数据分析时,将分散在 MySQL、Elasticsearch 中的物料数据与 ClickHouse 中的效果数据进行高效关联查询,从而为广告主提供完整、及时的投放效果洞察。

01 早期架构及挑战.PNG

在如上所说的 ClickHouse on ES 架构中,用户提交的查询通常包含 Elasticsearch 外表(a)与 ClickHouse 内表(b)。ClickHouse 会解析查询中外表部分,将其转换为 Elasticsearch 查询语句,通过 HTTP 请求获取数据并封装为 Block,最后在引擎内部完成与内表的关联计算。

01 早期架构及挑战-1.PNG

然而,随着 Elasticsearch 中数据量持续增长,该架构逐渐暴露诸多问题:

  • 查询性能恶化:慢查询率上升至 35%,平均查询耗时达到 1.4 秒;
  • 存储瓶颈:Elasticsearch 单分片难以支撑 10 亿级以上数据量,扩容与数据重分布成本高;
  • 运维复杂度高:数据链路依赖组件多,运维与监控成本显著上升;
  • 问题定位困难:缺少 ClickHouse 与 Elasticsearch 之间的全链路可观测手段,出现查询延迟、数据不一致等问题时,需跨系统排查,耗时较长。

02 选型目标及调研

基于上述问题及挑战,我们为新架构设定了明确目标:

  • 慢查询率低于 5%;
  • 运维排查耗时降低至分钟级;
  • 支持单表万亿级别数据存储;
  • 保障数据实时性,延迟低于 5 分钟。

基于以上目标,我们对 Apache Doris、ClickHouse、Elasticsearch 等主流 OLAP 引擎进行了全面的调研与性能压测。测试涵盖了写入吞吐、查询延迟、存储压缩率、全文检索性能等关键维度。

02 选型目标及调研.png

在这过程中,ClickHouse 首先被排除,因其不支持唯一键模型,而广告物料数据存在大量更新场景,要求引擎具备主键更新能力。因此,重点在 Elasticsearch 与 Apache Doris 之间进行对比。

综合测试结果,Apache Doris 在写入性能、查询效率、存储成本及运维复杂度等方面均表现优异,不仅能够满足既定架构目标,还在多个场景下显著优于 Elasticsearch。因此,我们最终选定 Apache Doris 作为下一代广告数据分析引擎

03 基于 Apache Doris 的统一分析引擎

在实际应用中,我们引入 Apache Doris(计算引擎) 替换了原先架构中的 Elasticsearch、ClickHouse,设计了统一分析引擎 Bleem。通过在外部表模块中引入数据缓存层与元数据服务层,有效提升了跨源查询效率,使数据湖外表的查询性能接近内表水平,实现了关键的性能突破。

03 基于 Apache Doris 的统一分析引擎.png

具体来看,Bleem 架构自下而上分为 5 层

  • 存储层:数据湖中的 Hive/Hudi 数据存储于 HDFS;存算分离模式下的内表数据存放于对象存储 BlobStore;存算一体模式下的内表数据则存储于本地磁盘。
  • 缓存层:将 Hive/Hudi 外部表数据缓存至 Alluxio,保障 I/O 稳定性,提升数据读取效率。
  • 计算层:Apache Doris 为核心引擎。不同项目组对应不同的 Doris 集群,以实现计算资源物理隔离,用户可按需申请计算资源。依托于 Doris 湖仓查询能力,可直接对 Doris 内表与外部 Hive/Hudi 数据查询。同时,Doris 也支持存算一体与存算分离两种部署方式,可根据实际需求灵活选择。
  • 服务层:元数据缓存服务实时监听 Hive 元数据变更,并同步至缓存中,以提升湖仓外部表的查询效率。
  • 接入层:将 OneSQL 作为统一查询接入网关,提供集群路由、查询改写、物化改写、查询鉴权、限流与阻断等功能。

依托 Doris 强大的 OLAP 计算与湖仓一体能力,将此前分散的数据湖分析、实时 OLAP 查询、在线报表及全文检索等多种场景,统一整合至同一套引擎架构中,实现了技术栈的收敛与提效。该架构在实际落地中已带来显著收益:

  • 性能大幅提升:慢查询率低于 5%,整体查询性能提升了 20%~90%
  • 存储扩展高效:支持万亿级别数据存储,水平扩容效率较 Elasticsearch 提升 10 倍以上;
  • 运维大幅简化:一套引擎覆盖全部查询场景,系统依赖组件少,运维复杂度显著降低;
  • 可观测性全面加强:Doris 支持全链路追踪与全面监控,平均问题排查时间降低 80%

迁移实践及调优经验

整个迁移过程分为三个阶段,稳步推进以确保业务平稳过渡:

  • 第一阶段(试点验证):选取关键词推广场景进行试点,跑通全量与增量数据导入流程,搭建双链路并行验证数据一致性与查询正确性。
  • 第二阶段(主体迁移):迁移原 ClickHouse on ES 查询链路,将 Elasticsearch 中全量物料数据导入 Doris,完成业务切换后下线 Elasticsearch 集群。
  • 第三阶段(收尾统一):迁移剩余纯 ClickHouse 场景,将无需关联 Elasticsearch 的查询任务及其数据全部迁移至 Doris,完成整体架构统一。

在架构升级及迁移过程中,我们收获了许多实践及优化经验,在此逐一分享

01 解决极端场景下数据一致性问题

在数据导入层面,我们基于 SeaTunnel 实现流式数据同步,该方式支持批处理场景下的 Overwrite 语义,所有导入均采用两阶段提交机制,以确保数据同步的最终一致性。

而在基于 SeaTunnel 和 Spark 的数据同步过程中,我们遇到了极端场景下的数据重复问题。主要有两种情况:

  • Spark 推测执行时,两个 Task 同时写入同一份数据并均完成 Doris 两阶段提交,尽管 Driver 只认定一个 Task 成功,但数据已重复。
  • Spark Task 完成 Doris 提交后,在向 Driver 汇报前因抢占或异常退出,Driver 重启 Task 并重新写入数据。

为解决该问题,我们在 Doris 的两阶段事务提交环节引入了 ZooKeeper 分布式锁机制,通过记录并校验事务状态来保证批同步的一致性。具体流程如下:

  • 准备提交阶段,先获取 ZooKeeper 临时锁,确保同一时间只有一个事务进入提交流程;
  • 获取锁后,将 Prepare 状态写入 ZooKeeper 临时节点,并记录当前事务 ID;
  • 查询上一个事务的状态:

    • 若不存在,直接提交当前事务;
    • 若上一事务处于 Prepare 状态,则先回滚上一事务,再提交当前事务;
    • 若上一事务已 Commit,则直接回滚当前事务;
  • 最终将 Commit 状态写入 ZooKeeper 持久节点,完成本次提交。

01 解决极端场景下数据一致性问题.png

02 Stream Load 机制优化

为应对高并发数据导入,我们对 Apache Doris 的 Stream Load 机制进行了调优。通过合理配置任务优先级与合并(Compaction)参数,显著提升了写入吞吐与稳定性。Doris 内部通过 Load Channel 进行任务调度,以区分高优与普通优先级通道。

02 Stream Load 机制优化.png

调优的核心在于合理配置相关参数,例如当 Stream Load 任务指定的 timeout 时间小于 300 秒时,系统会将其判定为高优任务并分配至高优通道。参数优化如下

load_task_high_priority_threshold_second=300
compaction_task_num_per_fast_disk=16
max_base_compaction_threads=8
max_cumu_compaction_threads=8

03 差异化的建表策略

OLAP 引擎的查询性能很大程度上取决于表结构设计。因此,我们针对不同业务场景制定了差异化的建表策略:

物料表(高频更新与大规模检索):该表数据量极大且需支持实时更新。业务查询主要基于 account_id 进行过滤,而非原 MySQL 的自增 ID。为充分发挥 Doris 前缀索引与排序键的优势,在保证业务逻辑等价的前提下,我们将 account_idid 组合为联合主键,并将account_id 设为首个排序键及分桶字段,大幅提升查询过滤效率。同时配置倒排索引以支持多维检索,并选用 ZSTD 压缩算法平衡存储与 IO 性能。

-- 建表语句参考
CREATE TABLE ad_core_winfo
(account_id BIGINT NOT NULL,
id BIGINT NOT NULL, 
word STRING,
INDEX idx_word (`word`) USING INVERTED...) 
UNIQUE KEY(account_id,id) 
DISTRIBUTED BY HASH(account_id) BUCKETS 1000;

效果表(多维聚合分析): 相较于物料表,效果表侧重于数仓指标的累加与聚合。因此,我们直接采用聚合模型,并按照“天”或“小时”粒度设置分区。

-- 建表语句参考
CREATE TABLE ad_dsp_report
(__time DATETIME, 
account_id BIGINT, ...
`ad_dsp_cost` BIGINT SUM,
...) 
AGG KEY(__time,account_id,...) 
AUTO PARTITION BY RANGE(date_trunc(`__time`,'hour'))()
DISTRIBUTED BY HASH(account_id) BUCKETS 2;

04 大账户数据倾斜治理

在数据压测中,我们发现不同 Account ID 对应的数据量差异极大,小至个位数、大至百万级别,导致 BE 节点 CPU 负载严重不均。通过 SHOW DATA SKEW 命令进一步确认,Tablet 存储分布明显倾斜:大 Tablet 占用空间达 3–4 GB,小 Tablet 仅 100-200 MB,且大账户查询延迟较高。为此,我们实施了以下两点优化:

A:按账户范围进行分区

经分析,Account ID 为 5–8 位数字,且未来不会超过 10 位。因此使用 FROM_UNIXTIME 函数将 Account ID 转换为 Datetime 类型,按月对历史数据进行分区,共划分出 33 个历史分区。每个分区可容纳 2,592,000 个 Account ID,后续每新增约 200 多万个 Account ID 才会新增一个月份分区。同时,针对历史分区,根据数据存量进行手动分桶,新分区则默认设置为 256 个分桶。

该方案通过分区裁剪有效过滤了大量无关数据,同时为未来数据膨胀预留了扩展空间(物料表日均增量约 3 亿),显著降低分区增长对查询性能的影响。

B:对 Account ID 进行二次哈希

为缓解单个 Account ID 数据量差异过大导致的分布不均,我们选取与 Account ID 无关的 ID 字段,通过 ID MOD 7 计算得到一个取值在 0~6 之间的 mod 字段。将原本仅基于 account_id 的哈希分桶键调整为 (account_id, mod) 联合键,从而将同一 Account ID 的数据分散到 7 个 BE 节点上。

04 大账户数据倾斜治理.png

优化后,各 Tablet 大小基本均衡稳定在 1GB 左右,数据存储与查询负载得以在多个 BE 间均匀分布,有效解决了 此前 CPU 负载不均的问题。

05 万级分区下的查询优化

当分区数量达到万级别时,简单点查 SQL 的耗时达到 250 毫秒,远超 100 毫秒的预期。通过分析,耗时主要集中在 Plan 阶段,原因是 Doris(2.1 版本)在分区裁剪时,会遍历所有分区进行匹配,万级分区的顺序遍历开销巨大。

为此,我们将顺序遍历改为二分查找:对万级分区先进行排序,再利用二分查找快速定位目标分区,将时间复杂度从 O(n) 降至 O(log n)。优化后,该查询耗时从 250 毫秒降至 12 毫秒,性能提升超过 20 倍。目前,二分查找已在 Doris 3.1 版本中实现。

06 并发调优

在查询优化过程中,我们发现:多数查询经过条件过滤后,实际命中的数据量并不大,即便在大账户场景下,命中数据量也仅在百万级别。然而,Profile 显示这类查询的 Total Instance 数高达 800 个,其默认并发数为 32,存在明显的过度并发

为此,我们调整以下参数降低并发开销:

set global parallel_exchange_instance_num=5;
set global parallel_pipeline_task_num=2;

调整后,同一查询的 Total Instance 数量降至 17 个,查询耗时也显著缩短。这说明在小数据量点查场景下,适当降低并发可有效减少 RPC 开销,从而降低延迟(220ms 降至 147ms)。同时,这一优化也提升了系统的整体 QPS 承载能力。

收益及规划

经过上述架构迁移与深度优化,我们在三个核心维度取得了显著收益:

  • 查询性能大幅提升:关键词推广页平均查询延迟下降 64%,创意推广页延迟下降超过 90%,整体查询体验实现跨越式提升。
  • 写入能力显著增强:单节点写入承载能力提升 3 倍以上,单表实时导入峰值突破 300 万行/秒
  • 存储效率优化明显:通过分区策略与 ZSTD 压缩算法,存储效率较 Elasticsearch 提升约 60%,并可轻松支撑万亿级数据存储。

未来,我们将深度探索 Apache Doris ,重点围绕两方面展开:

  • 增强全文检索与分词能力:引入社区在 Doris 4.0 版本中推出的 BM25 打分功能,以及 IK 分词器等更多分词组件,实现按业务场景灵活选用最优分词方案。
  • 增强向量索引:基于 Doris 4.0 版本,在内表和数据湖外表场景下对向量检索的性能和边界能力做验证与优化。

本文完。您还可以阅读来自快手另一篇实践案以及中通快递、小米集团、顺丰科技用户故事来了解湖仓分析。

很多团队在业务发展到一定阶段后,都会认真评估一次:
用户行为分析系统,是继续用现成产品,还是自己搭一套?

实际上,当企业需要埋点分析时,往往已经没有太多时间成本可投入
业务方希望尽快看到数据结果,管理层关注投入产出比,而完全从零自建埋点系统,周期长、风险高、不可控。
因此,基于成熟开源方案快速上线,再按需求自己二开,是目前更常见、也更可控的一种选择。

这篇文章不讨论“埋点的重要性”,只做一件事:
以自建埋点分析系统为参照,给出一个成本参考,并对比基于 ClkLog 开源方案的实际投入。

完全自建一套埋点分析系统成本通常在几十万,且建设周期长、不可控因素多。
基于ClkLog开源方案搭建首期成本可控制在几万最快一周完成部署集成,可以快速交付使用,并具备持续扩展的能力。

一、自建埋点分析系统,通常需要哪些模块?
很多团队低估了“自建”的工作量,下面只列最基础、不可回避的部分
1.数据采集层(SDK + 埋点规范)
这个阶段往往被低估,但实际上 SDK 会长期伴随业务演进,需要持续维护。
2.数据接入与处理层
核心目标是稳定接住数据:常见技术栈包括接入服务 + Kafka / MQ。
3.数据存储层
通常会选择 ClickHouse / Doris / Druid 这类分析型数据库,同时需要设计分区、冷热数据策略。
4.复杂分析计算
这是自建中最耗精力的部分,很多团队会发现:统计不难,难的是保证分析口径正确且性能可用。
5.管理后台与可视化
这部分前端和交互成本往往被严重低估。
6.运维与长期维护
系统上线只是开始,后续还包括各项调优、异常排查等运维工作。

二、为什么很多团队不会选择「完全自建」?
问题不在“能不能做”,而在是否划算
●早期业务验证阶段,数据系统很难直接创造业务价值
●自建系统容错成本高,试错周期长
因此,越来越多团队会选择:
在成熟的开源埋点分析系统基础上建设,而不是从零开始。

三、ClkLog开源方案能解决什么问题?
ClkLog提供了一套可直接落地的开源埋点分析方案,不依赖第三方SaaS服务。全面覆盖了埋点系统中最重、最复杂的核心能力:

1.数据采集层
支持神策SDK与自研鸿蒙SDK
2.数据接受层
进行日志数据接收与存储
3.数据处理层
进行数据处理、归档等服务
4.数据存储层
使用clickhouse进行大量数据查询
5.数据可视化
内置多种成熟分析模型,开箱即用

企业无需从零搭建底层能力,只需要围绕自身业务场景完成部署、运维和少量定制,即可形成一套可用的自有埋点分析系统。

四、基于 ClkLog,企业实际需要投入哪些成本?
1. 基础运行环境(参考)
以 ClkLog 社区版为例,在 1万日活应用规模下,采用Docker方式部署,单台服务器即可满足基础使用需求。
推荐配置参考:8核CPU;32GB内存
在常见的云厂商环境中,约1-2万/年,即可覆盖服务器、云盘、流量、备份等成本。

2. 软件部署与集成
●获取ClkLog代码(Github/Gitee)
●自行部署ClkLog服务(docker部署最快10分钟完成)
●接入埋点SDK(兼容web/小程序/iOS/安卓等)
●常规运维数据库和服务
整体实施周期短,最快一天即可完成部署并交付使用。

3. 业务层面的工作
●埋点规范梳理
●事件与指标定义
●少量业务定制
ClkLog已经内置十几种行业标准分析模型,可供业务直接开箱使用。若还有更多定制业务需要分析,可以通过自定义事件或二次开发来实现,与完全自建相比,省去的是部门团队沟通、大量底层系统设计与长期维护成本。

五、写在最后
对于大多数团队来说,先把系统跑起来、用起来、产生价值,比一开始追求完美更重要。
如果团队希望:
●完全掌控数据
●又不想长期投入基础设施研发
●把精力更多放在业务分析而不是系统本身
那么,基于成熟开源方案搭建自己的埋点分析系统,是一个性价比较高、风险更可控的选择


ClickHouse 数据库渗透测试指南

目录

1概述

2信息收集

3权限评估

4文件读取利用

5文件写入利用

6命令执行利用

7SSRF 利用

8云环境利用

9横向移动

10防御绕过

11常见限制与绕过

12工具与脚本

概述

什么是 ClickHouse

ClickHouse 是由 Yandex 开发的开源列式数据库管理系统,专为在线分析处理(OLAP)场景设计。它能够高效处理大规模数据集,支持快速查询和实时数据分析。

默认配置

配置项

默认值

HTTP 端口

8123

TCP 端口

9000

MySQL 协议端口

9004

配置目录

/etc/clickhouse-server/

数据目录

/var/lib/clickhouse/

用户脚本目录

/var/lib/clickhouse/user_scripts/

用户文件目录

/var/lib/clickhouse/user_files/

渗透测试目标

1 信息泄露:获取敏感数据、配置信息

2 文件读取:读取系统敏感文件

3 文件写入:写入 webshell、脚本、配置文件

4 命令执行:实现 RCE(远程代码执行)

5 横向移动:利用 SSRF 或数据库连接攻击内网

信息收集

1. 版本信息

2. 用户与权限

3. 数据库与表

4. 服务器配置

5. 集群信息

6. 查询历史(敏感信息)

7. 可用函数

权限评估

关键权限检查

权限等级

权限级别

可执行操作

只读用户

SELECT 查询

普通用户

SELECT、INSERT、部分表函数

DDL 权限

CREATE、DROP、ALTER

SOURCES 权限

url()、mysql()、postgresql() 等

管理员

全部权限

文件读取利用

1. file() 函数读取

2. 路径穿越(低版本可能有效)

3. 软链接绕过(需要命令执行权限)

如果有命令执行权限,可以创建软链接绕过目录限制:

然后读取:

文件写入利用

1. INTO OUTFILE

2. 写入脚本到 user_scripts 目录

命令执行利用

方法1:UDF 配置文件(需要配置文件写入权限)

步骤1:创建 UDF 配置文件

创建 /etc/clickhouse-server/cmd_function.xml:

关键参数说明:

execute_direct=0:command 通过 sh -c 执行

execute_direct=1:command 在 user_scripts 目录查找脚本

步骤2:确保 config.xml 包含

步骤3:重启服务后执行

方法2:带参数的交互式 Shell

UDF 配置:

执行命令:

方法3:executable() 表函数

方法4:Executable 表引擎

方法5:Executable Dictionary

SSRF 利用

1. url() 表函数

2. 常见内网服务探测

3. 批量内网扫描

4. 其他 SSRF 函数

云环境利用

AWS

阿里云

腾讯云

Google Cloud

横向移动

1. MySQL 连接

2. PostgreSQL 连接

3. remote() 连接其他 ClickHouse

4. JDBC 连接(需要 JDBC Bridge)

防御绕过

1. 编码绕过

2. 字符串拼接

3. 使用变量

常见限制与绕过

托管环境限制

限制

状态

绕过方法

INTO OUTFILE 禁用

常见

无直接绕过,尝试其他写入方式

Executable DDL 禁用

常见

尝试配置文件方式

路径穿越阻止

常见

软链接(需命令执行)

url() 内网限制

少见

DNS 重绑定

配置文件只读

常见

无绕过

工具与脚本

1. 信息收集一键脚本

2. Python SSRF 扫描脚本

3. 批量端口扫描 SQL 模板

公司收购 Langfuse,正式进军 LLM 可观测性 (LLM observability) 领域,并推出原生 Postgres 服务,以统一事务型与分析型工作负载。

旧金山 — 2026 年 1 月 16 日 — 实时分析、数据仓库、可观测性 (observability) 以及 AI/ML 领域的领导者 ClickHouse 今日宣布完成 D 轮融资,融资金额达 4 亿美元。本轮由 Dragoneer Investment Group 领投,Bessemer Venture Partners、GIC、Index Ventures、Khosla Ventures、Lightspeed Venture Partners、T. Rowe Price Associates, Inc. 管理的账户,以及 WCM Investment Management 共同参与。

此次融资正值 ClickHouse 持续且加速增长之际。目前,公司通过全托管服务 ClickHouse Cloud 已服务超过 3,000 家客户,年度经常性收入 (ARR) 同比增长超过 250%。在过去三个月中,Capital One、Lovable、Decagon、Polymarket 和 Airwallex 等客户开始采用该平台或扩大了现有部署。这些新客户加入了 ClickHouse 已建立的客户群体,其中包括 Meta、Cursor、Sony 和 Tesla 等 AI 创新者及全球知名品牌。

“ClickHouse 的初衷就是为最严苛的数据工作负载提供卓越的性能和成本效率,而今天的增长势头正是这一战略的最好证明,”ClickHouse 首席执行官 Aaron Katz 表示。“面向未来,我们正在支持统一的事务型与分析型工作负载,让开发者能够在坚实的技术基础之上构建各种由 AI 驱动的应用。同时,我们也在拓展产品能力,引入 LLM 可观测性,帮助 AI 应用构建者在进入生产阶段时,更好地评估 AI 输出的质量和行为。新的资金支持,加上持续的产品执行力,使我们有能力在 AI 时代打造领先的数据与 LLM 可观测性平台。”

图片

对大规模数据基础设施与 AI 的高度确信投资

Dragoneer 成立于 2012 年,由 Marc Stad 创立,采用高度精选、以研究为核心的方法,专注于与少数具有品类定义意义的公司建立长期合作关系。过去十年中,该公司投资了多家领先的数据平台以及多家基础性的 AI 公司。

随着 AI 系统逐步从实验走向生产,对底层数据基础设施提出了更高要求。AI 驱动的应用会产生远高于以往的查询量,对延迟更加敏感,同时还需要持续的评估能力和可观测性。在这样的背景下,真正的价值正越来越集中到那些能够支撑大规模、数据密集型生产工作负载的基础设施平台之上。

“每一次重大的平台变革,最终都会回馈那些最贴近生产环境的基础设施公司,”Dragoneer Investment Group 合伙人 Christian Jensen 表示。“当模型能力不断提升,真正的瓶颈就转移到了数据基础设施上。ClickHouse 的突出之处在于,它能够在大规模 AI 系统运行时,提供所必需的性能、效率和可靠性。”

在严谨的评估过程中,Dragoneer 认为 ClickHouse 已成为现代数据技术栈中具有品类定义意义的领导者。该平台广泛支持关键任务级的实时工作负载,深度嵌入于始终在线、面向客户以及 AI 驱动的系统之中。

ClickHouse 的增长不仅来自对现有系统的替代,更来自对全新工作负载的支持。通过在大规模场景下实现高性价比的实时分析,ClickHouse 让许多过去因延迟或成本受限而无法落地的应用场景成为可能。与主要服务内部分析团队的许多数据基础设施平台不同,ClickHouse 经常直接嵌入到面向终端用户的产品中,在这些场景下,性能和可靠性会直接影响用户体验。

“我们寻找的是在系统绝不能停机时依然值得客户信赖的平台,而 ClickHouse 一直展现出这样的能力,”Jensen 补充道。

LLM 可观测性:ClickHouse 通过收购 Langfuse 进入该市场

ClickHouse 正式宣布收购开源 LLM 可观测性平台 Langfuse。与关注系统健康和性能指标的传统可观测性不同,LLM 可观测性关注的是如何确保非确定性、日益复杂的 AI 系统能够输出准确、安全且符合用户意图的结果。随着 AI 系统不断深入生产工作流,LLM 可观测性已成为构建和运营 AI 应用团队不可或缺的一环。

Langfuse 开源项目增长迅速,截至 2025 年底,已获得超过 2 万个 GitHub Star,每月 SDK 安装量超过 2,600 万次。

“我们之所以在 ClickHouse 之上构建 Langfuse,是因为 LLM 可观测性和评估本质上就是一个数据问题,”Langfuse 首席执行官 Marc Klingen 表示。“如今作为一个团队,我们能够提供更加紧密的一体化体验:更快的数据摄取、更深入的评估能力,以及从生产问题到可量化改进之间更短的闭环。”

图片

Langfuse 联合创始人 Clemens Rawert、Marc Klingen、Max Deichmann

原生 Postgres 服务:ClickHouse 面向 AI 构建者推出统一数据技术栈

ClickHouse 同时宣布推出一个与自身平台深度集成的企业级 Postgres 服务。为了支撑既需要事务处理又需要分析能力的现代实时 AI 应用,ClickHouse 打造了一套统一的数据技术栈,其中包括由 NVMe 存储支撑、具备原生 CDC 能力的高性能可扩展 Postgres。用户只需几次点击,就能将事务数据同步至 ClickHouse,从而解锁最高可达 100 倍的分析性能提升。借助由原生 Postgres 扩展提供支持的统一查询层,开发者可以构建横跨事务与分析的应用,而无需维护多个独立系统。该服务由 ClickHouse 与开源云公司 Ubicloud 联合打造,Ubicloud 团队在 Citus Data、Heroku 和 Microsoft 拥有丰富的产品与工程经验。

“Postgres 与 ClickHouse 在架构上天然互补,是 AI 应用不可或缺的组成部分。通过合作,我们为团队交付了一套真正的一体化技术栈,让生产级 Postgres 负责事务处理,让 ClickHouse 专注分析,并作为一个整体协同运行,”Ubicloud 联合首席执行官兼联合创始人 Umur Cubukcu 表示。“我们非常高兴能在 Ubicloud 与 ClickHouse 携手合作,这正是开源生态系统成功的方式:由值得信赖的团队打造一流产品,并共同成长。”

图片

Ubicloud 联合创始人 Umur、Ozgun 和 Daniel

持续的全球扩张与产品动能

在完成融资并收购 Langfuse 的同时,ClickHouse 也在持续扩展其全球布局和生态体系。过去一年中,公司通过与 Japan Cloud 的合作进入日本市场,并宣布与 Microsoft Azure 围绕 OneLake 建立合作关系。ClickHouse 还在旧金山、纽约、阿姆斯特丹、悉尼和班加罗尔举办了多场用户活动,吸引了超过 1,000 名参与者,演讲嘉宾来自 OpenAI、Tesla、Capital One、Ramp 和 Canva 等公司,并连续第二年举办了 AWS re:Invent Chainsmokers 客户活动。

一系列近期产品进展进一步强化了 ClickHouse 在分析、AI 与可观测性交汇领域的地位。公司在数据湖支持方面持续投入,新增了对 Apache Iceberg、Delta Lake 以及主流数据目录的兼容性。同时,平台扩展了全文搜索能力,这对于包括 AI 可观测性在内的各类可观测性场景正变得愈发关键。此外,ClickHouse 还引入了轻量级更新机制,以支持需求更高、负载更复杂的 AI 驱动型应用。根据近期基准测试结果,ClickHouse 持续提供行业领先的性价比,在性能与成本比上超越主流云数据仓库。

借助 D 轮融资、对 Langfuse 的收购以及原生 Postgres 服务的推出,ClickHouse 已做好加速增长的准备,并将进一步巩固其作为统一数据平台与 AI 可观测性平台的战略地位。

了解更多:

关于 ClickHouse:

 

ClickHouse 是一个快速的开源列式数据库管理系统,专为大规模实时数据处理与分析而设计。ClickHouse Cloud 以高性能为核心,提供卓越的查询速度与并发能力,非常适合需要从海量数据中即时获取洞察的应用。随着 AI 智能体 (AI Agent) 越来越多地嵌入软件系统,并生成频率更高、复杂度更大的查询请求,ClickHouse 提供了一个高吞吐、低延迟的引擎,专门用于应对这一挑战。ClickHouse 受到 Sony、Tesla、Memorial Sloan Kettering、Lyft 和 Instacart 等领先企业的信任,帮助团队通过一个可扩展、高效且现代化的数据平台释放数据价值并做出更明智的决策。欲了解更多信息,请访问 clickhouse.com。

关于 Dragoneer Investment Group:

Dragoneer 是一家以增长为导向的投资机构,资产管理规模超过 300 亿美元。该机构与在公有和私有市场中打造品类定义型公司的创始人及管理团队长期合作。迄今为止,已有 50 多家 Dragoneer 投资的公司成功上市。其投资组合包括 Airbnb、Amwins、Atlassian、Databricks、Datadog、Meta、Nubank、OpenAI、Revolut、ServiceNow、Snowflake、Spotify 和 Uber。

关于 Langfuse:

Langfuse 是一个用于构建、测试和监控 LLM 应用及 AI 智能体的开源平台。团队使用 Langfuse 来追踪和调试智能体工作流、运行评估,并持续衡量和改进生产环境中 AI 输出的质量。Langfuse 既提供托管云服务,也支持在生产规模下自托管。作为增长最快的 LLM 工程平台之一,Langfuse 拥有 20,470 个 GitHub Star、每月超过 2,600 万次 SDK 安装量以及 600 多万次 Docker 拉取,并受到《财富》50 强中 19 家公司和《财富》500 强中 63 家公司的信任。欲了解更多信息,请访问 langfuse.com。

关于 Ubicloud:

Ubicloud 正在打造开源版的 AWS,在裸金属和公有云之上交付核心云服务。Ubicloud 由打造分布式 PostgreSQL 的 Citus Data 团队创立 (该公司已被 Microsoft 收购)。其旗舰数据库产品 Ubicloud PostgreSQL 提供企业级托管 Postgres 体验,并具备行业领先的性价比。Ubicloud 在 AI、计算、PostgreSQL 和 Kubernetes 等领域提供的服务每周支撑超过 100 万台虚拟机运行,可帮助客户将云成本降低多达 70%。Ubicloud 获得了 Y Combinator 及其他知名硅谷投资机构的支持。欲了解更多信息,请在 X 上关注 Ubicloud @ubicloudHQ,或访问 ubicloud.com。

/END/

征稿启示

面向社区长期正文,文章内容包括但不限于关于 ClickHouse 的技术研究、项目实践和创新做法等。建议行文风格干货输出 &图文并茂。质量合格的文章将会发布在本公众号,优秀者也有机会推荐到 ClickHouse 官网。请将文章稿件的 WORD 版本发邮件至:Tracy.Wang@clickhouse.com。

做量化交易系统的后端开发,最头疼的不是策略算法(那是 Quants 的事),而是数据管道(Data Pipeline)的健壮性。

特别是处理历史 Tick 数据时,我们面临的是一个典型的“高并发写入+高精度时序”场景。在早期的架构设计中,我经常因为低估了 Tick 数据的体量和复杂性,导致系统在回放时出现“幽灵交易”——即数据到达顺序与交易所撮合顺序不一致。

工程上的三个拦路虎

时间戳的绝对真理: 在分钟线级别,这一秒和下一秒区别不大。但在 Tick 级别,毫秒级的乱序就是灾难。工程上必须严格依赖 Exchange Timestamp 而不是本地接收时间。

分页与流量控制: 也就是 Pagination。一次请求拉取全天 Tick 是不现实的,HTTP 响应体过大会导致超时或内存溢出。

异构数据源: 历史归档数据通常是冷存储结构,而实时流是 WebSocket 热数据,如何用一套代码兼容这两种接口?

高效的解决方案

为了解决这些 IO 密集型任务,我的思路是:将数据获取层(Ingestion Layer)完全解耦。

不要尝试自己在应用层去清洗原始报文。目前比较成熟的做法是直接对接第三方聚合 API。以我目前使用的 AllTick API 为例,它在服务端已经做好了清洗和标准化。这就相当于把复杂的 ETL 过程外包了出去,我们只需要通过简单的 HTTP 请求拿到 JSON 格式的结构化数据。

这样,我们的工程重心就可以从“怎么抓数据”转移到“怎么用数据”上。

代码实现:构建数据拉取器

下面是一个基于 Python requests 库构建的简单拉取器原型。注意看参数中的 limit 和时间窗口设置,这是处理大流量数据的关键:

import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.alltick.co/v1/market/tick/history"

params = {
    "symbol": "AAPL.US",
    "market": "US",
    "start_time": "2024-01-02 09:30:00",
    "end_time": "2024-01-02 09:31:00",
    "limit": 1000
}

headers = {
    "Authorization": f"Bearer {API_KEY}"
}

resp = requests.get(BASE_URL, params=params, headers=headers)
data = resp.json()

for tick in data.get("data", []):
    ts = tick["timestamp"]
    price = tick["price"]
    volume = tick["volume"]
    print(ts, price, volume)

架构师视角的补充

在实际生产环境中,这段代码拉下来的数据,我不建议直接进 Pandas 分析,而是应该先进入消息队列(如 Kafka)或者写入 ClickHouse 这样的列式数据库。

为什么?因为历史 Tick 的价值在于高保真回放。标准化的接口解决了“源头”问题,而合理的存储架构解决了“流转”问题。这才是构建低延迟交易系统的正确姿势。

今天这篇博客文章由 Streamfold 的 Mike Heffner 和 Ray Jenkins 撰写。他们是 Rotel 的维护者,该项目是一个用 Rust 编写的开源工具,致力于实现高性能、资源高效的 OpenTelemetry 数据采集。

TLDR;

大规模系统中的效率至关重要:哪怕是资源消耗的小幅下降,也可能带来显著的成本节约和效率提升。

OTel + ClickHouse 的基准测试:我们构建了一套数据管道,用于评估将流式追踪数据写入 ClickHouse 的性能。

Rotel 实现了 4 倍性能提升:我们展示了如何将 OTel Collector 每核每秒处理 13.7 万个追踪跨度 (trace span) 提升到 Rotel 的 46.2 万个,并详细说明了多项关键的性能优化。

工具和资源:文末提供了我们在基准测试中所用的工具清单。

引言

在 PB 级规模下运营一个可观测性平台,需要持续关注资源利用效率。即便是每核性能或内存使用上的细微改进,也能大幅降低基础设施开销。

本文源自我们在丹佛举办的一场 ClickHouse 技术见面会的演讲。在会上,我们分享了我们对 Rotel 的开发工作。Rotel 是一个面向大规模系统的高性能 OpenTelemetry 数据平面 (data plane)。

得益于其高压缩比和良好的成本效益,ClickHouse 越来越多地被用于大规模的 OpenTelemetry 负载中。而在这些系统中,OTel Collector 往往是整条数据管道中成本最高的部分。

最近,ClickHouse 发布了一篇关于大规模系统中效率重要性的文章(https://clickhouse.com/blog/scaling-observability-beyond-100pb-wide-events-replacing-otel),并介绍了他们在内部平台 LogHouse 上运行 OTel 的实际经验。

其中有个细节引起了我们的注意:

“OTel Collector:使用超过 800 个 CPU 核传输每秒 200 万条日志。”

按每核每秒约 2500 条日志计算,对于典型的日志行来说,这意味着一个 8 核服务器每秒仅能传输约 10MB,远远低于现代硬件的处理能力。而 ClickHouse 每核每秒最多可处理超过 1.25 万条 OTel 事件,吞吐能力是前者的五倍以上。因此,当前的瓶颈在数据采集端,而非存储端。尽管我们无法复现 ClickHouse 内部的 LogHouse 平台,但我们认为,对现代 OpenTelemetry 数据管道进行基准测试仍具有重要意义。因此,我们希望借此次演讲探讨一个核心问题:将追踪数据发送到 ClickHouse 时,不同的 OpenTelemetry 数据平面表现如何?

本文将逐步介绍我们构建的基准测试流程,比较 OpenTelemetry Collector 和 Rotel 的性能。我们搭建了一套合成的追踪数据管道,先通过 Kafka 传输追踪跨度,再写入 ClickHouse。在 OTel Collector 达到 110 万跨度/秒的基准测试后,我们展示了通过以下几项优化如何让 Rotel 在相同硬件上实现最高 370 万跨度/秒的处理能力:实现 JSON 的二进制序列化、分析 Tokio 任务调度性能 (perf analysis of Tokio task management)、以及引入改进的 LZ4 压缩算法。

文末我们还列出了用于此次基准测试的框架和工具。

基准测试框架

在进入测试结果之前,我们先介绍一下评估 OpenTelemetry Collector 和 Rotel 所采用的测试方法。如果你感兴趣,也可以直接跳转到结果部分,链接见这里。

追踪数据管道

我们此次基准测试的重点是将追踪跨度 (trace span) 写入 ClickHouse,因为在大型系统中,追踪数据增长极快。我们参考了一个高度可靠的流处理管道进行建模,选用 Kafka 作为日志流层。测试目标是模拟这样一种场景:大量边缘采集器将数据发送至 Kafka 流,由少数几个核心采集器批量写入 ClickHouse。在这个管道中,Rotel 和 OTel Collector 使用相同的 Kafka Protobuf 编码,因此两者可以互换使用。

图片

评估方法

我们希望找出在固定硬件条件下,单个采集器能够稳定支持的最大吞吐量,重点关注的是采集效率,而非系统扩容能力。测试的关键是观察单节点在不降级的前提下可以被“压榨”到什么程度,同时也控制在我们的评估预算范围内。我们选择网关采集器作为测试核心组件,因为它直接将数据输出至 ClickHouse,而后者在处理大批量数据插入时效率最高。为实现更高的批处理效率,部署少量、但能力更强的网关采集器是理想方案,因此我们专注对该组件进行优化与测量。

饱和识别

我们通过两个关键信号来判断采集器是否已达到处理极限:

  • 内存激增:当下游系统产生反压时,采集器会将数据缓存于内存中,导致内存快速增长;

  • Kafka 消费延迟:如果采集器处理速度赶不上 Kafka 流的速度,其消费延迟会不断增加,即“上次读取的消息时间”与“当前时间”的间隔在变大。

测试配置

每项测试我们都将数据管道运行在接近饱和的边缘状态,持续 15 分钟,记录以下两个关键指标:

  • 每秒处理的追踪跨度数,由负载生成器记录;

  • 向 ClickHouse 写入的数据吞吐量(MB/s),由 AWS Cloudwatch 监控采集。

带宽这一指标有助于统一比较不同环境下的处理能力,因为追踪跨度在实际场景中大小差异可能很大。在测试配置中,边缘采集器会将数据打包优化后发送至 Kafka,这样可以减少消息总数,同时提高单条消息的体积。

测试过程中,我们也记录了 Rotel 与 ClickHouse 的平均 CPU 使用率。

OpenTelemetry 的 ClickHouse 数据模式

本次测试使用的 ClickHouse 表结构同时兼容 OpenTelemetry Collector 与 Rotel。这一数据模式是官方推荐的方案之一,适用于 ClickHouse 与 ClickStack 可观测性平台,支持 OTel 的指标、日志与追踪数据。

OTel 的数据模型高度依赖键值属性(key/value attributes)来描述基础设施与应用环境中的关键特征,有助于进一步分析。最初在 ClickHouse 中,这类字段采用 Map 类型存储。但近期,OTel Collector 引入了支持 JSON 列类型(JSON column type)的新特性,使得原有结构可以转换为 JSON 格式。虽然这会给 ClickHouse 带来更高的 CPU 压力,但查询表达能力也因此大大增强。我们的测试选择启用这一新特性。你可以在这里查看我们使用的完整 ClickHouse 追踪数据结构(https://gist.github.com/mheffner/dc332a61f3b9ba1d03fd7c7d5c1b7fbb)。

我们的测试配置中还使用了 ClickHouse 的 Null 表引擎,这是一项关键优化手段。Null 引擎可以接受写入请求但不进行实际存储,因此能帮助我们剥离磁盘 I/O 的影响,专注评估写入吞吐能力与数据结构正确性。在完成峰值吞吐的测试后,我们会进一步评估 ClickHouse 如何处理真实的磁盘写入负载。

负载生成器

我们尝试使用 telemetrygen CLI 工具生成数据,但它难以达到所需的数据量。因此我们改用之前内部构建的负载生成器,该工具最初用于测试 OpenTelemetry 与其他遥测管道。该项目可在 Github 的 otel-loadgen 仓库中找到。它还具备验证端到端数据传输等增强功能,我们将在后续文章中进一步介绍。

我们构造的每个追踪包含约 100 个跨度,涵盖丰富的属性与元数据。和所有合成测试一样,这些数据并不完全等同于真实生产环境中的流量。

测试硬件

所有基准测试均在 AWS EC2 实例上执行。数据管道的每一层组件部署在独立的实例中,所有实例均位于同一可用区,以确保测试结果的一致性与准确性。

图片

为了最大化磁盘吞吐能力,我们将 Kafka 和 ClickHouse 的数据卷直接挂载在实例自带的 NVMe 本地磁盘上。所有测试均使用 Amazon Linux 2023 操作系统,通过 Docker Compose 编排运行各个组件。

本次基准测试的目标是评估单台网关采集器主机所能承载的最高吞吐量。我们最终选择的测试机器为 m8i.2xlarge 实例,配备 8 核 CPU 和 32GB 内存。随着测试规模扩大,数据管道中的其他节点进行了扩容,但网关采集器始终保持不变,便于横向对比。

测试 OpenTelemetry Collector

测试从 OpenTelemetry Collector 开始,它在测试中既作为边缘采集器,也作为网关采集器。

测试配置

图片

你可以在这里查看 Docker Compose 的配置文件(https://github.com/streamfold/rotel-clickhouse-benchmark/blob/main/docker-compose-otelcoll.yml)。

测试结果如下:

在运行单个 Collector 实例时,我们在处理速率达到约 70 万个追踪跨度每秒(约 40 MB/s)时遇到了性能瓶颈。此后内存占用开始持续上升,尽管此时 CPU 利用率尚不足 50%。

OTel 的 Kafka 接收器采用单个 goroutine(轻量线程)处理消息,这很可能成为吞吐量的限制瓶颈。我们尝试了若干 Kafka 参数调整,包括消息大小限制等,但都未能显著提升性能。于是我们转向横向扩展方案,在同一主机上启动第二个 Collector 实例(通过 Docker Compose 的 scale: 2 设置)。

当两个 Collector 实例各自消费一半 Kafka 分区后,系统最大稳定吞吐量达到 110 万追踪跨度每秒(69 MB/s)。一旦超过这个阈值,发送队列开始堆积,内存使用迅速上升。当队列完全填满后,Kafka 接收器仍然会继续读取消息,但会直接丢弃数据。这意味着表面上 Kafka 消费延迟没有上升,但实际上我们已经在丢失追踪数据!

测试期间,网关采集器的 CPU 峰值使用率超过 83%,成为主要瓶颈。而 ClickHouse 的 CPU 使用率维持在 23% 左右。

图片

测试 Rotel

测试配置

图片

你可以在这里查看 Rotel 的 Docker Compose 配置(https://github.com/streamfold/rotel-clickhouse-benchmark/blob/main/docker-compose-rotel.yml)。

测试结果如下:

Rotel 同样使用一个接收循环从 Kafka 拉取数据,与 OTel Collector 架构类似。这使我们初步判断存在串行处理瓶颈。果然,当我们在主机上运行两个 Rotel 实例并充分利用 CPU 后,吞吐量大幅提升。

在两个 Rotel 实例并行运行时,我们实现了每秒 145 万个追踪跨度(76 MB/s)的最大吞吐能力,较 OTel Collector 提升约 1.3 倍。继续提升负载后,我们观察到 Kafka 消费延迟开始缓慢上升,说明消费速率已逼近极限。

此时,网关采集器 CPU 使用率达到 91.3%,成为新的瓶颈;ClickHouse CPU 使用率也升至 60.4%。

图片

我们随后继续挖掘进一步的优化空间,将注意力转向了如何更高效地处理 JSON 列类型的数据传输。

优化 RowBinary 格式下的 JSON 编码

在 Rotel 中,我们基于官方的 Rust ClickHouse 库 clickhouse-rs 进行了改造。该库使用的是 RowBinary 格式——一种面向行的二进制序列化协议,通过 HTTP 与 ClickHouse 进行数据读写。相比之下,OTel Collector 所使用的 Go 驱动和 ClickHouse 内部组件则采用面向列的原生协议。

在处理 JSON 列时,clickhouse-rs 的默认做法是:先将 JSON 内容转为字符串后再发送。虽然 ClickHouse 本身不会以原样字符串存储 JSON 列,但这一“字符串化”过程是为了传输而必须进行的。不过,这样做在高并发情况下代价很大:客户端需要序列化 JSON,服务端则需重新解析,还必须扫描键名和字符串值以转义引号、反斜杠等字符。尤其当字符串体量较大时,这一过程非常耗资源。

在 ClickHouse Slack 社区的帮助下,我们发现其实可以将 JSON 列直接编码为 RowBinary 原生格式。这种方式下,JSON 会被序列化为一系列键值对,每个键为字符串,紧跟一个表示值类型的标签,再跟上该值的原始二进制内容。这种结构可以跳过整个 JSON 的序列化解析过程,从而实现更高效的结构化数据传输。

比如,考虑下面这样一个简单的 JSON 对象:

{  "a": 42,  "b": "dog",  "c": 98}
复制代码

RowBinary 格式下的编码方式如下:

图片

首先,它会将键值对的数量以变长整数(varint)编码,例如上例中是 03 对;然后逐个对键值对编码。每个键会先写入一个表示字符串长度的 varint(如 01),再写入字符串本身,接着是对应值的编码。如果值的类型在 JSON 声明中已知(如上例中的键 a 是整数 42),那么会直接用固定类型编码,如 2a 00 00 00 00 00 00 00。如果类型未声明,则使用动态类型编码方式,例如键 c 用 0a 表示 Int64 类型,后跟值 98(62 00 00 00 00 00 00 00)。最后,键 b 表示字符串类型(15),跟上字符串长度 03 和字符串内容 “dog”。

相比传统的 JSON 传输方式,这种方法在客户端和服务端两端都能显著减少序列化和解析的资源消耗。虽然 clickhouse-rs 库目前尚未原生支持这种编码方式,但我们计划参与贡献该功能的开发。

重新测试

在将 Rotel 升级为使用上述优化编码后,我们重新进行了性能测试,以评估实际效果。结果显示,虽然总体吞吐量依然维持在之前的峰值——每秒 145 万个跨度未变,但 ClickHouse 服务器的 CPU 使用率下降了约 10%,网关采集器的 CPU 占用也略有减少。这种改善在多次测试中均表现稳定,说明服务端解析负担确实得到了缓解。

需要说明的是,此次测试中使用的合成负载并未包含大量长字符串,同时每个跨度的属性数量也与真实生产环境可能有所不同。因此,虽然客户端的改进不明显,但我们相信,在处理属性字段较多、字符串数据较大的实际业务场景中,这种更快的 JSON 序列化路径将带来更明显的性能收益。

图片

用一个意想不到的技巧将吞吐量翻倍(定位并解决内存分配器锁争用)

在这轮测试中,为了充分利用网关采集器主机的 8 个 vCPU 并将事件吞吐量提升至每秒 145 万条写入 ClickHouse,我们必须同时运行两个 Rotel 实例。Rotel 的 Kafka 接收器逻辑原本运行在一个 Tokio 异步任务中,其处理流程简化如下:

图片

这种实现方式存在两个主要问题:

1. 整个处理流程是串行执行的,缺乏并行能力;

2. 数据反序列化(unmarshaling)是一项高度依赖 CPU 的操作,容易阻塞 Tokio 的执行线程。

Tokio 是 Rust 语言的异步运行时,采用协作式调度模型。这要求任务在运行过程中需主动在 `.await` 或其他让出点交还执行权给调度器。网络上已有大量文章探讨该机制的重要性以及忽视它可能带来的严重性能问题。通俗来说,一个 tokio 任务应尽可能靠近 `.await` 点,业界建议任务在两个 `.await` 之间的执行时间应控制在 10 到 100 微秒以内。

在 Rotel 的 exporter 模块中,我们使用了一个专用线程池来处理诸如数据序列化与压缩等 CPU 密集型任务。而在 Kafka 接收流程中,rust-rdkafka 库会将解压缩工作分派至后台线程,在调用 `recv()` 之前完成。但最初我们依然将数据的反序列化逻辑保留在 Tokio 异步任务中。随着分析深入,我们确认反序列化过程极其耗费 CPU,因此决定将其改为异步提交到与 exporter 共用的线程池中,以避免阻塞主线程。

经过这一重构后,Kafka 接收器的主处理循环结构如下:

loop {  select! {    message = recv() => {        unmarshaling_futures.push(spawn_blocking(unmarshal(message)))         },    unmarshaled_res = unmarshaling_futures.next() => {        send_to_pipeline(unmarshaled_res)    }  }}
复制代码

我们随后在网关采集器上使用单个 Rotel 实例重新运行测试,并将负载生成器设置为此前的最大值——每秒 145 万个 trace span。结果系统依旧稳定运行。但让我们颇为意外的是:CPU 使用率相比优化前下降了约 40%!

原先我们之所以要运行两个 Rotel 实例,是因为单个实例未能充分利用主机资源,Kafka 接收模块存在明显的串行处理瓶颈。而此次重构将反序列化逻辑迁移至专用线程池后,这一限制被有效解除。我们预期,在这种并行架构下,单实例就能实现与双实例相同的吞吐性能,并维持类似的资源利用率。

在看到 CPU 占用大幅下降后,我们继续提升负载压力。最终,我们成功将吞吐量从每秒 145 万条提升至 360 万条 trace span,实现翻倍增长!

当处理速率达到每秒 360 万条时,CPU 占用再次达到约 93%,系统达到新一轮饱和。

图片

在验证性能大幅提升后,我们开始着手分析背后 CPU 效率提升的具体原因。为此,我们借助 Linux 的 Perf 工具以及 flame graph(火焰图),对优化前后的 Rotel 构建版本进行了性能剖析,从而定位 CPU 时间的实际开销位置。

使用 Flamegraph 剖析 Rotel 的 Kafka 接收器

我们针对 Rotel Kafka 接收器的旧版本与新版本重新运行了一轮测试,并捕获了 flamegraph(火焰图)用于性能分析。起初乍一看,两者并未显现出明显差异。你是否能发现其中关键?在两个版本中,我们都观察到:将追踪数据准备并导出至 ClickHouse 占据了主要运行时间,此外,接收器中的消息反序列化操作(在 prost::message::Message::decode 函数中执行)也消耗了相当多的资源。这类负载会创建大量生命周期极短的对象,因此系统在内存分配与释放上耗费了大量时间。

旧版本:

图片

新版本:

图片

使用 Linux Perf 评估接收器优化前后的变化

通过运行 perf stat,我们发现两个版本在底层表现上差异巨大。

perf stat -c cycles,instructions,cache-misses,cache-references,context-switches,cpu-migrations 
复制代码

旧版本

295612663445 cycles 264853636815 instructions # 0.90 insn per cycle 615670230 cache-misses 1867733351 cache-references 1224819 context-switches 1230 cpu-migrations 50.296446757 seconds time elapsed
复制代码

新版本:

150590256805 cycles 287007890213 instructions # 1.91 insn per cycle 598469068 cache-misses 1163669589 cache-references 37675 context-switches 43 cpu-migrations 43.716966122 seconds time elapsed
复制代码

新版本平均每周期执行 1.9 条指令,每秒仅发生 862 次上下文切换。尽管这在指令级并行性(ILP)方面不算极致表现,但相较之下已经有明显进步。而旧版本平均仅能达到 0.9 条指令/周期,且上下文切换次数竟高达每秒 24,350 次 —— 新版本将此指标降低了约 32.5 倍!说明旧版本几乎无法并行处理,线程频繁被挂起与唤醒,调度开销巨大。

此外,CPU 迁移数据也显示出改进:新版本平均仅有 1 次迁移/秒,表明线程在相同 CPU 核上保持良好的缓存亲和性,而旧版本则高达 24.5 次/秒。这些迹象显示,旧版本中调度器难以保持线程驻留在固定核心上。

新版本具备更优秀的并行处理特性,使得我们可以进一步扩大吞吐负载。而这一切的背后,仅仅是我们将部分任务拆分至独立线程处理。

我们原先推测旧版本性能瓶颈可能源于 Tokio 执行线程被阻塞,导致运行时不得不频繁轮询、空转,甚至尝试工作窃取。但 perf report 的深入分析为我们揭示了更具体的问题。

深入分析 perf report 结果

通过 perf record 捕获更详细的运行数据后,我们对比了优化前后的性能差异。

在新版本中,大部分计算时间用于压缩数据、将 OTLP 转换为 ClickHouse 所需的数据行结构、以及内存分配与释放等正常开销,整体运行表现健康。

图片

但旧版本的问题非常突出:15% 的运行时间用于释放内存,而压缩和构造数据的时间仅占 9.75%,相比之下新版本达到了 20%。同时,我们发现大量 CPU 时间耗费在如下底层函数中:_raw_spin_unlock_irqrestore、finish_task_switch.isra.0、__lll_lock_wait_private 与 __lll_lock_wake_private。

图片

开启子调用视图后,我们发现这些锁操作多数出现在内存释放阶段。

图片

这些函数具体负责什么?

  • _raw_spin_unlock_irqrestore 是 Linux 内核中的函数,用于释放自旋锁并恢复中断状态。当任务即将被抢占时会调用它,以便调度器执行上下文切换;

  • finish_task_switch.isra.0 是编译器优化后的上下文切换清理函数,负责完成切换后的调度收尾;

  • __lll_lock_wait_private 和 __lll_lock_wake_private 是 glibc 内部函数,用于 mutex 互斥锁等同步机制的实现。

值得注意的是,这些锁相关的函数在释放内存时频繁出现,暗示我们的旧版本在这一过程中产生了严重的锁争用。

回过头来看旧版本的 flamegraph,这一问题其实非常明显。理想情况下,如果我们能使用类似差分 flamegraph(differential flamegraph)工具对比两个版本图谱,可能会更快定位瓶颈点(此处也再次呼吁出现更多易用的 flamegraph 工具)。不过幸运的是,我们还是通过 perf stat 和 perf record 快速找到了问题根因 —— 并不是 Kafka 接收器的串行处理导致瓶颈,而是 ClickHouse exporter 中的序列化函数(TransformPayload)在进行 marshaling 操作时产生了锁争用。

图片

Glibc 多线程内存分配机制下的锁争用问题

从测试数据来看,我们已能清楚解释为何在优化后,系统的 CPU 使用率大幅下降而吞吐能力却反而提升。原先的版本确实消耗了大量计算资源,但主要耗在了无效的系统开销上 —— 本质上,它的大量 CPU 时间都被花在释放内存时的自旋锁等待中。

为了理解这种情况,我们需要简单了解 glibc 默认内存分配器的工作方式。系统将内存划分为多个“arena”(内存区域),每个 arena 都通过一个互斥锁(mutex)来保障内存的分配与释放线程安全。为了减少锁争用,不同线程会尝试创建独立的 arena,随着线程池规模扩大,arena 数量也会同步增长。

但关键在于,如果某段内存在 A 线程上分配,最终却在 B 线程上释放,B 线程就必须锁住 A 所属的 arena,这很容易造成其他线程等待,从而产生锁争用。

在旧版本中,我们在 Kafka 接收器的反序列化(unmarshaling)过程中,于 Tokio 的 I/O 执行线程上分配处理 trace 数据所需的内存。这类异步任务调度在线程数受限的 Tokio executor 上 —— 网关采集器上仅有 8 个线程,刚好一核一个。之后,这段内存在 ClickHouse exporter 的数据序列化过程中被释放,而这部分逻辑则运行在一个大规模阻塞线程池上(包含几十甚至上百个线程)。线程之间频繁切换导致 arena 访问被频繁锁定,进而引发了锁争用问题。

图片

新版本中,我们将反序列化阶段的内存分配也迁移到了与释放操作相同的阻塞线程池中,解决了跨线程释放的问题。随着管道内数据量增加,线程池自动扩展,arena 的数量也随之增加,锁争用的风险被大幅降低。

图片

验证 arena 锁争用:Jemalloc 的对比测试

一位审阅本文的工程师提问:“如果换成 jemalloc 分配器,会出现同样的问题吗?”Jemalloc 是一个专为多线程环境优化的内存分配器,目标之一就是减少锁争用。我们曾在早期测试中尝试过 jemalloc,但当时未见显著性能收益。然而,随着 ClickHouse exporter 的负载提升,以及 Kafka 接收器架构的变化,内存分配压力大幅增加,这促使我们重新测试旧版和新版在 jemalloc 下的表现。

我们将旧版本切换为使用 jemalloc 后,CPU 使用率从 93% 降至 40%,与我们将反序列化迁移至共享线程池后的效果几乎一致,进一步印证了锁争用才是核心瓶颈。

尽管 jemalloc 能缓解 CPU 压力,但在满负载条件下,Kafka 消费延迟却有所增加。加上 jemalloc 当前已不再活跃维护,我们决定不将其设为默认分配器。不过,未来我们可能会引入 feature flag,让用户根据需求自由选择 jemalloc、mimalloc 等自定义内存分配器。

额外提升:开启快速 LZ4 压缩优化

Rotel 采用 ClickHouse 官方推荐的 LZ4 压缩配置来减少网络传输数据量。我们使用的是 lz4_flex crate —— 与 clickhouse-rs 相同的依赖库,但我们是直接引用的。在引入相关压缩支持时,我们忽略了 Cargo.toml 中的功能标志配置。

Lz4-flex 同时提供安全(safe)和非安全(unsafe)两种实现版本,其中 unsafe 版本的性能更优(关于 Rust 中的 unsafe,请参考官方文档(https://doc.rust-lang.org/book/ch20-01-unsafe-rust.html))。默认情况下,需要显式启用 unsafe 特性才能使用高性能实现。

clickhouse-rs 默认启用了 unsafe 模式,而我们最初未启用此选项。启用之后,我们观察到网关采集器的吞吐能力由每秒 360 万条 trace span 提升至 370 万条,网络压缩输入达到了 209 MB/s。

与此同时,网关采集器的 CPU 使用率也进一步小幅下降。

图片

完整的端到端性能评估

在对 Rotel 进行了多轮性能优化,并基于 ClickHouse 的 Null 表引擎完成初步测试后,我们成功将单实例的吞吐能力从最初的每秒 110 万条 trace span 提升至 370 万条,相当于每核每秒处理 46.25 万条。这比最早测试 OTel Collector 所获得的吞吐性能提升了超过 4 倍。

我们随后将评估重点转向了整个链路的最后一环 —— 数据写入 ClickHouse 并真正持久化至磁盘。在扩展 ClickHouse 写入能力时,通常需要从写入性能与查询效率两个维度优化数据表结构。本次测试我们仍采用默认的 OTel 数据模式,因此主要通过选择具备足够写入能力的实例来支撑高负载。

为应对大规模写入负载,我们将 ClickHouse 部署在更高性能的 AWS 实例上,并通过 4 块本地存储构建 RAID0,以确保不会受到磁盘带宽瓶颈限制。

在测试期间,我们关闭了 Rotel 的异步写入功能,并将批处理规模大幅提升至 --batch-max-size=102400,以提升整体写入效率。通过设置 --clickhouse-exporter-async-inserts=false,我们成功维持了每秒 370 万条 trace span 的网关采集器吞吐量。

此时 ClickHouse 的 CPU 占用率约为 50%,压缩后的写入流量达到 210MB/s。

图片

Visual inspection 

可视化效果上,我们在 ClickHouse 中成功查询到了超过 30 亿条追踪数据,验证了端到端链路的可用性。

图片

总结

极限规模下,效率至关重要

ClickHouse 内部 LogHouse 平台所运行的 PB 级可观测性场景表明,效率不再是优化选项,而是生存必要条件。他们将管道吞吐能力提升了 20 倍,同时仅使用之前 10% 的资源。如果仍按原有路径扩展,运维成本将变得无法承受。Netflix 与 OpenAI 等大型技术公司也达成了类似共识 —— 当数据量达到如此规模时,效率的优劣将直接影响业务运转。

本项目的目标正是在这一背景下,推动 OpenTelemetry 数据采集效率提升,并推出 Rotel。

近 4 倍的吞吐性能提升

通过本次工作,我们将 Rotel 打造为一款高吞吐量的 OpenTelemetry 流式采集工具。在相同硬件环境下,Rotel 的处理能力几乎是 OpenTelemetry Collector 的 4 倍。这种差异在大规模场景下可以带来显著的资源节省。Rotel 原生支持 OpenTelemetry 的 trace、metric 和 log 类型。本篇文章聚焦追踪数据,未来我们还将扩展基准测试到日志与指标场景。

我们也希望了解,在海量数据处理场景下,用户最看重哪些功能特性。如果你有宝贵经验或希望分享你的扩展实践,欢迎加入我们的 Discord 社区(https://rotel.dev/discord),或在 GitHub 上(https://github.com/streamfold/rotel)提交贡献。

后续方向

以下是我们在完成本次测试后,计划进一步探索的几个方向:

深入探讨 Kafka 的可靠传输机制

本文仅简单提及了 Rotel 的消息可靠性设计。Rotel 支持端到端消息确认机制,确保从 Kafka 中读取数据时实现“至少一次(at-least-once)”语义,避免依赖 Kafka 默认的自动提交机制可能导致的数据丢失。为此我们对数据管道做了多处修改,并进行了严格测试,以确保在避免重复的同时不丢失任何数据。未来我们计划单独撰文,深入介绍其设计与验证方法。

评估 ClickHouse 原生通信协议

Rotel 当前通过 clickhouse-rs 实现与 ClickHouse 的集成,采用基于 HTTP 的 RowBinary 协议。相比之下,OTel Collector 使用 Go 实现的 ClickHouse 驱动,采用的是 ClickHouse 的原生协议。该协议也是 ClickHouse 节点之间通信所用方式,基准测试显示其性能比 RowBinary 高出约 20%。ClickHouse 还新增了对 Apache Arrow Flight 的支持,后者基于内存格式 Arrow 实现高效传输。我们计划评估是否可将 RowBinary 替换为这些列式协议,以进一步提升 Rotel 吞吐性能。

进一步分析 tokio 中的阻塞任务影响

类似反序列化这类阻塞操作对 tokio 的运行时性能影响显著。在本次评估中我们首次直观感受到其影响,因此希望在其他处理路径中继续探讨类似瓶颈。目前我们已知 Rotel 的 OTLP 接收器在处理连接时会在异步任务中直接执行较重的 Protobuf 反序列化操作,该处理逻辑由 tonic crate 承担。我们计划分析如何将其拆分为独立任务。初步通过 perf 工具观察,预计该处存在巨大优化潜力。

优化内存分配路径

虽然 Rust 本身没有垃圾回收机制,但高频率的内存分配与释放在高负载场景中依然会成为瓶颈。Rotel 在处理短生命周期对象时存在大量内存分配行为。如果我们采用内存重用池(freelist)的方式跳过分配器,将常用缓冲区复用,有望显著减少开销。当然,这类机制的实现难度较高,若不慎也可能导致内存占用飙升。我们可能需要深入修改 tonic crate 才能实现该优化。

我们特别感谢 Sujay Jayakar、Ben Sigelman、Rick Branson、Vlad Seliverstov、Rory Crispin 和 Achille Roussel 对本文早期版本的审阅与反馈。

附录

评估过程中考虑但未纳入的项目

在设计本次基准测试框架时,我们曾希望纳入更多支持 OpenTelemetry 的数据平面工具。但实际测试中发现,它们与我们所设定的测试流程并不兼容。我们之所以选择分布式追踪作为测试对象,是因为它是推动 OTel 被广泛采用的关键场景之一,且在大规模系统中数据增长迅速。然而,日志与指标则属于传统监控领域,很多工具对 trace 类型的遥测数据仍缺乏完善支持。因此虽然未在本次测试中覆盖这些工具,我们仍计划未来开展日志与指标方面的基准评估。

Vector

Vector 是一款专为构建高性能遥测数据管道设计的轻量级工具。它支持广泛的数据源和输出目标,能很好地融入多种系统中。该项目目前由 DataDog 主导开发,并被用于其 observability pipelines 产品。

不过,Vector 对 OpenTelemetry 的支持还处于早期阶段,目前尚无法与多种目标系统对接。尤其在 trace 数据方面,其内部数据模型起初并不支持追踪结构,因此目前对 OTel trace 的支持较为有限。由于 Kafka 和 ClickHouse 的输出插件对 trace 数据尚不兼容,我们未能将其纳入此次测试。

例如,我们曾尝试:

  • 从 OpenTelemetry source 向 Kafka sink 发送 trace 数据(https://github.com/vectordotdev/vector/discussions/21018);

  • 在 ClickHouse 中存储 trace span(https://github.com/vectordotdev/vector/issues/17307#issuecomment-1641075239)。

Fluent Bit

Fluent Bit 是一个以性能为重点、由 C 编写的 Fluentd 替代方案。它提供了对 OpenTelemetry 的输入与输出支持,包括日志、指标与追踪数据。Fluent Bit 支持 Kafka 输入输出,因此理论上可用于构建可靠的数据流管道。

然而我们测试发现,当前版本中,在将 OpenTelemetry 作为输入的同时通过 Kafka 或 HTTP 输出 trace 与 metric 数据,尚不完全支持。这一限制使其暂时无法参与本次评估。

简化 OTel 与 ClickHouse 的迁移操作

根据 ClickHouse 官方文档建议,用户应在部署前手动创建表结构,而不是依赖导出器自动建表。但由于这些迁移脚本通常打包在 OTel exporter 中,缺乏独立部署方式,因此部署过程并不直观。

为了解决这个问题,我们在 Rotel 中将表结构管理逻辑拆分为一个独立的命令行工具 —— clickhouse-ddl,用于便捷地部署数据模式(schema)。该工具可创建与 Rotel 和 OTel Collector 完全兼容的表结构。

我们将该工具封装为一个 Docker 镜像,用户只需运行一条命令即可快速创建用于接收 OTel trace 数据的表。例如,下面是一个用于创建 trace span 表结构的命令示例:

docker run streamfold/rotel-clickhouse-ddl create \    --endpoint https:    
复制代码

此外,也可以像我们在本篇文章中所做的那样,使用 Null 表引擎来创建 schema,以便进行基准测试:

docker run streamfold/rotel-clickhouse-ddl create \    --endpoint https:        
复制代码

参考资料

/END/

征稿启示

面向社区长期正文,文章内容包括但不限于关于 ClickHouse 的技术研究、项目实践和创新做法等。建议行文风格干货输出 &图文并茂。质量合格的文章将会发布在本公众号,优秀者也有机会推荐到 ClickHouse 官网。请将文章稿件的 WORD 版本发邮件至:Tracy.Wang@clickhouse.com。