深度探秘 Apache DolphinScheduler 数据库模式
本文将深入介绍 Apache DolphinScheduler 所采用的数据库模式,此模式主要用于持久化存储工作流定义、执行状态、调度信息以及系统元数据。它具备广泛的兼容性,可支持 MySQL、PostgreSQL 和 H2 等多种数据库,其具体定义存储在 DolphinScheduler 的数据库模式分为七个主要功能组: DolphinScheduler 严格区分定义(模板)和实例(执行)。这实现了版本控制、并发执行和审计跟踪。 关键设计原则: 工作流模板的主表。 索引: 存储工作流定义所有版本的审计日志。 镜像 可在工作流中重用的任务模板。 通过指定任务之间的边来定义 DAG 结构。 注意: 工作流的运行时执行记录。 索引: 单个任务的运行时执行记录。 索引: 处理流程: DolphinScheduler 使用复杂的版本控制系统,支持: DolphinScheduler 集成了 Quartz 调度程序以实现基于 cron 的调度。模式包括标准 Quartz 表以及一个映射表。 Quartz 表要点: 存储 SQL 任务的数据库连接配置。 约束: 注意:此表在模式中已标记为弃用。资源元数据正在迁移到单独的存储后端。 默认租户:系统创建一个默认租户, 对于不使用 ZooKeeper 的部署,DolphinScheduler 提供基于 JDBC 的注册表用于服务协调。 存储类似于 ZooKeeper 节点的注册表项。 实现分布式锁。 跟踪活动客户端以清理临时数据。 清理逻辑:当客户端的心跳过期时,其临时注册表数据和锁将自动删除。 由工作流/任务失败或完成生成的告警记录。 索引: 告警通道组。 该模式包含针对常见查询模式精心设计的索引: 在数据库级别强制执行的关键业务规则: DolphinScheduler 在 数据库访问通过 大多数操作使用 Spring 的 系统使用 HikariCP 进行连接池,在 
dolphinscheduler - dao/src/main/resources/sql 目录下。模式架构
组 目的 关键表 工作流管理 存储带有版本控制的工作流和任务定义 t_ds_workflow_definition、t_ds_task_definition、t_ds_workflow_task_relation执行状态 跟踪运行时实例及其状态 t_ds_workflow_instance、t_ds_task_instance、t_ds_command调度 通过 Quartz 管理基于 cron 的调度 t_ds_schedules、QRTZ_* 表资源管理 数据源、文件和 UDF 元数据 t_ds_datasource、t_ds_resources、t_ds_udfs管理 用户、租户、项目和权限 t_ds_user、t_ds_tenant、t_ds_project告警 告警配置和历史记录 t_ds_alert、t_ds_alertgroup服务注册 基于 JDBC 的协调(ZooKeeper 的替代方案) t_ds_jdbc_registry_* 表工作流和任务定义模型
定义与实例分离

核心表参考
工作流定义表
t_ds_workflow_definition列 类型 描述 id int 自动递增主键 code bigint 唯一工作流标识符(跨版本稳定) version int 版本号(默认 1) name varchar(255) 工作流名称 project_code bigint 所属项目 release_state tinyint 0 = 离线,1 = 在线 global_params text JSON 格式的全局参数 execution_type tinyint 0 = 并行,1 = 串行等待,2 = 串行丢弃,3 = 串行优先级 timeout int 超时时间(分钟) user_id int 创建者用户 ID UNIQUE KEY workflow_unique (name, project_code)UNIQUE KEY uniq_workflow_definition_code (code)KEY idx_project_code (project_code)t_ds_workflow_definition_logt_ds_workflow_definition 的结构,额外列:operator、operate_time,主键:(code, version)。t_ds_task_definition列 类型 描述 code bigint 唯一任务标识符 version int 版本号 task_type varchar(50) Shell、SQL、Python、Spark 等 task_params longtext JSON 格式的任务配置 worker_group varchar(255) 目标工作线程组 fail_retry_times int 失败重试次数 fail_retry_interval int 重试间隔(分钟) timeout int 任务超时时间(分钟) cpu_quota int CPU 限制(-1 = 无限制) memory_max int 内存限制(MB,-1 = 无限制) t_ds_workflow_task_relation列 类型 描述 workflow_definition_code bigint 父工作流 workflow_definition_version int 工作流版本 pre_task_code bigint 前置任务(根节点为 0) post_task_code bigint 后置任务 condition_type tinyint 0 = 无,1 = 判断,2 = 延迟 condition_params text JSON 格式的条件配置 pre_task_code = 0 表示根节点(无前驱任务)。执行状态表
t_ds_workflow_instance列 类型 描述 id int 主键 workflow_definition_code bigint 引用定义 workflow_definition_version int 本次执行锁定的版本 state tinyint 0 = 提交,1 = 运行中,2 = 暂停准备,3 = 已暂停,4 = 停止准备,5 = 已停止,6 = 失败,7 = 成功,8 = 需要容错,9 = 已终止,10 = 等待,11 = 等待依赖 state_history text 状态转换日志 start_time datetime 执行开始时间 end_time datetime 执行结束时间 command_type tinyint 0 = 开始,1 = 从当前开始,2 = 恢复,3 = 恢复暂停,4 = 从失败处开始,5 = 补充,6 = 调度,7 = 重新运行,8 = 暂停,9 = 停止,10 = 恢复等待 host varchar(135) 执行此工作流的主服务器主机 executor_id int 触发执行的用户 tenant_code varchar(64) 用于资源隔离的租户 next_workflow_instance_id int 用于串行执行模式 KEY workflow_instance_index (workflow_definition_code, id)KEY start_time_index (start_time, end_time)t_ds_task_instance列 类型 描述 id int 主键 task_code bigint 引用任务定义 task_definition_version int 锁定的版本 workflow_instance_id int 父工作流实例 state tinyint 与 workflow_instance 相同的状态值submit_time datetime 提交到队列的时间 start_time datetime 实际执行开始时间 end_time datetime 执行结束时间 host varchar(135) 执行任务的工作线程主机 execute_path varchar(200) 工作线程上的工作目录 log_path text 日志文件路径 retry_times int 当前重试次数 var_pool text 供下游任务使用的变量 KEY idx_task_instance_code_version (task_code, task_definition_version)命令模式与工作流执行
命令队列
t_ds_command 表实现了基于队列的执行模型,其中命令触发工作流实例。
t_ds_command 结构列 类型 描述 command_type tinyint 0 = 开始,1 = 从当前开始,2 = 恢复,3 = 恢复暂停,4 = 从失败处开始,5 = 补充,6 = 调度,7 = 重新运行,8 = 暂停,9 = 停止 workflow_definition_code bigint 目标工作流 workflow_instance_id int 用于恢复/重新执行操作 workflow_instance_priority int 0 = 最高,1 = 高,2 = 中,3 = 低,4 = 最低 command_param text JSON 格式的执行参数 worker_group varchar(255) 目标工作线程组 tenant_code varchar(64) 执行的租户 dry_run tinyint 0 = 正常,1 = 试运行(无实际执行) t_ds_command。MasterSchedulerThread 持续扫描该表(按优先级、id 排序)。t_ds_workflow_instance 记录。t_ds_task_instance 记录。t_ds_error_command。版本控制系统
基于代码的版本控制模型

版本管理规则
t_ds_workflow_definition 和 t_ds_task_definition 中。*_log 表中,具有 UNIQUE KEY (code, version)。release_state = 1(在线)。调度体系架构
Quartz 集成

t_ds_schedules列 类型 描述 workflow_definition_code bigint 目标工作流(唯一) start_time datetime 调度活动开始时间 end_time datetime 调度活动结束时间 timezone_id varchar(40) cron 表达式的时区 crontab varchar(255) cron 表达式 release_state int 0 = 离线,1 = 在线 failure_strategy int 失败时的行为 workflow_instance_priority int 实例的默认优先级 QRTZ_TRIGGERS.NEXT_FIRE_TIME:已索引,便于高效扫描。QRTZ_CRON_TRIGGERS.CRON_EXPRESSION:解析后的 cron 定义。QRTZ_SCHEDULER_STATE:跟踪 Quartz 调度程序实例。资源和配置表
数据源管理
t_ds_datasource列 类型 描述 name varchar(64) 数据源名称 type tinyint 数据库类型(MySQL、PostgreSQL、Hive 等) connection_params text JSON 格式的连接配置(主机、端口、数据库、凭据) user_id int 所有者用户 UNIQUE KEY (name, type) - 防止数据源重复。文件资源
t_ds_resources(已弃用)列 类型 描述 full_name varchar(128) 包括租户的完整路径 type int 文件类型(文件/UDF) size bigint 文件大小(字节) is_directory boolean 目录标志 pid int 父目录 ID 多租户与管理
项目、用户和租户层次结构

关键管理表
t_ds_tenant列 类型 描述 tenant_code varchar(64) 唯一租户标识符(唯一) queue_id int 任务的默认 YARN 队列 description varchar(255) 租户描述 id = -1,tenant_code = 'default'。t_ds_user列 类型 描述 user_name varchar(64) 登录用户名(唯一) user_password varchar(64) 哈希密码 user_type tinyint 0 = 普通用户,1 = 管理员 tenant_id int 关联的租户(默认 -1) email varchar(64) 电子邮件地址 state tinyint 0 = 禁用,1 = 启用 t_ds_project列 类型 描述 code bigint 唯一项目代码(唯一) name varchar(255) 项目名称(唯一) user_id int 创建者/所有者 description varchar(255) 项目描述 JDBC 注册表

注册表详情
t_ds_jdbc_registry_data列 类型 描述 data_key varchar(256) 类似路径的键(唯一) data_value text 序列化数据 data_type varchar(64) EPHEMERAL(客户端断开连接时删除)或 PERSISTENTclient_id bigint 所属客户端 列 类型 描述 last_update_time timestamp 上次修改时间 t_ds_jdbc_registry_lock列 类型 描述 lock_key varchar(256) 锁标识符(唯一) lock_owner varchar(256) 持有锁的客户端(格式:ip_processId) client_id bigint 所属客户端 t_ds_jdbc_registry_client_heartbeat列 类型 描述 id bigint 客户端 ID(主键) client_name varchar(256) 客户端标识符 last_heartbeat_time bigint 上次心跳时间戳 connection_config text 连接元数据 告警系统
告警表

t_ds_alert列 类型 描述 title varchar(512) 告警标题 sign char(40) 内容的 SHA1 哈希值(用于去重) content text 告警消息正文 alert_status tinyint 0 = 等待,1 = 成功,2 = 失败 warning_type tinyint 1 = 工作流成功,2 = 工作流/任务失败 workflow_instance_id int 源工作流实例 alertgroup_id int 目标告警组 KEY idx_sign (sign) - 实现去重。t_ds_alertgroup列 类型 描述 group_name varchar(255) 唯一组名 alert_instance_ids varchar(255) 逗号分隔的插件实例 ID description varchar(255) 组描述 索引与查询优化
关键索引
- 按定义查询工作流实例:
`KEY workflow_instance_index (workflow_definition_code, id)`
- 按定义查询任务实例:
`KEY idx_task_instance_code_version (task_code, task_definition_version)`
- 用于监控的时间范围查询*:
`KEY start_time_index (start_time, end_time)`基于优先级的命令扫描:
`KEY priority_id_index (workflow_instance_priority, id)`- 正向和反向 DAG 遍历:
`KEY idx_pre_task_code_version (pre_task_code, pre_task_version)`
正向和反向 DAG 遍历:
`KEY idx_post_task_code_version (post_task_code, post_task_version)`
`KEY idx_code (project_code, workflow_definition_code)`唯一约束
表 约束 目的 t_ds_workflow_definitionUNIQUE (name, project_code)项目中无重复的工作流名称 t_ds_workflow_definitionUNIQUE (code)全局工作流标识符 t_ds_workflow_definition_logUNIQUE (code, version)每个版本一条记录 t_ds_datasourceUNIQUE (name, type)每种类型无重复的数据源名称 t_ds_schedulesUNIQUE (workflow_definition_code)每个工作流一个调度 模式演变与升级
dolphinscheduler - dao/src/main/resources/sql/upgrade 中维护用于跨版本模式迁移的升级脚本。近期模式变更
3.3.0 变更
t_ds_dq_*)。3.2.0 变更
execution_type(并行/串行模式)。next_workflow_instance_id。tenant_code。t_ds_project_parameter 和 t_ds_project_preference。数据库交互模式
服务层访问
dolphinscheduler - dao 中的 DAO 层进行抽象。
关键服务类:ProcessService:工作流/任务定义和实例的 CRUD 操作。CommandService:命令队列管理。ProjectService:项目和权限管理。ResourcesService:资源元数据操作。事务管理
@Transactional 注解实现:连接池
application.yaml 中配置: