标签 Apache SeaTunnel 下的文章

在 MySQL CDC 任务中,很多用户都会遇到这样的问题:任务失败后该从哪里恢复?只知道一个时间点,却拿不到对应的 binlog 位点怎么办?Apache SeaTunnel 2.3.12 通过引入按时间启动(Timestamp Startup)功能,给出了更直观的答案。

本文围绕该能力的设计背景、配置方式与实现机制展开解析,帮助读者理解如何基于时间语义更高效地进行 CDC 任务恢复与数据回溯。

功能概述

Problem:CDC 启动点配置“技术正确,但使用困难”

在 Apache SeaTunnel 2.3.12 之前,MySQL CDC 连接器主要支持从指定 binlog 位点(file + position)或 GTID 启动数据同步任务。这种方式在实现上是精确且可靠的,但在真实生产与运维场景中,往往并不符合用户的使用习惯。

在实际 CDC 运维过程中,用户更容易掌握的是 “时间”,而非底层 binlog 细节,例如:

  • 任务异常中断后,希望从
    “2024-04-01 10:00:00” 之后继续同步
  • 对某一时间窗口的数据进行回溯或补采
  • 只知道“昨天 08:00 之后的变更需要重新同步”,但无法定位对应的 binlog 文件和偏移量

如果仍要求用户手动将时间反推为 binlog 位点,不仅配置复杂,而且极易出错,也显著增加了运维成本。这种“技术友好、但用户不友好”的启动方式,已经成为 CDC 任务恢复和回溯场景中的常见痛点。

Solution:引入按时间启动

为解决上述问题,Apache SeaTunnel 在 2.3.12 版本中为 MySQL CDC 连接器引入了按时间启动功能

该功能允许用户直接指定一个 Unix 时间戳(毫秒级) 作为同步起始点。MySQL CDC 连接器会在启动阶段自动完成以下工作:

  1. 根据指定时间戳定位对应的 binlog 文件与偏移量
  2. 从该 binlog 位置开始读取变更事件
  3. 自动跳过所有早于该时间点的历史事件

通过引入“时间”这一更符合业务语义的维度,SeaTunnel 将 CDC 启动方式从面向底层 binlog 细节,提升为面向业务时间语义,显著降低了 CDC 任务在恢复、回溯和运维场景下的使用门槛。

配置参数

要启用按时间启动功能,需要配置以下两个关键参数:

参数名类型必填说明
startup.modeEnum设置为 "timestamp" 启用时间模式 2
startup.timestampLongUnix 时间戳(毫秒),指定启动时间点 3

配置示例

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    url = "jdbc:mysql://localhost:3306/testdb"
    username = "root"
    password = "root@123"
    table-names = ["testdb.table1"]
    
    # 启用按时间启动
    startup.mode = "timestamp"
    startup.timestamp = 1672531200000  # 2023-01-01 00:00:00 UTC
  }
}

sink {
  Console {
  }
}

技术实现

启动模式枚举

MySqlSourceOptions 类中定义了所有支持的启动模式,包括新增的 TIMESTAMP 模式:

public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
    (SingleChoiceOption)
        Options.key(SourceOptions.STARTUP_MODE_KEY)
            .singleChoice(
                StartupMode.class,
                Arrays.asList(
                    StartupMode.INITIAL,
                    StartupMode.EARLIEST,
                    StartupMode.LATEST,
                    StartupMode.SPECIFIC,
                    StartupMode.TIMESTAMP))

时间戳过滤实现

核心实现在 MySqlBinlogFetchTask 类中,当检测到启动模式为 TIMESTAMP 时,会使用 TimestampFilterMySqlStreamingChangeEventSource 来处理 binlog 事件:

StartupMode startupMode = startupConfig.getStartupMode();
if (startupMode.equals(StartupMode.TIMESTAMP)) {
    log.info(
        "Starting MySQL binlog reader,with timestamp filter {}",
        startupConfig.getTimestamp());

    mySqlStreamingChangeEventSource =
        new TimestampFilterMySqlStreamingChangeEventSource(
            sourceFetchContext.getDbzConnectorConfig(),
            sourceFetchContext.getConnection(),
            sourceFetchContext.getDispatcher(),
            sourceFetchContext.getErrorHandler(),
            Clock.SYSTEM,
            sourceFetchContext.getTaskContext(),
            sourceFetchContext.getStreamingChangeEventSourceMetrics(),
            startupConfig.getTimestamp());
}

偏移量计算

MySqlSourceFetchTaskContext 中实现了根据时间戳查找对应 binlog 偏移量的逻辑:

private Offset getInitOffset(SourceSplitBase mySqlSplit) {
    StartupMode startupMode = getSourceConfig().getStartupConfig().getStartupMode();
    if (startupMode.equals(StartupMode.TIMESTAMP)) {
        long timestamp = getSourceConfig().getStartupConfig().getTimestamp();
        try (JdbcConnection jdbcConnection =
                getDataSourceDialect().openJdbcConnection(getSourceConfig())) {
            return findBinlogOffsetBytimestamp(jdbcConnection, binaryLogClient, timestamp);
        } catch (Exception e) {
            throw new SeaTunnelException(e);
        }
    } else {
        return mySqlSplit.asIncrementalSplit().getStartupOffset();
    }
}

启动模式对比与适用场景

为了更好地理解按时间启动功能在整体 CDC 启动体系中的定位,下面对 MySQL CDC 当前支持的几种启动模式进行对比说明:

启动模式启动依据优点适用场景
INITIAL全量 + 当前 binlog一次性完成历史与增量同步首次接入数据源
EARLIEST最早可用 binlog不依赖具体位点binlog 保存周期较长的场景
LATEST当前最新 binlog启动快仅关注未来增量数据
SPECIFIC指定 binlog file + position精确可控已明确掌握 binlog 位点的场景
TIMESTAMP指定时间戳(毫秒)配置直观、符合业务语义任务恢复、数据回溯、按时间窗口同步

可以看到,TIMESTAMP 模式并不是替代 SPECIFIC 或 GTID 的“更底层”方案,而是为了解决“用户只知道时间、不知道 binlog”的典型问题,是一种以可用性和运维友好性为核心的补充能力

测试验证

该功能在集成测试中得到了充分验证,测试用例 MysqlCDCSpecificStartingOffsetIT 验证了按时间戳启动的正确性 7

使用注意事项

  1. 版本要求:需要 SeaTunnel 2.3.12 或更高版本
  2. 时间戳格式:必须使用 Unix 时间戳,单位为毫秒
  3. binlog 可用性:确保指定时间点对应的 binlog 文件仍然可用
  4. 时区考虑:时间戳基于 UTC 时区,需要注意时区转换

总结

SeaTunnel MySQL CDC 的按时间启动功能为数据同步提供了更精确的控制能力,特别适用于需要从特定时间点恢复数据同步的场景。该功能通过时间戳到 binlog 偏移量的转换,实现了高效的时间点定位和数据过滤。

Notes

  • 该功能在工厂类 MySqlIncrementalSourceFactory 中通过条件配置规则进行参数验证
  • 除了 MySQL CDC,其他 CDC 连接器如 SQL Server CDC 也支持类似的时间戳启动功能

每次在 Apache SeaTunnel 里配置非关系型数据库,看着那几百行还要手动定义的字段映射,是不是挺崩溃的?配置错一个字段,任务就报错,这种“体力活”真的该结束了。

最近 Apache SeaTunnel 社区的 Issue #10339 提案捅破了这层窗户纸:既然有 Apache Gravitino 这么强大的元数据服务,为什么不直接让它自动同步 Schema?这个提议一出,社区反响热烈,核心维护者们已经把它列入了年度 RoadMap。目前的讨论很务实,大家正盯着怎么让 Apache SeaTunnel 在提交作业时自动‘抓取’最新的元数据,好让大家彻底告别那种‘对着数据库手敲配置’的原始生活。

🫱 Issue 链接: https://github.com/apache/seatunnel/issues/10339

Issue 概述

先来看看提交这个 Issue 的作者是为什么想到这个点子的,以及他初步的核心设计概念。🔽

本 PR 实现了 Apache Gravitino 与 SeaTunnel 的集成,将其作为非关系型连接器的外部元数据服务。通过 Gravitino 的 REST API 自动获取表结构和元数据,SeaTunnel 用户无需再在连接器配置中手动定义冗长且复杂的 Schema 映射。

背景

目前,Apache SeaTunnel 中的许多非关系型连接器(如 Elasticsearch、向量数据库和数据湖引擎)要求用户在作业配置中显式定义完整的列 Schema。这导致了以下问题:

  • 配置繁琐且易错:字段映射内容冗长,极易发生人为错误。
  • 架构冗余:不同作业之间存在大量重复的 Schema 定义。
  • 数据不一致风险:实际存储层与 SeaTunnel 配置文件之间容易出现架构脱节。

变更内容

本 PR 增加了基于 Gravitino 的 Catalog 和 Schema 解析器,使 SeaTunnel 能够:

  • 通过 REST API 从 Gravitino 查询表定义。
  • 自动获取列名、数据类型及相关属性。
  • 直接根据 Gravitino 元数据构建 SeaTunnel 内部 Schema。
  • 针对受支持的连接器,取消强制手动定义 schema { fields { ... } } 的要求。

实现后,用户只需在作业配置中指定 Gravitino Catalog 和相关的表引用即可。

核心优势

  • 零手动映射:非关系型数据源实现 Schema 自动对齐。
  • 单一事实来源:确保表结构与中心化元数据仓库保持高度一致。
  • 提升可靠性:显著提高配置的准确性,降低长期维护成本。
  • 支持复杂类型:通过统一元数据,简化了对嵌套结构、JSON、向量等高级类型的处理。

执行范围

所有基于 Gravitino 的 Schema 解析和校验均在 SeaTunnel Engine 客户端完成(即在作业提交前)。这种设计确保了:

  • 在作业预检阶段即可发现无效或不兼容的 Schema。
  • 运行时的任务仅接收经过验证和标准化的 Schema,降低了执行失败的概率。

影响

这一更新极大地简化了非关系型连接器的作业设置。除了提升易用性,它还为整个 SeaTunnel 生态系统在统一架构管理、架构演进以及高级数据类型支持方面奠定了技术框架。

核心思路

针对 FTP、S3、ES、MongoDB 等半结构化与非结构化数据源,SeaTunnel 现支持通过 Gravitino REST API 自动解析表结构(Schema)。

需要注意的是,这并非要取代现有的显式配置,而是一项完全向前兼容的可选新机制

解析优先级如下:

1. 显式配置(Inline Schema)永远优先

只要连接器配置中包含了 schema 代码块,SeaTunnel 就必须忽略 Gravitino,直接以显式定义的 Schema 为准。

FtpFile {
  path = "/tmp/seatunnel/sink/text"
  # ... 其他基础配置 ...
  
  # 只要这里定义了,就不会去查 Gravitino
  schema = {
    name = string
    age  = int
  }
}

2. 通过 env 全局配置 Gravitino(推荐模式)

SeaTunnel 已在引擎层面集成了 Gravitino Metalake。
env 中全局开启后,所有非关系型数据源都能直接通过名称引用 Schema。

env {
  metalake_enabled = true
  metalake_type    = "gravitino"
  metalake_url     = "http://localhost:8090/api/metalakes/metalake_name/catalogs/"
}

2.1 使用 schema_path 引用

FtpFile {
  # ... 基础配置 ...
  schema_path = "catalog_name.ykw.test_table"
}

2.2 使用 schema_url 引用

FtpFile {
  # ... 基础配置 ...
  schema_url = "http://localhost:8090/api/metalakes/laowang_test/.../tables/all_type"
}

3. 兜底逻辑:读取操作系统环境变量

如果在作业的 env 块中没有定义 Gravitino,SeaTunnel 会尝试从操作系统环境变量中读取以下配置:
metalake_enabled | metalake_type | metalake_url
其行为逻辑与第 2 节中的 env 配置完全一致。

4. 在连接器层级单独配置 Gravitino

如果全局没有配置元数据中心,也可以在具体的连接器(Connector)内部直接定义 Gravitino。

4.1 直接使用 schema_url

FtpFile {
  # ... 基础配置 ...
  metalake_type = "gravitino"
  schema_url = "http://localhost:8090/api/.../tables/all_type"
}

4.2 组合使用 metalake_url 与 schema_path

FtpFile {
  # ... 基础配置 ...
  metalake_type = "gravitino"
  metalake_url  = "http://localhost:8090/api/metalakes/metalake_name/catalogs/"
  schema_path   = "catalog_name.ykw.test_table"
}

5. 探测器定位 (Find detector)

系统会根据 metalake_type 自动匹配并加载对应的 REST API HTTP 探测器。

6. 映射与构建 CatalogTable

探测器调用拼接好的 URL 获取响应体(ResponseBody),随后将其交给映射器(Mapper)进行类型匹配,最终完成 CatalogTable 的构建。

7. 流程图如下

Issue 进展

目前,Apache SeaTunnel 项目核心贡献者对此提议给出了正面评价,并将其添加到 Apache SeaTunnel Roadmap 中。

Apache SeaTunnel PMC Member 对这个提议提出一些疑问,比如这种集成属于哪一层级,对多引擎兼容性的考量,类型转换的准确性等,并根据社区设计规范,要求发起者提交一份正式的设计文档(Design Document)。提交者的回复非常具有建设性,他通过 “客户端预处理”和“抽象 Catalog 接口” 这两个核心设计点,有效地回应了社区对于系统耦合度和运行稳定性的担忧。

目前,这个讨论的回到了该 Issue 的提交者手中,社区正在等待他提交那份正式的 Design Document。

可以看到,这个方案要是落地,咱以后写任务可能就一两行配置的事儿。目前设计稿正在打磨中,非常需要大家去评论区吐吐槽、提提建议,毕竟这个功能好不好用,咱们一线开发者最清楚。走,去 GitHub 围观一下,说不定你的一个提议就能决定下一个版本的样子!🔽
https://github.com/apache/seatunnel/issues/10339