标签 Apache Spark 下的文章

配置 Gravitino Lance REST 服务

作者:Qi Yu
最后更新:2026-01-23

概述

在本教程中,您将学习如何配置和使用 Gravitino Lance REST 服务。完成本指南后,您将拥有一个功能完整的 Lance REST 服务,使 Lance 客户端能够通过 HTTP API 与 Gravitino 交互。

Gravitino Lance REST 服务为管理 Lance 数据集提供 RESTful 接口,实现标准的 Lance REST API。它充当集中式 catalog 服务,允许 Lance 客户端(如 Spark 和 Ray)发现和访问由 Gravitino 管理的 Lance 数据集。

核心概念:

  • Lance REST catalog:用于 Lance 数据集操作的标准 HTTP API 规范
  • Gravitino Lance REST 服务:实现 Lance REST API 并与 Gravitino 的元数据系统集成
  • 统一元数据:在 Gravitino 中存储 Lance 数据集元数据,实现集中治理

REST 端点基础路径为 http://<host>:<port>/lance/

前提条件

开始本教程之前,您需要:

系统要求:

  • Linux 或 macOS 操作系统,具有出站互联网访问权限用于下载
  • Python 环境(3.10+)用于运行 PySpark 或 Ray 客户端

必需组件:

可选组件:

  • 带有 Lance 运行时 JAR 的 Apache Spark,用于客户端验证(推荐用于测试)
  • Ray 框架,用于分布式 Lance 数据处理

继续之前,请验证您的 Python 安装并安装所需包:

python --version
pip install pyspark==3.5.0 lance-ray==0.1.0 lance-namespace

架构概述:

gravitino-lance-rest-architecture.png[gravitino-lance-rest-architecture]

设置

步骤 1:启动带有 Lance REST 服务的 Gravitino 服务器

如果您希望将 Lance REST 服务嵌入到完整的 Gravitino 服务器中(包括 Web UI、统一 REST API 等),请使用此方法。

配置 Lance REST 作为辅助服务

1. 安装 Gravitino 服务器发行版

按照之前的教程 02-setup-guide/README.md 下载或构建 Gravitino 服务器包。

2. 启用 Lance REST 作为辅助服务

修改 conf/gravitino.conf 以启用 lance-rest 服务并进行配置:

# 启用 Lance REST 服务
gravitino.auxService.names = lance-rest
gravitino.lance-rest.httpPort = 9101
gravitino.lance-rest.host = 0.0.0.0
gravitino.lance-rest.namespace-backend = gravitino
gravitino.lance-rest.gravitino-uri = http://localhost:8090
gravitino.lance-rest.gravitino-metalake = lance_metalake
注意:当您访问 Lance REST 服务时,lance_metalake 应该在 Gravitino 中存在。如果不存在,您可以在启动 Gravitino 服务器后通过 Gravitino REST API 或 Web UI 创建它。

3. 启动 Gravitino 服务器

./bin/gravitino.sh start

4. 创建 Metalake(如果不存在)

curl -X POST -H "Content-Type: application/json" \
  -d '{"name":"lance_metalake","comment":"comment"}' \
  http://localhost:8090/api/metalakes

5. 检查服务器日志(可选)

tail -f logs/gravitino-server.log

步骤 2:验证 Lance REST 端点并创建 catalog namespace

测试服务端点

您可以通过以下命令验证服务是否正在运行:

curl -X GET http://localhost:9101/lance/v1/namespace/$/list \
  -H 'Content-Type: application/json'

成功时,您应该看到包含 namespace 信息的 JSON 响应。

创建 catalog namespace

创建一个 catalog namespace(例如 lance_catalog),它将用于包含您的 Lance Schema 和 Table:

curl -X POST http://localhost:9101/lance/v1/namespace/lance_catalog/create \
  -H 'Content-Type: application/json' \
  -d '{
    "id": ["lance_catalog"],
    "mode": "exist_ok"
  }'

如果成功,它会返回 namespace 信息。

步骤 3:从 Spark 中连接

配置您的 PySpark 会话以使用 Lance REST catalog。

配置 Spark

前提条件

  • 安装 pyspark:pip install pyspark==3.5.0
  • 下载与您的 Spark 版本匹配的 lance-spark bundle jar(例如 lance-spark-bundle-3.5_2.12-0.0.15.jar

执行示例操作

运行以下 Python 脚本:

from pyspark.sql import SparkSession
import os

# 设置 lance-spark bundle 的路径
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /path/to/lance-spark-bundle-3.5_2.12-0.0.15.jar "
    "--conf \"spark.driver.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED\" "
    "--conf \"spark.executor.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED\" "
    "--master local[1] pyspark-shell"
)

spark = SparkSession.builder \
    .appName("lance_rest_demo") \
    .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceNamespaceSparkCatalog") \
    .config("spark.sql.catalog.lance.impl", "rest") \
    .config("spark.sql.catalog.lance.uri", "http://localhost:9101/lance") \
    .config("spark.sql.catalog.lance.parent", "lance_catalog") \
    .config("spark.sql.defaultCatalog", "lance") \
    .getOrCreate()

# 创建 schema 和 table
spark.sql("CREATE DATABASE IF NOT EXISTS demo_schema")
spark.sql("""
    CREATE TABLE demo_schema.test_table (id INT, value STRING)
    USING lance
    LOCATION '/tmp/lance_catalog/demo_schema/test_table'
""")

# 插入和查询数据
spark.sql("INSERT INTO demo_schema.test_table VALUES (1, 'test')")
spark.sql("SELECT * FROM demo_schema.test_table").show()

步骤 4:使用 Ray 连接

您还可以使用 Ray 与 Lance Ray 集成来访问 Spark 创建的数据。

使用 Lance REST catalog 配置 Ray

前提条件

  • 安装所需包:pip install lance-ray==0.1.0 lance-namespace

执行示例操作

import ray
import lance_namespace as ln
from lance_ray import read_lance, write_lance

ray.init()

# 连接到 Lance REST
namespace = ln.connect("rest", {"uri": "http://localhost:9101/lance"})

# 读取 Spark 创建的 table
# 注意:Table ID 是 [catalog, schema, table]
ds = read_lance(namespace=namespace, table_id=["lance_catalog", "demo_schema", "test_table"])
print(f"Row count: {ds.count()}")
ds.show()

# 执行过滤操作
result = ds.filter(lambda row: row["id"] < 100).count()
print(f"Filtered row count: {result}")

故障排除

常见问题及其解决方案:

服务连接问题:

  • 服务启动失败:检查 logs/gravitino-server.log 中的启动错误和配置问题
  • 连接被拒绝:验证 gravitino.lance-rest.httpPort(默认 9101)是否开放且可访问
  • curl 返回 404:确认 Lance REST 基础路径是 /lance,端口与配置匹配

客户端连接问题:

  • Spark ClassNotFoundException:确保 lance-spark-bundle jar 在 PYSPARK_SUBMIT_ARGS--jars 中正确引用
  • Namespace 未找到:记住在创建 Schema 或 Table 之前创建父 catalog namespace(例如 lance_catalog
  • Ray 连接错误:验证 lance-raylance-namespace 包已安装,REST 端点可访问

配置问题:

  • Metalake 未找到:确保 gravitino.lance-rest.gravitino-metalake 中指定的 metalake 在 Gravitino 中存在
  • 权限错误:检查 Gravitino 服务器是否对配置的存储位置具有适当的访问权限

恭喜

您已成功完成 Gravitino Lance REST 服务配置!

您现在拥有一个功能完整的 Lance REST 服务,包括:

  • 在端口 9101 上运行的已配置 Lance REST 端点
  • 为组织 Lance 数据集配置的 catalog namespace
  • 通过 Apache Spark 和 Ray 验证的客户端连接
  • 对跨不同计算引擎的 Lance 数据集操作的理解

进一步阅读

有关更高级配置和详细文档:

下一步


Apache Gravitino 正在快速发展,本文基于最新版本 1.1.0 编写。如果您遇到问题,请参考官方文档或在 GitHub 上提交问题。

数字公告板提供商 Pinterest 发布了一篇文章,解释了其新平台Moka在大规模数据处理方面的未来蓝图。该公司正在将核心工作负载从老化的 Hadoop 基础设施迁移到基于 Kubernetes 的系统上,该系统运行在亚马逊 EKS 上,以 Apache Spark 作为主要引擎,并即将支持其他框架。

 

在一个包含两篇文章的博客系列中,Soam Acharya、Rainie Li、William Tom 和 Ang Zhang 描述了 Pinterest 大数据平台团队如何考虑下一代大规模数据处理平台的替代方案,因为现有的基于 Hadoop 的系统(内部称为 Monarch)的局限性变得越来越明显。他们将 Moka 作为搜索的结果,以及基于 EKS 的云原生数据处理平台,该平台现在运行的生产负载达到了 Pinterest 的规模。该系列的第一部分关注整体设计和应用层。相比之下,第二部分转向作者所说的“Moka 的基础设施重点方面,包括经验和未来方向”。

 

文章从实际角度描述了向 Kubernetes 的转变。它展示了一个全行业的转变,即大型技术公司现在将 Kubernetes 视为数据的控制平面,而不仅仅是无状态的服务平台。在大数据社区日益增长的受欢迎程度和越来越多的采用的鼓励下,团队探索了基于 Kubernetes 的系统,作为 Hadoop 2.x 最有可能的替代品。任何候选平台都必须满足可扩展性、安全性、成本以及托管多个处理引擎的精确标准。Moka 是如何在不放弃现有 Spark 投资的情况下现代化 Hadoop 时代的数据平台的一个例子。

 

第二篇文章的核心主题是如何在 Kubernetes 上以非常大的规模运行 Spark。作者解释了他们如何围绕 Moka 添加日志、指标和作业历史服务,以便工程师可以在不了解底层集群拓扑的情况下调试和调整作业。他们使用 Fluent Bit 对日志集合进行标准化,并使用 OpenTelemetry 和 Prometheus 兼容的端点发布统一指标。这为基础设施和应用程序团队提供了系统健康的一致视图。

 

Pinterest 还投资于通过基础设施即代码的方式使平台可重复使用。在文章中,团队概述了他们如何使用 Terraform 和 Helm 创建 EKS 集群、配置网络和安全以及部署支持组件,如 Spark 历史服务器。

 

Pinterest 的工程师还讨论了处理不同的硬件架构。他们描述了他们如何构建多架构镜像,以便他们的数据工作负载在 Intel 和基于 ARM 的实例上运行良好,包括 AWS Graviton,并将此与集群规模的成本和效率目标联系起来。InfoQ 编辑 Eran Stiller 在 LinkedIn上对该项目中的总结指出,Moka“提供了容器级别的隔离、ARM 支持、YuniKorn 调度,并通过整合工作负载和跨实例类型的自动扩展实现了显著的成本节省”。这些细节将工作置于云用户寻求在不牺牲性能的情况下削减基础设施成本的更大趋势之中。

 

关于处理引擎的更广泛的行业对话为 Pinterest 的故事增添了细微差别。在另一篇LinkedIn帖子中,Acharya 写道:“虽然 Spark 是我们的主要主力,但 Moka 的成功意味着 Pinterest 的其他用例也在效仿:Flink Batch 已经投入生产,Apache Ray 紧随其后,Flink Streaming 也将在今年晚些时候推出”。通过对 Spark 和 Flink 技术的深入探讨,我们可以了解到这一点的重要性。强调 Spark 仍然非常适合大型批处理和交互式分析工作负载,而 Flink 是“为实时、有状态的流处理而构建的”,具有严格的逐事件处理。团队将 Moka 呈现为一个灵活的基础,可以根据特定工作负载的需求添加不同的引擎,而不是一个只支持 spark 的平台。

 

外部观察者从 Pinterest 案例中吸取了教训。ML工程师通讯将 Moka 文章描述为“在 Kubernetes 上部署 EKS 集群、Fluent Bit 日志、OTEL 指标管道、镜像管理和 Spark 的自定义 Moka UI”的例子,将其与其他现代数据基础设施案例研究并列。这些反应表明,Moka 被视为一类云原生数据系统的参考架构。

 

然而,团队确实将他们的迁移工作呈现为一个正在进行的旅程,而不是一个已经完成的项目。在博客和进一步的LinkedIn帖子中,Pinterest 作者讨论了“经验和未来的方向”,并描述了早期概念验证如何导致随着对新堆栈的信心增长而逐步远离 Hadoop 的迁移。Acharya 指出,“最好的问题出现在规模上”,构建平台涉及“解决难题”,因为团队转移了实际工作负载。对于其他组织来说,这种经验可能是最重要的教训。复制围绕 Kubernetes、EKS 和 Spark 的技术选择相对简单,但从遗留系统中解耦并投资于可观测性、自动化和多引擎支持的过程可能是未来真正的工作。

 

原文链接:

https://www.infoq.com/news/2026/01/pinterest-kubernetes-bigdata/

一、背景

得物经过10年发展,计算任务已超10万+,数据已经超200+PB,为了降低成本,计算引擎和存储资源需要从云平台迁移到得物自建平台,计算引擎从云平台Spark迁移到自建Apache Spark集群、存储从ODPS迁移到OSS。

在迁移时,最关键的一点是需要保证迁移前后数据的一致性,同时为了更加高效地完成迁移工作(目前计算任务已超10万+,手动比数已是不可能),因此比数平台便应运而生。

二、数据比对关键挑战与目标

关键挑战一:如何更快地完成全文数据比对

现状痛点:

在前期迁移过程中,迁移同学需要手动join两张表来识别不一致数据,然后逐条、逐字段进行人工比对验证。这种方式在任务量较少时尚可应付,但当任务规模达到成千上万级别时,就无法实现并发快速分析。

核心问题:

  • 效率瓶颈:每天需要完成数千任务的比对,累计待迁移任务达10万+,涉及表数十万张。
  • 扩展性不足:传统人工比对方式无法满足大规模并发处理需求。

关键挑战二:如何精准定位异常数据

现状痛点:

迁移同学在识别出不一致数据后,需要通过肉眼观察来定位具体问题,经常导致视觉疲劳和分析效率低下。

核心问题:

  • 分析困难:在比对不通过的情况下,比对人员需要人工分析失败原因。
  • 复杂度高:面对数据量庞大、加工逻辑复杂的场景,特别是在处理大JSON数据时,肉眼根本无法有效分辨差异。
  • 耗时严重:单次比对不通过场景的平均分析时间高达1.67小时/任务。

比数核心目标

基于以上挑战,数据比对系统需要实现以下核心目标:

  • 高并发处理能力:支持每天数千任务的快速比对,能够处理10万+待迁移任务和数十万张表的规模。
  • 自动化比对机制:实现全自动化的数据比对流程,减少人工干预,提升比对效率。
  • 智能差异定位:提供精准的差异定位能力,能够快速识别并高亮显示不一致的字段和数据。
  • 可视化分析界面:构建友好的可视化分析平台,支持大JSON数据的结构化展示和差异高亮。
  • 性能优化:将用户单次比对分析时间从小时级大幅缩短至分钟级别。
  • 可扩展架构:设计可水平扩展的系统架构,能够随着业务增长灵活扩容。

三、解决方案实现原理

快速完成全文数据比对方法

比数方法调研

待比对两表数据大小:300GB,计算资源:1000c


经过调研分析比数平台采用第二种和第三种相结合的方式进行比数。

先Union再分组数据一致性校验原理

假如我们有如下a和b两表张需要进行数据比对

表a:


表b:


表行数比较:

select count(1) from a ;
select count(1) from b ;

针对上面的查询结果,如果数量不一致则退出比对,待修复后重新比数;数量一致则继续字段值比较。

字段值比较:

第一步:union a 和 b

select 1 as _t1_count, 0 as _t2_count, `id`, `name`, `age`, `score`
from a
union all
select 0 as _t1_count, 1 as _t2_count, `id`, `name`, `age`, `score`
from b

第二步:sum(_t1_count),sum(_t2_count) 后分组

select sum(_t1_count) as sum_t1_count, sum(_t2_count) as sum_t2_count, `id`, `name`, `age`, `score`
from (
select 1 as _t1_count, 0 as _t2_count, `id`, `name`, `age`, `score`
from a
union all
select 0 as _t1_count, 1 as _t2_count, `id`, `name`, `age`, `score`
from b
) as union_table
group by `id`, `name`, `age`, `score`


第三步:把不一致数据写入新的表中(即上面表中sum_t1_count和sum_t2_count不相等的数据)

drop table if exists a_b_diff_20240908;
create table a_b_diff_20240908 as select * from (
select sum(_t1_count) as sum_t1_count, sum(_t2_count) as sum_t2_count, `id`, `name`, `age`, `score`
from (
select 1 as _t1_count, 0 as _t2_count, `id`, `name`, `age`, `score`
from a
union all
select 0 as _t1_count, 1 as _t2_count, `id`, `name`, `age`, `score`
from b
) as union_table
group by `id`, `name`, `age`, `score`
having sum(_t1_count) <> sum(_t2_count)
) as tmp

如果a_b_diff_20240908没有数据则两张表没有差异,比数通过,如有差异如下:

第四步:读取不一致记录表,根据主键(比如id)找出不一致字段并写到结果表中。

第五步:针对不一致字段的数据进行根因分析,如 json 、数组顺序问题、浮点数精度问题等,给出不一致具体原因。

哈希值聚合实现高效一致性校验

针对上面union后sum 再 group by 方式 在数据量大的时候还是非常耗资源和时间的,考虑到比数任务毕竟有70%都是一致的,所以我们可以先采用哈希值聚合比较两表的的值是否一致,使用这种高效的方法先把两表数据一致的任务过滤掉,剩下的再采用上面方法继续比较,因为还要找出是哪个字段哪里不一致。原理如下:

SELECT count (*),SUM(xxhash64(cloum1)^xxhash64(cloum2)^...) FROM tableA 
EXCEPT 
SELECT count(*),SUM(xxhash64(cloum1)^xxhash64(cloum2)^...) FROM tableB

如果有记录为空说明数据一致,不为空说明数据不一致需要采用上面提到union 分组的方法去找出具体字段哪里不一样。

通过哈希值聚合,单个任务比数时间从500s降低到160s,节省大约70%的时间。

找到两张表不一致数据后需要对两张的数据进行分析确定不一致的点在哪里?这里就需要知道表的主键,根据主键逐个比对两张表的其他字段,因此系统会先进行主键的自动探查,以及无主键的兜底处理。

精准定位异常数据实现方法

自动探查主键:实现原理如下

刚开始我们采用的前5个字段找主键的方式,如下:

针对表a的前5个字段 循环比对
select count(distinct id) from a 与 select count(1) from a 比较 ,如相等主键为id ,不相等继续往下执行
select count(distinct id,name) from a 与 select count(1) from a比较,如相等主键为id,name ,不相等继续往下执行
select count(distinct id,name,age) from a 与 select count(1) from a比较,如相等主键为id,name,age ,不相等继续往下执行,直到循环结束

采用上面的方法不一致任务中大约有49.6%任务自动探查主键失败:因此需重点提升主键识别能力。

针对以上主键探查成功率低的问题,后续进行了一些迭代,优化后的主键探查流程如下:

一、先采用sum(hash)高效计算方式进行探查:

1.先算出两张表每个字段的sum(hash)值  。

select sum(hash(id)),sum(hash(name)),sum(hash(age)),sum(hash(score)) from a 
union all 
select sum(hash(id)),sum(hash(name)),sum(hash(age)),sum(hash(score)) from b;

2.找出值相等的所有字段,本案例中为 id, name。

3.对id,name 可能是主键进一步确认,先进行行数校验,如 select count(distinct id,name) from a 的值等于select count(1) from a 则进一步校验,否则进入到第二种探查主键方式。

4.唯一性验证,如果值为0则表示探查主键成功,否则进入到第二种探查主键方式。

slect count(*) from ((select id,name from a ) expect (select id,name from b))

二、传统distinct方式探查:

针对表a的前N(所有字段数/2或者前N、后N等)个字段 循环比对:

1.select count(distinct id) from a与select count(1) from a比较 ,如相等主键为id ,不相等继续往下执行。

2.select count(distinct id,name) from a 与 select count(1) from a比较,如相等主键为id,name ,不相等继续往下执行。

3.select count(distinct id,name,age) from a 与 select count(1) from a比较,如相等主键为id,name,age ,不相等继续往下执行,直到循环结束。

三、全字段排序模拟:

如果上面两种方式还是没有找到主键则把不一致记录表进行全字段排序然后对第一条和第二条记录挨个字段进行分析,找出不一致内容,示例如下:

slect * from a_b_diff_20240908 order by id,name,age,score asc limit 10;


通过以上结果表可以得出两表的age字段不一致 ,score不一致(但按key排序后一致)。

如果以上自动化分析还是找不到不一致字段内容,可以人工确认表的主键后到平台手动指定主键字段,然后点击后续分析即可按指定主键去找字段不一致内容。

通过多次迭代优化找主键策略,找主键成功率从最初的50.4%提升到75%,加上全字段order by排序后最前两条数据进行分析,相当于可以把找主键的成功率提升到90%以上。

根因分析:实现原理如下

当数据不一致时,平台会根据主键找出两个表哪些字段数据不一致并进行分析,具体如下:

  • 精准定位: 明确指出哪条记录、哪个字段存在差异,并展示具体的源数据和目标数据值。
  • 智能根因分析: 内置了多种差异模式识别规则,能够自动分析并提示不一致的可能原因,例如:
  • 精度问题:如浮点数计算1.0000000001与1.0的差异。
  • JSON序列化差异:如{"a":1, "b":2}与{"b":2, "a":1},在语义一致的情况下,因键值对顺序不同而被标记为差异。同时系统会提示排序后一致。
  • 空值处理差异:如NULL值与空字符串""的差异判定。
  • 日期时区转换问题:时间戳在不同时区下表示不同。

  • 比对结果统计: 提供总数据量、一致数据量、不一致数据量及不一致率百分比,为项目决策提供清晰的量化依据。
  • 比数人员根据平台分析的差异原因,决定是否手动标记通过或进行任务修复。
  • 效果展示:

四、比数平台功能介绍

数据比对基本流程

任务生成:三种比对模式

  • 两表比对: 最直接的比对方式。用户只需指定源表与目标表,平台即可启动全量数据比对。它适用于临时比对的场景。
  • 任务节点比对: 一个任务可能输出多个表,逐一配置这些表的比对任务繁琐且易遗漏,任务节点比对模式完美解决了这一问题。用户只需提供任务节点ID,平台便会自动解析该节点对应的SQL代码,提取出所有输出表,并自动生成比对任务,极大地提升任务迁移比对效率。
  • SQL查询比对: 业务在进行SDK迁移只关心某些查询在迁移后数据是否一样,因此需要对用户提交的所有查询SQL进行比对,平台会分别在ODPS和Spark引擎上执行该查询,将结果集导出到两张临时表,再生成比对任务。

前置校验:提前发现问题

在启动耗时的全量比对之前,需要对任务进行前置校验,确保比对是在表结构一致、集群环境正常的情况下进行,否则一旦启动比数会占用大量计算资源,最后结果还是比数不通过,会影响比数平台整体的运行效率。因此比数平台一般会针对如下问题进行前置拦截。

  • 元数据一致性校验: 比对双方的字段名、字段类型、字段顺序、字段个数是否一致。
  • 函数缺失校验: 针对Spark引擎,校验SQL中使用的函数是否存在、是否能被正确识别,避免因函数不支持而导致的比对失败。
  • 语法问题校验: 分析SQL语句的语法结构,确保其在目标引擎中能够被顺利解析,避免使用了某些特定写法会导致数据出现不一致情况,提前发现语法层面问题,并对任务进行改写。

更多校验点如下:




通过增加以上前置校验拦截,比数任务数从每天3000+下降到1500+, 减少50% 的无效比数,其中UDF缺失最多,有效拦截任务1238,缺少函数87个(帮比数同学快速定位,一次性解决函数缺失问题,避免多次找引擎同学陆陆续续添加,节省双方时间成本)。

破解比数瓶颈:资源分配与任务调度优化

由于比数平台刚上线的时候只有计算迁移团队在使用,后面随着更多的团队开始使用,性能遇到了如下瓶颈:

1.资源不足问题: 不同业务(计算迁移、存储迁移、SDK迁移)的任务相互影响,基本比数任务与根因分析任务相互抢占资源。

2.任务编排不合理: 没有优先级导致大任务阻塞整体比数进程。

3.引擎参数设置不合理: 并行度不够、数据分块大小等高级参数。

针对以上问题比数平台进行了如下优化:

  • 按不同业务拆分成多个队列来运行,保证各个业务之间的比数任务可以同时进行,不会相互影响。
  • 根因分析使用单独的队列,与数据比对任务的队列分开,避免相互抢占资源发生“死锁”。
  • 相同业务内部按批次分时段、分优先级运行,保障重要任务优先进行比对。
  • 针对Spark引擎默认调优了公共参数、并支持用户自主设置其他高级参数。

通过以上优化达到到了如下效果:

  • 比数任务从每天22点完成提前至18点前,同时支持比数同学自主控制高优任务优先执行,方便比数同学及时处理不一致任务。
  • 通过优化资源队列使用方式,使系统找不到主键辅助用户自主找主键接口响应时间从58.5秒降到 26.2秒。

五、比数平台收益分享

平台持续安全运行500+天,每日可完成2000+任务比对,有效比数128万+次,0误判。

  • 助力计算迁移团队节省45+人日/月,完成数据分析、离线数仓空间任务的比对、交割。
  • 助力存储迁移团队完成20%+存储数据的迁移。
  • 助力引擎团队完成800+批次任务的回归验证,确保每一次引擎发布的安全及高效。
  • 助力SDK迁移团队完成80%+应用的迁移。

六、未来演进方向

接下来,平台计划在以下方面持续改进:

智能分析引擎: 针对Json复杂嵌套类型的字段接入大模型进行数据根因分析,找出不一致内容。

比对策略优化: 针对大表自动切分进行比对,降低比数过程出现因数据量大导致异常,进一步提升比对效率。

通用方案沉淀: 将典型的比对场景和解决方案能用化,应用到更多场景及团队中去。

七、结语

比数平台是得物在迁移过程中,为了应对海量任务、大数据量、字段内容复杂多样、异常数据难定位等挑战,确保业务迁移后数据准确而专门提供的解决方案,未来它不单纯是一个服务计算迁移、存储迁移、SDK迁移、Spark版本升级等需要的数据比对工具,而是演进为数据平台中不可或缺的基础设施。

往期回顾

1.得物App智能巡检技术的探索与实践

2.深度实践:得物算法域全景可观测性从 0 到 1 的演进之路 

3.前端平台大仓应用稳定性治理之路|得物技术

4.RocketMQ高性能揭秘:承载万亿级流量的架构奥秘|得物技术

5.PAG在得物社区S级活动的落地

文 /Galaxy平台

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

Agoda 近日分享了他们如何将多个独立的数据管道整合为一个基于Apache Spark的集中式平台,以消除财务数据中的不一致性的。该公司构建了一个多层质量保障框架,结合自动化校验、基于机器学习的异常检测以及与上游团队签订的数据契约(data contracts),确保用于财务报表和战略规划的财务指标准确无误,同时每天处理数百万笔预订交易。

 

这一问题源于一个典型的企业架构模式,Agoda 的数据工程、商业智能(BI)和数据分析团队各自开发了独立的财务数据管道,并使用不同的逻辑和定义。尽管这种做法在初期提供了简单性和清晰的责任边界,却导致了重复计算和全公司范围内指标不一致的问题。正如 Agoda 工程团队的Warot Jongboondee所解释的那样,这些差异“可能对 Agoda 的财务报表产生实质性的影响”。

独立的财务数据管道 (图片来源)

 

为了解决这一挑战,Agoda 推出了名为 Financial Unified Data Pipeline(FINUDP)的统一财务数据管道,作为销售、成本、收入和利润率等关键财务数据的单一事实来源(single source of truth)。该系统基于Apache Spark构建,每小时向下游团队提供更新,用于对账和财务规划。整合过程耗费了大量的精力:协调产品、财务和工程等多个利益相关方就统一的数据定义达成共识耗费了很长的时间;初始版本的运行时间长达五小时,后通过查询优化和基础设施调整,最终缩短至约 30 分钟。

财务统一数据管道(FINUDP)的架构(图片来源)

 

Agoda 的质量保障框架采用了多重防御机制。自动化校验会检查数据表中的空值、数值范围约束和数据完整性。一旦关键业务规则校验失败,管道会自动暂停,以防处理可能错误的数据。团队使用Quilliup来比对源表与目标表。与上游团队的数据契约(Data Contracts)会明确约定数据格式、内容和质量要求,任何违反契约的行为会立即触发告警。机器学习模型会持续监控数据模式,识别潜在异常。三级告警系统确保通过邮件、Slack 通知以及内部工具实现快速响应,如果数据更新延迟,系统会自动升级至 Agoda 的 7×24 小时网络运营中心(Network Operations Center,NOC)。

 

这一做法契合了行业的整体趋势。根据最新的行业调研,64%的组织将数据质量问题视为最大挑战。Gartner 指出,数据契约正成为“管理、交付和治理数据产品的一种日益流行的方式”。这类生产者与消费者之间的正式协议,明确定义了数据模式(schema)和质量标准。

 

当然,集中化也带来了明确的权衡取舍(trade-offs)包括,开发速度下降,因为任何变更现在都需要对整个管道进行测试。数据依赖,管道必须等待所有上游数据集就绪后才能启动。详尽的文档编写和广泛的干系人共识拖慢了落地进度,却建立了跨团队的信任。Jongboondee 表示,集中化“要求在每个环节都进行更紧密的协作和审慎的变更管理”。

 

目前,该系统已经实现了 95.6%的可用性,并朝着 99.5%的目标迈进。所有变更均需经过影子测试(shadow testing),也就是,在合并请求中,新旧版本的查询会并行运行,并自动比对结果。此外,还有一个与生产环境完全一致的专用 staging 环境,允许团队在正式发布前进行充分的验证。

 

FINUDP 项目表明,当企业处理大规模关键业务数据时,正逐步从零散的、事后补救式的质量检查,转向架构层面强制执行的、端到端的可靠性体系。这种体系优先保障数据的一致性与可审计性,而非单纯的开发速度,这一转变在财务数据日益支撑报表生成、机器学习模型训练和监管合规流程的今天,显得尤为关键。

 

原文链接:

How Agoda Unified Multiple Data Pipelines Into a Single Source of Truth

Agoda 近日分享了他们如何将多个独立的数据管道整合为一个基于Apache Spark的集中式平台,以消除财务数据中的不一致性的。该公司构建了一个多层质量保障框架,结合自动化校验、基于机器学习的异常检测以及与上游团队签订的数据契约(data contracts),确保用于财务报表和战略规划的财务指标准确无误,同时每天处理数百万笔预订交易。

 

这一问题源于一个典型的企业架构模式,Agoda 的数据工程、商业智能(BI)和数据分析团队各自开发了独立的财务数据管道,并使用不同的逻辑和定义。尽管这种做法在初期提供了简单性和清晰的责任边界,却导致了重复计算和全公司范围内指标不一致的问题。正如 Agoda 工程团队的Warot Jongboondee所解释的那样,这些差异“可能对 Agoda 的财务报表产生实质性的影响”。

独立的财务数据管道 (图片来源)

 

为了解决这一挑战,Agoda 推出了名为 Financial Unified Data Pipeline(FINUDP)的统一财务数据管道,作为销售、成本、收入和利润率等关键财务数据的单一事实来源(single source of truth)。该系统基于Apache Spark构建,每小时向下游团队提供更新,用于对账和财务规划。整合过程耗费了大量的精力:协调产品、财务和工程等多个利益相关方就统一的数据定义达成共识耗费了很长的时间;初始版本的运行时间长达五小时,后通过查询优化和基础设施调整,最终缩短至约 30 分钟。

财务统一数据管道(FINUDP)的架构(图片来源)

 

Agoda 的质量保障框架采用了多重防御机制。自动化校验会检查数据表中的空值、数值范围约束和数据完整性。一旦关键业务规则校验失败,管道会自动暂停,以防处理可能错误的数据。团队使用Quilliup来比对源表与目标表。与上游团队的数据契约(Data Contracts)会明确约定数据格式、内容和质量要求,任何违反契约的行为会立即触发告警。机器学习模型会持续监控数据模式,识别潜在异常。三级告警系统确保通过邮件、Slack 通知以及内部工具实现快速响应,如果数据更新延迟,系统会自动升级至 Agoda 的 7×24 小时网络运营中心(Network Operations Center,NOC)。

 

这一做法契合了行业的整体趋势。根据最新的行业调研,64%的组织将数据质量问题视为最大挑战。Gartner 指出,数据契约正成为“管理、交付和治理数据产品的一种日益流行的方式”。这类生产者与消费者之间的正式协议,明确定义了数据模式(schema)和质量标准。

 

当然,集中化也带来了明确的权衡取舍(trade-offs)包括,开发速度下降,因为任何变更现在都需要对整个管道进行测试。数据依赖,管道必须等待所有上游数据集就绪后才能启动。详尽的文档编写和广泛的干系人共识拖慢了落地进度,却建立了跨团队的信任。Jongboondee 表示,集中化“要求在每个环节都进行更紧密的协作和审慎的变更管理”。

 

目前,该系统已经实现了 95.6%的可用性,并朝着 99.5%的目标迈进。所有变更均需经过影子测试(shadow testing),也就是,在合并请求中,新旧版本的查询会并行运行,并自动比对结果。此外,还有一个与生产环境完全一致的专用 staging 环境,允许团队在正式发布前进行充分的验证。

 

FINUDP 项目表明,当企业处理大规模关键业务数据时,正逐步从零散的、事后补救式的质量检查,转向架构层面强制执行的、端到端的可靠性体系。这种体系优先保障数据的一致性与可审计性,而非单纯的开发速度,这一转变在财务数据日益支撑报表生成、机器学习模型训练和监管合规流程的今天,显得尤为关键。

 

原文链接:

How Agoda Unified Multiple Data Pipelines Into a Single Source of Truth