从零开始学Flink:实时数仓与维表时态Join实战
在前一篇 《Flink 双流 JOIN 实战详解》 中,我们用「订单流 + 支付流」搞懂了事实双流之间的时间关联。 但在真实的实时数仓项目里,光有事实流还不够,业务同学更关心的是: 这些信息通常存放在 维度表(维表)中,例如 MySQL 的 这就是 维表时态 Join(Temporal Table Join) 要解决的问题。 本文我们就以「订单事实流 + 用户维表」为例,完成一个从 Kafka 到 MySQL 的简易实时数仓 Demo,并重点理解 Flink SQL 中维表时态 Join 的语法和注意事项。 设想一个简化的电商业务场景: 我们想要在 Flink 中构建一张「订单明细宽表」,字段大致包括: 并且要求: 这正是 时态 Join 和「实时数仓」的关键:按事件发生时刻回放维度视图。 本篇默认你已经完成前几篇中的环境准备: 在此基础上,我们还需要: 和 Kafka 一样,JDBC 连接器也需要以 JAR 包形式放到 Flink 的 以 Flink 1.20.x 对应的 确认 Flink 安装目录(假设为 下载 JDBC Connector JAR 到 Flink 的 如果你使用的是独立集群或远程集群,需要重启 Flink 集群,让新 JAR 在 JobManager/TaskManager 上生效: 重启 Flink SQL Client,使用新 Connector: 如果你在 Windows + WSL2 上部署,只需在 WSL2 内执行上述命令即可;或者手动下载 JAR 后拷贝到 首先在 MySQL 中准备一张简单的用户维度表,用来存用户的基础属性。 在 MySQL 中执行: 为了演示「时态」效果,你可以在后续实验中手动更新某个用户的等级或城市,例如: 这样我们在 Flink 里做时态 Join 时,就能观察“变更前后”的区别。 接下来回到 Flink SQL Client,把 Kafka 中的订单事实流和 MySQL 中的维表都注册成 Flink 表。 和上一篇双流 JOIN 类似,我们假设 Kafka 中有一个 在 Flink SQL Client 中执行: 你可以沿用上一篇中 Kafka 造数的方式,用 然后把刚才在 MySQL 中建好的 注意几点: 在生产环境中,你可以把 MySQL 作为维度存储,或者通过 CDC 把维表变更同步到 Kafka,构造成 changelog 流,这些都可以和 Temporal Join 结合使用。 有了订单事实表 Flink SQL 中的 Temporal Table Join 对于 JDBC 这类 外部维表,通常采用「处理时间(Processing Time)」语义来做 Lookup Join,典型写法如下: 在 SQL Client 中执行这段查询,会看到实时流式刷新的结果,每一行订单都带上了对应的用户属性。 为了验证这是“时态 Join”而不是“始终查最新维度”,可以按下面步骤操作: 回到 MySQL,执行: 在命令行中输入一条 JSON 数据(按回车发送一条): 这就说明 Flink 的时态 Join 确实是“按订单发生时刻去回放维度视图”的,而不是简单查当前最新值。 在真实项目中,我们不会只在 SQL Client 里 例如,可以把结果写回 Kafka,作为 DWD 层的订单宽表: 这样,下游的实时应用或 BI 查询就可以直接订阅 你也可以把结果同步到 MySQL、ClickHouse 等分析型数据库中,构建实时明细表,为报表和可视化提供数据。 通过这篇文章,我们完成了这样一件事: 这背后有几个非常重要的实时数仓设计理念: 在后续的文章中,我们可以继续沿着这个方向深入: 如果你已经跑通了本文的 Demo,不妨试着自己设计一张商品维表 dim_user、dim_product 等。我们希望在实时计算时,能把「事实流」和「维表」在时间维度上正确地关联起来,构建一张带有完整业务属性的明细宽表。一、业务场景与数仓目标
orders 订单事实流dim_user 用户维表,包含用户等级、所属城市、注册渠道等信息二、环境前提与依赖准备
1. 基础组件
2. 安装 Flink JDBC Connector
lib 目录中。flink-connector-jdbc 为例:/opt/flink):export FLINK_HOME=/opt/flinklib 目录:cd $FLINK_HOME/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.3.0-1.20/flink-connector-jdbc-3.3.0-1.20.jarcd $FLINK_HOME
bin/stop-cluster.sh
bin/start-cluster.shcd $FLINK_HOME
bin/sql-client.shlib 目录,步骤完全一致。三、准备 MySQL 用户维度表 dim_user
CREATE DATABASE IF NOT EXISTS realtime_dwh;
USE realtime_dwh;
CREATE TABLE dim_user (
user_id VARCHAR(32) PRIMARY KEY,
user_name VARCHAR(64),
user_level VARCHAR(16),
city VARCHAR(64),
register_time DATETIME
);
INSERT INTO dim_user (user_id, user_name, user_level, city, register_time) VALUES
('u_1', '张三', 'VIP1', '北京', '2025-12-01 10:00:00'),
('u_2', '李四', 'VIP2', '上海', '2025-12-05 11:00:00'),
('u_3', '王五', 'VIP1', '广州', '2025-12-10 12:00:00');UPDATE dim_user
SET user_level = 'VIP3'
WHERE user_id = 'u_2';四、在 Flink 中注册事实流与维表
1. Kafka 订单事实表 orders
orders Topic,写入订单事实数据。CREATE TABLE orders (
order_id STRING,
user_id STRING,
order_amount DECIMAL(10, 2),
order_time TIMESTAMP_LTZ(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'flink-orders-dim',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);kafka-console-producer.sh 发送 JSON 订单数据,只需要保证字段名一致。2. MySQL 用户维表 dim_user(JDBC Lookup 表)
dim_user 注册为 Flink 的 JDBC 表:CREATE TABLE dim_user (
user_id STRING,
user_name STRING,
user_level STRING,
city STRING,
register_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/realtime_dwh',
'table-name' = 'dim_user',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '1qaz@WSX'
);PRIMARY KEY (user_id) NOT ENFORCED 告诉 Flink 这是一张以 user_id 为主键的表,是做时态 Join 的前提五、维表时态 Join:把订单打上用户维度
orders 和维度表 dim_user,就可以通过时态 Join 来构建订单明细宽表。1. 基础时态 Join 语法
SELECT
o.order_id,
o.user_id,
d.user_name,
d.user_level,
d.city,
o.order_amount,
o.order_time
FROM orders AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;
这里有几个关键点:proc_time AS PROCTIME() 是在 orders 上定义的处理时间字段FOR SYSTEM_TIME AS OF o.proc_time 表示“以 Flink 处理这条订单记录的当前时间,去查维表的一个快照”,这是 JDBC Lookup 支持的典型用法user_id 等值关联LEFT JOIN 可以保留找不到维度的订单,并用空值来表示“维度缺失”2. 验证时态效果:修改维表再观察 Join
orders Topic 写入几条订单数据,例如用户 u_2 下单的记录u_2 的等级是 VIP2UPDATE dim_user
SET user_level = 'VIP3'
WHERE user_id = 'u_2';u_2bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic orders{"order_id":"o_3","user_id":"u_2","order_amount":200.00,"order_time":"2026-02-19T14:42:00Z"}
这时你会看到:VIP2VIP3六、把结果写回 Kafka 或 MySQL,形成实时数仓明细层
SELECT 一下就结束,而是要把 Join 后的订单明细宽表,写回到下游存储,形成实时数仓的一个层级。CREATE TABLE dwd_order_user_wide (
order_id STRING,
user_id STRING,
user_name STRING,
user_level STRING,
city STRING,
order_amount DECIMAL(10, 2),
order_time TIMESTAMP_LTZ(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_order_user_wide',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'flink-dwd-order-wide',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
INSERT INTO dwd_order_user_wide
SELECT
o.order_id,
o.user_id,
d.user_name,
d.user_level,
d.city,
o.order_amount,
o.order_time
FROM orders AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;dwd_order_user_wide 这个 Topic,拿到已经打好用户标签的订单明细数据。七、小结与下一步建议
ordersdim_userFOR SYSTEM_TIME AS OF 语法做维表时态 Joindim_product,再给订单打上商品品类维度,体验一下“事实 + 多维表时态 Join”在 Flink SQL 里的完整味道。