标签 Apache DolphinScheduler 下的文章

本文为《深入理解 Apache DolphinScheduler:从调度原理到 DataOps 实战》系列专栏第 2 篇,从源码与调度模型视角,解析 DolphinScheduler 的核心抽象设计,重点说明 Workflow、TaskDefinition 与实例对象的职责边界,并结合 DAG 示意图解释调度系统如何基于依赖判断驱动复杂任务编排。

上文回顾:调度系统,不只是一个“定时器”

在真正使用 DolphinScheduler 一段时间之后,很多人都会产生一个疑问:为什么系统里会同时存在流程定义、流程实例、任务定义、任务实例这么多对象?是不是“设计过度”了?

如果从源码和调度系统的运行方式来看,答案恰恰相反——这些抽象是为了压住复杂性而被刻意拆开的

Workflow:一张不会“运行”的 DAG 蓝图

在 DolphinScheduler 的设计中,Workflow(源码中对应 ProcessDefinition)从一开始就被定义为纯静态结构

它描述的内容非常克制:流程里有哪些任务、任务之间如何依赖、是否存在条件分支或子流程。这些信息共同组成了一张 DAG,但这张 DAG 永远不会自己执行

从源码角度看,Workflow 更像是一个结构化配置对象,而不是调度对象。

你可以在数据库里看到,它不记录成功、失败、开始时间,也不关心某次运行发生了什么。

这背后其实是一条很重要的设计原则:

结构和执行必须彻底分离,否则状态会污染定义。

DAG 在 DolphinScheduler 中真正解决的是什么问题

在 DolphinScheduler 里,DAG 的职责非常单一:
判断“某个任务现在能不能被调度”

它不关心任务如何执行,也不关心任务执行结果的业务含义,只关心依赖是否满足。

📌 这里放一张 DAG 的 PNG 示意图,展示了一个典型的多父依赖结构。节点是否被调度,并不取决于执行路径的先后顺序,而取决于其所有上游依赖是否已经完成。这正是 DolphinScheduler 在运行时对 DAG 进行动态判断的核心逻辑。

DS DAG

在源码实现中,DAG 会在流程实例启动时被解析成内存结构,用来驱动后续的调度决策。

当某个 TaskInstance 状态发生变化时,调度器并不是“继续往下跑”,而是重新判断 DAG 中哪些节点被解锁了。

这也是为什么 DolphinScheduler 可以天然支持并行、条件分支和失败阻断。

这些能力并不是“写死的逻辑”,而是 DAG 推理的自然结果。

TaskDefinition:任务的“执行模板”

如果说 Workflow 是流程的蓝图,那么 TaskDefinition 就是单个任务的模板

在源码中,TaskDefinition 保存的是“如果这个任务被调度,它应该如何被执行”的信息,比如:

  • 任务类型(Shell、SQL、Spark、Flink 等)
  • 参数、脚本内容
  • 失败策略、超时配置、资源参数

但有一点非常关键:
TaskDefinition 是完全无状态的。

你在 TaskDefinition 里永远看不到“这次执行是否成功”之类的字段,因为这类信息从语义上就不属于“定义”。

这一点在代码中体现得很明显,例如(示意):

public class TaskDefinition {
    private Long id;
    private String name;
    private TaskType taskType;
    private String taskParams;
    private int timeout;
    private int failRetryTimes;
    // 注意:这里没有任何 execution state
}

TaskDefinition 的职责只有一个:描述“怎么跑”,而不是“跑得怎么样”。

流程定义 vs 流程实例:真正的分水岭

理解 DolphinScheduler,绕不开“定义”和“实例”的区别。

当一个 Workflow 被真正触发执行时,系统会基于 Workflow 和 TaskDefinition 复制出一整套运行态对象,也就是:

  • ProcessInstance
  • TaskInstance

ProcessInstance 代表的是:

“这一次流程执行”

TaskInstance 代表的是:

“这一次任务执行”

所有你在 UI 上看到的状态变化、失败重试、运行日志,全部发生在 Instance 层,而不是 Definition 层。

从源码上看,这个边界非常清晰:

public class ProcessInstance {
    private Long id;
    private Long processDefinitionId;
    private ExecutionStatus state;
    private Date startTime;
    private Date endTime;
}
public class TaskInstance {
    private Long id;
    private Long taskDefinitionId;
    private ExecutionStatus state;
    private int retryTimes;
    private Date startTime;
}

定义是可复用的,实例是一次性的。 这正是调度系统能长期稳定运行的关键。

这些抽象如何支撑“复杂编排”

当任务数量上升、流程开始嵌套、失败变得常态化时,如果没有这些抽象拆分,系统很快就会失控。

DolphinScheduler 通过清晰的模型边界,实现了几件非常重要的事情:

  • 同一个 Workflow 可以并发跑多个实例而互不干扰
  • 失败重试只影响 TaskInstance,不污染定义
  • DAG 判断和任务执行彻底解耦
  • 调度逻辑可以围绕“状态迁移”而不是“业务逻辑”展开

从这个角度看,DolphinScheduler 并不是在“管理任务”,而是在管理状态和依赖的演进过程

小结

如果你把 DolphinScheduler 当成一个“高级 Cron”,这些模型看起来确实复杂;但一旦站在系统和源码的视角看,它反而是一套非常克制、非常工程化的设计

下一篇,我们可以继续顺着这套模型往下拆,聊聊:
调度器是如何围绕状态流转运转起来的,以及失败是如何被“消化”的。

已经在虚拟机部署好Apache DolphinScheduler了,想尝试下在Flink新建一个Flink节点,然后用Flink消费Kafka数据。

Apache DolphinScheduler用的是单机部署,具体操作可以参考官方文档:DolphinScheduler | 文档中心(https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/in...).

  • 前置条件:已经安装Java 11、DolphinScheduler 3.3.2、Flink 1.18.1、Kafka 3.6.0,Zookeeper用Kafka内置的。建议这些安装都下载二进制的安装包到虚拟机安装,用命令安装的不可控,我下载的二进制包如下:

配置好Flink的环境变量

1、编辑环境变量:

sudo vim ~/.bashrc

增加Flink的路径

2、使环境变量生效:

#使环境变量生效
source ~/.bashrc
#查看环境变量
echo $Flink_HOME

修改Kafka、Flink以及DolphinScheduler的配置文件

因为用的是虚拟机,为了让外面的主机能够访问到虚拟机的网络,需要修改下配置文件

  1. 修改Kafka配置:找到Kafka安装包下的config文件夹,修改config下的server.properties文件,修改listeners是为了外面的主机能够访问到虚拟机的Kafka,还有把advertised.listeners改成虚拟机地址,写样例的时候能连上虚拟机的Kafka地址,不然默认连localhost
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
#192.168.146.132修改成虚拟机ip
advertised.listeners=PLAINTEXT://192.168.146.132:9092

  1. 修改Flink配置:找到Flink安装包下的conf文件夹,修改conf下的Flink-conf.yaml文件,把里面所有的localhost地址全部改成0.0.0.0,以便主机能访问到虚拟机的Flink。还有增加jobmanager和taskmanager的内存
jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1

  1. 修改Apache DolphinScheduler的配置文件,从Apache DolphinScheduler的启动脚本文件dolphinscheduler-daemon.sh可以看出,配置环境变量用的是bin/env文件夹下的dolphinscheduler_env.sh

查看dolphinscheduler-daemon.sh文件:

修改dolphinscheduler_env.sh文件,新增JAVA、Flink路径:

#修改成自己的JAVA、Flink路径
export JAVA_HOME=/data/jdk-11.0.29
export Flink_HOME=/data/Flink-1.18.1

关闭防火墙,启动应用

启动应用,包括Zookeeper、Kafka、Flink以及Apache DolphinScheduler。

#关闭防火墙
sudo systemctl stop firewalld
 
# 在 Flink 根目录下,执行以下命令启动 Flink 集群
bin/start-cluster.sh
 
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
 
# 启动 Kafka 服务器
bin/Kafka-server-start.sh config/server.properties &
 
#创建 Kafka 主题
bin/Kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
 
#使用命令行生产者发送消息
bin/Kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
 
#消费
bin/Kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# 启动 Standalone Server 服务
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

测试

测试Flink、Apache DolphinScheduler是否能访问成功。

  1. Flink访问地址:http://localhost:8081/,localhost改成自己虚拟机地址

  1. Apache DolphinScheduler访问地址:http://localhost:12345/dolphinscheduler/ui ,localhost改成自己虚拟机地址即可登录系统 UI。默认的用户名和密码是 admin/dolphinscheduler123

编写样例

用Flink消费Kafka数据,然后打包上传到Apache DolphinScheduler,启动Flink任务:

  1. 编写样例:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>Flink-Kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <Flink.version>1.18.1</Flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <Kafka.version>3.6.0</Kafka.version>
    </properties>
 
    <dependencies>
        <!-- Flink核心依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-streaming-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-clients</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- 连接器基础依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-base</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- Kafka连接器(关键修改点) -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-Kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Kafka</groupId>
            <artifactId>Kafka-clients</artifactId>
            <version>${Kafka.version}</version>
        </dependency>
 
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
 
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.Flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkKafkaConsumerExample.java

import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.java.tuple.Tuple2;
import org.apache.Flink.api.java.utils.ParameterTool;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.datastream.DataStream;
import org.apache.Flink.streaming.api.functions.ProcessFunction;
import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.Flink.util.Collector;
import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer;
import org.apache.Flink.api.common.serialization.SimpleStringSchema;
import org.apache.Kafka.clients.consumer.ConsumerConfig;
import org.apache.Kafka.common.serialization.StringDeserializer;
 
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
 
 
public class FlinkKafkaConsumerExample {
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // Kafka 配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        KafkaConsumer.setStartFromEarliest(); // 从最早的消息开始消费
        DataStream<String> stream = env.addSource(KafkaConsumer);
 
        // 处理数据:分词和计数
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
 
 
        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value);
                messageCount++;
 
                // 检查是否达到停止条件
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 messages, stopping job.");
                    shouldStop = true; // 设置标志位,表示应该停止
                }
            }
        });
 
        // 执行作业并获取 JobClient
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // 启动作业并获取 JobClient
                org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());
 
                // 监测条件并取消作业
                while (!shouldStop) {
                    Thread.sleep(100); // 每100毫秒检查一次
                }
 
                // 达到停止条件时取消作业
                if (shouldStop) {
                    System.out.println("Cancelling the job...");
                    jobClient.cancel().get(); // 取消作业
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
 
        // 在主线程中等待作业结束
        future.join(); // 等待作业完成
    }
 
    // Tokenizer 类用于将输入字符串转化为单词
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
 
}
  1. 打包上传到Apache DolphinScheduler

  1. 新建Flink节点,并启动

在Apache DolphinScheduler的任务实例看启动日志:

在虚拟机启动生产者,输出字符串,然后可以在Flink查看输出Kafka生产的消息:

原文链接:https://blog.csdn.net/Analyze_ing/article/details/156940553

最新消息,Apache DolphinScheduler 3.4.0 已正式发布!

本次版本带来了多租户调度隔离、工作流并行性能优化、任务重试与告警机制增强,以及资源管理和日志处理改进。无论是复杂企业业务场景,还是高并发任务调度,3.4.0 都让系统更高效、更可靠、更易用。立即升级,体验全新调度能力!

升级与下载

下载页面(可选择镜像下载):
https://dolphinscheduler.apache.org/zh-cn/download/3.4.0

GitHub Release 页面
https://github.com/apache/dolphinscheduler/releases/tag/3.4.0
升级时建议参考官方文档中的集群升级指南,确保兼容性和配置一致性。

核心功能增强与重要更新

通用 OIDC 认证支持

3.4.0 引入了对 OpenID Connect(OIDC)的通用支持,旨在简化与企业身份认证系统的集成。通过 OIDC,用户可以使用统一的身份提供商(如 Keycloak、Okta 等)进行 SSO 登录,无需额外实现复杂自定义逻辑。这提升了安全性和用户体验,尤其是在多系统联邦登录与统一认证场景中,能够使 DolphinScheduler 更自然地融入企业级认证体系,减少重复配置和验证成本,从而提高登录配置的扩展性和一致性。


(参考图)

gRPC 任务插件支持

本版本新增了 gRPC 任务插件能力,使调度器能够通过原生 gRPC 协议直接与远程服务交互。用户可以将后端微服务暴露的 gRPC 接口作为任务执行目标,无需中间脚本封装。这种方式特别适合微服务生态或跨语言执行场景,通过明确参数契约和高性能通信协议提升任务整合效率,从而减少资源调度延迟、提高任务可靠性。

支持工作流串行策略

实现了 工作流串行执行策略(Workflow Serial Strategy) 的核心逻辑重构,通过引入一个全新的串行命令队列机制(t_ds_serial_command 表及相关 DAO/Mapper),配合一套串行执行协调器(WorkflowSerialCoordinator)及策略处理器,使 DolphinScheduler 能更智能地管理串行类型的工作流(如 SERIAL_WAITSERIAL_PRIORITYSERIAL_DISCARD)。

该设计改进了工作流触发流程的执行类型判断、状态管理、命令队列处理等关键路径,使串行调度逻辑更清晰、更可靠,有助于提升串行工作流场景下的调度稳定性与可控性。同时,3.4.0 重构了触发器与状态机相关代码,增强该能力的可维护性和扩展性。

移除 PyTorch 任务类型

3.4.0 对任务类型体系进行了精简,正式移除了内置的 PyTorch 任务类型。该调整主要基于实际使用情况和长期维护成本的考量,因为原有 PyTorch 任务实现使用率较低,且与调度器核心任务模型耦合度较高,增加了版本演进和兼容性维护的复杂度。通过移除该任务类型,DolphinScheduler 能保持核心架构的简洁与稳定。

我们鼓励用户通过更通用的 Shell、Python 或插件化方式运行 PyTorch 作业,从而提升系统整体的可维护性和扩展性。

稳定性与重要修复

Kubernetes Worker 部署增强

在 Kubernetes 原生部署场景下,3.4.0 使 Worker StatefulSet 的 Helm Chart 支持注入 Secrets 和 InitContainers。通过 Secrets 注入,可以安全传递证书或凭据;InitContainers 允许在主容器启动前完成必要的初始化逻辑,如准备文件系统或校验环境依赖。

这些增强有助于在容器化环境下实现更安全、更一致的部署策略和生命周期管理。

SQL 任务取消能力

针对 SQL 任务类型,本次版本提供了对任务执行取消的原生支持。当执行的 SQL 语句由于逻辑错误或长期运行导致资源占用时,用户可以通过调度器下发取消操作,使任务尽快中止,而不是简单失败或等待超时。这一能力改善了任务控制能力,避免长时间运行对集群资源的无效占用,有助于提升整体资源利用率和执行调度体验。

条件任务节点在前置失败情况下执行逻辑修复

在某些复杂工作流中,当条件任务节点的前置任务失败时,条件节点未按预期执行。3.4.0 修复了这一调度核心逻辑,确保条件节点能够正确响应前置失败状态。这样,工作流分支逻辑能够按照既定 DAG 定义可靠运行,从而避免因逻辑错误导致的流程中断或不一致执行。

ZooKeeper 节点清理问题修复

在使用 ZooKeeper 作为协调组件的高可用部署中,部分用户反馈 Master Server 在启动失败后未正确清理已注册的 failover 节点路径,可能导致后续状态异常。该版本修复了这个问题,使 Master 在异常启动路径中能够正确清理关联注册节点,保持注册中心状态一致,确保高可用场景下集群状态的健康和可靠性。

Worker Group 分配逻辑错误修复

此前版本中,项目与 Worker Group 关联/移除操作可能在 API 层出现逻辑不一致,导致调度器未能正确识别项目与 Worker Group 的关系。本次版本修正了相关逻辑,使 API 行为与用户预期一致,从而改善 Worker 管控、资源隔离和调度分配体验。

此外,3.4.0 版本还进行了很多功能优化和问题修复,包括文档与配置规范完善(时区、安全、负载均衡)、核心调度与注册中心稳定性增强(TraceId、Failover 清理、可重入锁)、性能与资源管理优化(任务组索引)、前端与插件体验改进(日志查询、DataX 校验、文件展示)、依赖与安全更新(PostgreSQL JDBC、Spring Boot CVE 修复)等,篇幅所限不再一一展开,详情可查询完整更新列表:https://github.com/apache/dolphinscheduler/releases/tag/3.4.0

Bug 修复亮点

标记任务为 Inactive 状态逻辑修复

某些生命周期事件中,当任务状态需要被标记为 Inactive 时,状态变更可能未正确触发,导致 UI 和执行引擎状态不一致。此版本修复了这一逻辑,使状态标记与生命周期事件更加一致。

Workflow Lineage 删除逻辑优化

在工作流血缘关系删除操作中,系统可能未能彻底清理相关引用,导致历史血缘链路残留。3.4.0 改进了删除逻辑,使 DolphinScheduler 在删除血缘链时能够更精确地清理对应关系,避免分析后续依赖时出现错误链路。

其他 Bug 修复包括前置任务失败导致条件节点不执行问题修复、项目级 Worker Group 绑定与移除逻辑修正、子工作流触发参数丢失问题修复等,详情请查询完整 Release Note:https://github.com/apache/dolphinscheduler/releases/tag/3.4.0

文档更新

  1. 发布并完善 Apache DolphinScheduler 3.3.2 版本发布说明文档。
  2. 修复文档 CI 构建错误,提升文档发布流程的稳定性。
  3. 补充 Prometheus 指标接口的认证机制及其在 Kubernetes 环境下的使用说明。
  4. 同步更新 JdbcRegistry 引入事务机制后的相关文档描述,保证文档与实际行为一致。

致谢

本次版本发布离不开社区各位贡献者的热情参与与支持。特别感谢 @ Gallardot 作为 3.4.0 的 Release Manager,从版控、构建、候选版验证到最终投票组织,确保发布流程高质量推进。

同时,感谢以下本次版本的所有贡献者(GitHub ID,排名不分先后):

Gallardot、njnu‑seafish、det101、Mrhs121、EinsteinInIct、sanfeng‑lhh、ruanwenjun、tusaryan、qiong‑zhou、SbloodyS、kvermeulen、npofsi、CauliflowerEater、ChaoquanTao、dill21yu、sdhzwc、zhan7236、KwongHing、jmmc‑tools、liunaijie

感谢所有通过提交 PR、Issue、文档贡献、社区讨论、测试验证等方式参与 Apache DolphinScheduler 项目的人。正是你们的努力推进了 DolphinScheduler 的持续演进与社区繁荣,欢迎更多人加入我们的队伍!

最新消息,Apache DolphinScheduler 3.4.0 已正式发布!

本次版本带来了多租户调度隔离、工作流并行性能优化、任务重试与告警机制增强,以及资源管理和日志处理改进。无论是复杂企业业务场景,还是高并发任务调度,3.4.0 都让系统更高效、更可靠、更易用。立即升级,体验全新调度能力!

升级与下载

下载页面(可选择镜像下载):
https://dolphinscheduler.apache.org/zh-cn/download/3.4.0

GitHub Release 页面
https://github.com/apache/dolphinscheduler/releases/tag/3.4.0
升级时建议参考官方文档中的集群升级指南,确保兼容性和配置一致性。

核心功能增强与重要更新

通用 OIDC 认证支持

3.4.0 引入了对 OpenID Connect(OIDC)的通用支持,旨在简化与企业身份认证系统的集成。通过 OIDC,用户可以使用统一的身份提供商(如 Keycloak、Okta 等)进行 SSO 登录,无需额外实现复杂自定义逻辑。这提升了安全性和用户体验,尤其是在多系统联邦登录与统一认证场景中,能够使 DolphinScheduler 更自然地融入企业级认证体系,减少重复配置和验证成本,从而提高登录配置的扩展性和一致性。


(参考图)

gRPC 任务插件支持

本版本新增了 gRPC 任务插件能力,使调度器能够通过原生 gRPC 协议直接与远程服务交互。用户可以将后端微服务暴露的 gRPC 接口作为任务执行目标,无需中间脚本封装。这种方式特别适合微服务生态或跨语言执行场景,通过明确参数契约和高性能通信协议提升任务整合效率,从而减少资源调度延迟、提高任务可靠性。

支持工作流串行策略

实现了 工作流串行执行策略(Workflow Serial Strategy) 的核心逻辑重构,通过引入一个全新的串行命令队列机制(t_ds_serial_command 表及相关 DAO/Mapper),配合一套串行执行协调器(WorkflowSerialCoordinator)及策略处理器,使 DolphinScheduler 能更智能地管理串行类型的工作流(如 SERIAL_WAITSERIAL_PRIORITYSERIAL_DISCARD)。

该设计改进了工作流触发流程的执行类型判断、状态管理、命令队列处理等关键路径,使串行调度逻辑更清晰、更可靠,有助于提升串行工作流场景下的调度稳定性与可控性。同时,3.4.0 重构了触发器与状态机相关代码,增强该能力的可维护性和扩展性。

移除 PyTorch 任务类型

3.4.0 对任务类型体系进行了精简,正式移除了内置的 PyTorch 任务类型。该调整主要基于实际使用情况和长期维护成本的考量,因为原有 PyTorch 任务实现使用率较低,且与调度器核心任务模型耦合度较高,增加了版本演进和兼容性维护的复杂度。通过移除该任务类型,DolphinScheduler 能保持核心架构的简洁与稳定。

我们鼓励用户通过更通用的 Shell、Python 或插件化方式运行 PyTorch 作业,从而提升系统整体的可维护性和扩展性。

稳定性与重要修复

Kubernetes Worker 部署增强

在 Kubernetes 原生部署场景下,3.4.0 使 Worker StatefulSet 的 Helm Chart 支持注入 Secrets 和 InitContainers。通过 Secrets 注入,可以安全传递证书或凭据;InitContainers 允许在主容器启动前完成必要的初始化逻辑,如准备文件系统或校验环境依赖。

这些增强有助于在容器化环境下实现更安全、更一致的部署策略和生命周期管理。

SQL 任务取消能力

针对 SQL 任务类型,本次版本提供了对任务执行取消的原生支持。当执行的 SQL 语句由于逻辑错误或长期运行导致资源占用时,用户可以通过调度器下发取消操作,使任务尽快中止,而不是简单失败或等待超时。这一能力改善了任务控制能力,避免长时间运行对集群资源的无效占用,有助于提升整体资源利用率和执行调度体验。

条件任务节点在前置失败情况下执行逻辑修复

在某些复杂工作流中,当条件任务节点的前置任务失败时,条件节点未按预期执行。3.4.0 修复了这一调度核心逻辑,确保条件节点能够正确响应前置失败状态。这样,工作流分支逻辑能够按照既定 DAG 定义可靠运行,从而避免因逻辑错误导致的流程中断或不一致执行。

ZooKeeper 节点清理问题修复

在使用 ZooKeeper 作为协调组件的高可用部署中,部分用户反馈 Master Server 在启动失败后未正确清理已注册的 failover 节点路径,可能导致后续状态异常。该版本修复了这个问题,使 Master 在异常启动路径中能够正确清理关联注册节点,保持注册中心状态一致,确保高可用场景下集群状态的健康和可靠性。

Worker Group 分配逻辑错误修复

此前版本中,项目与 Worker Group 关联/移除操作可能在 API 层出现逻辑不一致,导致调度器未能正确识别项目与 Worker Group 的关系。本次版本修正了相关逻辑,使 API 行为与用户预期一致,从而改善 Worker 管控、资源隔离和调度分配体验。

此外,3.4.0 版本还进行了很多功能优化和问题修复,包括文档与配置规范完善(时区、安全、负载均衡)、核心调度与注册中心稳定性增强(TraceId、Failover 清理、可重入锁)、性能与资源管理优化(任务组索引)、前端与插件体验改进(日志查询、DataX 校验、文件展示)、依赖与安全更新(PostgreSQL JDBC、Spring Boot CVE 修复)等,篇幅所限不再一一展开,详情可查询完整更新列表:https://github.com/apache/dolphinscheduler/releases/tag/3.4.0

Bug 修复亮点

标记任务为 Inactive 状态逻辑修复

某些生命周期事件中,当任务状态需要被标记为 Inactive 时,状态变更可能未正确触发,导致 UI 和执行引擎状态不一致。此版本修复了这一逻辑,使状态标记与生命周期事件更加一致。

Workflow Lineage 删除逻辑优化

在工作流血缘关系删除操作中,系统可能未能彻底清理相关引用,导致历史血缘链路残留。3.4.0 改进了删除逻辑,使 DolphinScheduler 在删除血缘链时能够更精确地清理对应关系,避免分析后续依赖时出现错误链路。

其他 Bug 修复包括前置任务失败导致条件节点不执行问题修复、项目级 Worker Group 绑定与移除逻辑修正、子工作流触发参数丢失问题修复等,详情请查询完整 Release Note:https://github.com/apache/dolphinscheduler/releases/tag/3.4.0

文档更新

  1. 发布并完善 Apache DolphinScheduler 3.3.2 版本发布说明文档。
  2. 修复文档 CI 构建错误,提升文档发布流程的稳定性。
  3. 补充 Prometheus 指标接口的认证机制及其在 Kubernetes 环境下的使用说明。
  4. 同步更新 JdbcRegistry 引入事务机制后的相关文档描述,保证文档与实际行为一致。

致谢

本次版本发布离不开社区各位贡献者的热情参与与支持。特别感谢 @ Gallardot 作为 3.4.0 的 Release Manager,从版控、构建、候选版验证到最终投票组织,确保发布流程高质量推进。

同时,感谢以下本次版本的所有贡献者(GitHub ID,排名不分先后):

Gallardot、njnu‑seafish、det101、Mrhs121、EinsteinInIct、sanfeng‑lhh、ruanwenjun、tusaryan、qiong‑zhou、SbloodyS、kvermeulen、npofsi、CauliflowerEater、ChaoquanTao、dill21yu、sdhzwc、zhan7236、KwongHing、jmmc‑tools、liunaijie

感谢所有通过提交 PR、Issue、文档贡献、社区讨论、测试验证等方式参与 Apache DolphinScheduler 项目的人。正是你们的努力推进了 DolphinScheduler 的持续演进与社区繁荣,欢迎更多人加入我们的队伍!