标签 ZooKeeper 下的文章

已经在虚拟机部署好Apache DolphinScheduler了,想尝试下在Flink新建一个Flink节点,然后用Flink消费Kafka数据。

Apache DolphinScheduler用的是单机部署,具体操作可以参考官方文档:DolphinScheduler | 文档中心(https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/in...).

  • 前置条件:已经安装Java 11、DolphinScheduler 3.3.2、Flink 1.18.1、Kafka 3.6.0,Zookeeper用Kafka内置的。建议这些安装都下载二进制的安装包到虚拟机安装,用命令安装的不可控,我下载的二进制包如下:

配置好Flink的环境变量

1、编辑环境变量:

sudo vim ~/.bashrc

增加Flink的路径

2、使环境变量生效:

#使环境变量生效
source ~/.bashrc
#查看环境变量
echo $Flink_HOME

修改Kafka、Flink以及DolphinScheduler的配置文件

因为用的是虚拟机,为了让外面的主机能够访问到虚拟机的网络,需要修改下配置文件

  1. 修改Kafka配置:找到Kafka安装包下的config文件夹,修改config下的server.properties文件,修改listeners是为了外面的主机能够访问到虚拟机的Kafka,还有把advertised.listeners改成虚拟机地址,写样例的时候能连上虚拟机的Kafka地址,不然默认连localhost
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
#192.168.146.132修改成虚拟机ip
advertised.listeners=PLAINTEXT://192.168.146.132:9092

  1. 修改Flink配置:找到Flink安装包下的conf文件夹,修改conf下的Flink-conf.yaml文件,把里面所有的localhost地址全部改成0.0.0.0,以便主机能访问到虚拟机的Flink。还有增加jobmanager和taskmanager的内存
jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1

  1. 修改Apache DolphinScheduler的配置文件,从Apache DolphinScheduler的启动脚本文件dolphinscheduler-daemon.sh可以看出,配置环境变量用的是bin/env文件夹下的dolphinscheduler_env.sh

查看dolphinscheduler-daemon.sh文件:

修改dolphinscheduler_env.sh文件,新增JAVA、Flink路径:

#修改成自己的JAVA、Flink路径
export JAVA_HOME=/data/jdk-11.0.29
export Flink_HOME=/data/Flink-1.18.1

关闭防火墙,启动应用

启动应用,包括Zookeeper、Kafka、Flink以及Apache DolphinScheduler。

#关闭防火墙
sudo systemctl stop firewalld
 
# 在 Flink 根目录下,执行以下命令启动 Flink 集群
bin/start-cluster.sh
 
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
 
# 启动 Kafka 服务器
bin/Kafka-server-start.sh config/server.properties &
 
#创建 Kafka 主题
bin/Kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
 
#使用命令行生产者发送消息
bin/Kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
 
#消费
bin/Kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# 启动 Standalone Server 服务
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

测试

测试Flink、Apache DolphinScheduler是否能访问成功。

  1. Flink访问地址:http://localhost:8081/,localhost改成自己虚拟机地址

  1. Apache DolphinScheduler访问地址:http://localhost:12345/dolphinscheduler/ui ,localhost改成自己虚拟机地址即可登录系统 UI。默认的用户名和密码是 admin/dolphinscheduler123

编写样例

用Flink消费Kafka数据,然后打包上传到Apache DolphinScheduler,启动Flink任务:

  1. 编写样例:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>Flink-Kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <Flink.version>1.18.1</Flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <Kafka.version>3.6.0</Kafka.version>
    </properties>
 
    <dependencies>
        <!-- Flink核心依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-streaming-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-clients</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- 连接器基础依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-base</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- Kafka连接器(关键修改点) -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-Kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Kafka</groupId>
            <artifactId>Kafka-clients</artifactId>
            <version>${Kafka.version}</version>
        </dependency>
 
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
 
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.Flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkKafkaConsumerExample.java

import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.java.tuple.Tuple2;
import org.apache.Flink.api.java.utils.ParameterTool;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.datastream.DataStream;
import org.apache.Flink.streaming.api.functions.ProcessFunction;
import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.Flink.util.Collector;
import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer;
import org.apache.Flink.api.common.serialization.SimpleStringSchema;
import org.apache.Kafka.clients.consumer.ConsumerConfig;
import org.apache.Kafka.common.serialization.StringDeserializer;
 
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
 
 
public class FlinkKafkaConsumerExample {
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // Kafka 配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        KafkaConsumer.setStartFromEarliest(); // 从最早的消息开始消费
        DataStream<String> stream = env.addSource(KafkaConsumer);
 
        // 处理数据:分词和计数
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
 
 
        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value);
                messageCount++;
 
                // 检查是否达到停止条件
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 messages, stopping job.");
                    shouldStop = true; // 设置标志位,表示应该停止
                }
            }
        });
 
        // 执行作业并获取 JobClient
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // 启动作业并获取 JobClient
                org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());
 
                // 监测条件并取消作业
                while (!shouldStop) {
                    Thread.sleep(100); // 每100毫秒检查一次
                }
 
                // 达到停止条件时取消作业
                if (shouldStop) {
                    System.out.println("Cancelling the job...");
                    jobClient.cancel().get(); // 取消作业
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
 
        // 在主线程中等待作业结束
        future.join(); // 等待作业完成
    }
 
    // Tokenizer 类用于将输入字符串转化为单词
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
 
}
  1. 打包上传到Apache DolphinScheduler

  1. 新建Flink节点,并启动

在Apache DolphinScheduler的任务实例看启动日志:

在虚拟机启动生产者,输出字符串,然后可以在Flink查看输出Kafka生产的消息:

原文链接:https://blog.csdn.net/Analyze_ing/article/details/156940553

写在前面,本人目前处于求职中,如有合适内推岗位,请加:lpshiyue 感谢。同时还望大家一键三连,赚点奶粉钱。本系列已完结,完整版阅读课联系本人

高可用不是简单的冗余堆砌,而是无状态化、水平扩展与故障转移三者协同的艺术品

在掌握了系统压测方法论,能够准确评估系统容量边界后,我们面临一个更根本的挑战:如何让系统在真实流量冲击和故障发生时保持稳定?高可用架构设计正是解决这一挑战的核心手段。本文将深入解析无状态化、水平扩展与故障转移三大支柱技术的协同设计,帮助构建真正弹性可靠的系统架构。

1 高可用的本质:从故障避免到故障容忍的哲学转变

1.1 高可用性的核心价值重估

传统观念中,高可用意味着尽可能避免故障,而在分布式系统环境下,这一理念已转变为快速发现和恢复故障。根据Gartner的统计,企业IT系统平均每分钟的宕机成本超过5600美元,对于大型电商平台,这个数字可能达到数万美元。

高可用设计的哲学转变体现在三个层面:

  • 从完美预防到快速恢复:接受故障必然性,专注于最小化MTTR(平均修复时间)
  • 从单体坚固到分布式韧性:通过系统设计而非组件质量保证可用性
  • 从人工干预到自动化愈合:建立系统自愈能力,减少人工依赖

这种转变使我们需要重新定义高可用的成功标准:不是追求100%无故障,而是确保故障发生时业务影响可控、恢复过程自动

1.2 可用性等级的理性定位

不同业务场景对可用性有不同要求,理性定位是避免过度设计的第一步:

99.9%可用性(年停机时间≤8.76小时)适合内部管理系统
99.95%可用性(年停机时间≤4.38小时)适合一般业务系统
99.99%可用性(年停机时间≤52.6分钟)适合核心业务系统
99.999%可用性(年停机时间≤5.26分钟)适合金融交易系统

确立合理的可用性目标后,我们才能有针对性地选择技术方案,在成本与可靠性间找到平衡点。

2 无状态化:弹性架构的基石

2.1 无状态设计的本质与价值

无状态化不是简单去除会话数据,而是将状态与计算分离,使应用实例变得可替代。这种分离是水平扩展和故障转移的基础。

有状态架构的典型问题

// 问题示例:会话绑定导致扩展困难
@RestController
public class StatefulController {
    // 会话状态存储在内存中
    private Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
    
    @GetMapping("/userinfo")
    public String getUserInfo(HttpSession session) {
        UserSession userSession = (UserSession) session.getAttribute("currentUser");
        // 此实例绑定特定用户会话,无法随意替换
        return userSession.getUserInfo();
    }
}

状态内嵌导致实例不可替换

无状态化改造方案

@Configuration
@EnableRedisHttpSession // 启用Redis会话存储
public class StatelessConfig {
    // 会话外部化配置
}

@RestController
public class StatelessUserController {
    @GetMapping("/userinfo")
    public String getUserInfo(@RequestHeader("Authorization") String token) {
        // 从Redis获取用户信息,不依赖本地状态
        String userJson = redisTemplate.opsForValue().get("session:" + token);
        User user = JsonUtil.fromJson(userJson, User.class);
        return user.toString();
    }
}

状态外置使实例可任意替换

2.2 无状态化的多层次实践

无状态化需要在不同层级实施协同策略:

应用层无状态:会话数据外部化到专用存储(Redis Cluster)
服务层无状态:API设计保证请求自包含,不依赖服务实例内存状态
任务层无状态:计算任务参数和结果完全自包含,支持任意重调度

无状态设计的业务适配策略

  • 完全无状态:适合查询类、计算型业务(商品查询、价格计算)
  • 外部状态:适合需要会话保持但无需实例绑定的业务(用户登录状态)
  • 轻量状态:适合短暂业务流程,状态生命周期与请求周期一致

2.3 无状态架构的代价与应对

无状态化不是银弹,需要认识其代价并制定应对策略:

性能代价:状态外部化增加网络开销,需要通过缓存、批处理优化
一致性挑战:分布式状态需要处理并发更新,采用乐观锁或版本控制
复杂度增加:需要引入额外组件(Redis、ZooKeeper),增加运维复杂度

合理的无状态化是有选择的无状态,而非盲目去除所有状态。核心是确保实例可替换性,而非完全消除状态。

3 水平扩展:流量压力的分布式化解

3.1 水平扩展的本质与架构前提

水平扩展通过增加实例数量而非提升单机性能来应对流量增长,其有效性直接依赖于无状态化程度。

水平扩展的架构前提

  • 无状态设计:实例间无数据依赖,可任意增减
  • 负载均衡:流量按策略分发到多个实例
  • 服务发现:动态感知实例上下线,实时更新路由
  • 健康检查:自动隔离故障实例,保证流量只会到达健康节点

3.2 分层扩展策略

系统不同层级需要采用不同的水平扩展策略:

接入层扩展:通过DNS轮询、全局负载均衡实现流量入口扩展

# Nginx上游服务配置示例
upstream backend_servers {
    server 10.0.1.10:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.11:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.12:8080 backup;  # 备份节点
    least_conn;  # 最少连接负载均衡
}

接入层通过集群化实现扩展

应用层扩展:无状态服务实例水平扩展,结合自动伸缩策略

# Kubernetes HPA配置示例
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: frontend-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: frontend
  minReplicas: 3
  maxReplicas: 100
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

应用层根据负载自动伸缩

数据层扩展:通过分片、读写分离等技术实现数据访问扩展

-- 数据库分片示例:用户数据按ID分片
-- 分片1:用户ID以0-4结尾
CREATE TABLE users_1 (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    -- 其他字段
);

-- 分片2:用户ID以5-9结尾  
CREATE TABLE users_2 (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    -- 其他字段
);

数据层通过分片实现水平扩展

3.3 水平扩展的粒度控制

科学的水平扩展需要精细化粒度控制,避免过度或不足扩展:

单元化扩展:按业务单元而非整体系统进行扩展,如用户服务独立于订单服务扩展
弹性伸缩:基于预测和实时指标动态调整实例数量,平衡性能与成本
分级扩展:核心服务与非核心服务差异化扩展策略,确保关键业务资源

4 故障转移:从被动应对到主动容错

4.1 故障检测:快速发现的艺术

有效的故障转移始于精准的故障检测,需要在及时性与准确性间找到平衡:

多层次健康检查策略

# Kubernetes就绪与存活探针配置
apiVersion: v1
kind: Pod
metadata:
  name: web-application
spec:
  containers:
  - name: web
    image: nginx:latest
    livenessProbe:
      httpGet:
        path: /health
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
      timeoutSeconds: 5
      failureThreshold: 3
    readinessProbe:
      httpGet:
        path: /ready  
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 5
      timeoutSeconds: 3
      failureThreshold: 1

通过探针机制实现精准故障检测

智能故障判定:结合多个指标(响应时间、错误率、资源使用率)综合判断,避免单指标误判。

4.2 故障隔离:防止雪崩的屏障

故障转移不仅是将流量从故障实例移走,更重要的是隔离故障影响

熔断器模式:在连续失败达到阈值时自动熔断,避免重试风暴

@Component
public class ProductService {
    @CircuitBreaker(name = "productService", 
                   fallbackMethod = "getProductFallback")
    public Product getProduct(Long productId) {
        return remoteProductService.getProduct(productId);
    }
    
    public Product getProductFallback(Long productId, Exception ex) {
        return cacheService.getBasicProduct(productId);
    }
}

熔断器防止故障扩散

隔离策略

  • 线程池隔离:不同服务使用独立线程池,避免资源竞争
  • 信号量隔离:控制并发调用数,防止资源耗尽
  • 超时控制:设置合理超时时间,避免长时间阻塞
  • 限流降级:流量超过阈值时自动降级,保护系统不被冲垮

4.3 流量切换:无缝转移的技术实现

故障转移的核心是流量重路由,需要在不同层级实现协同:

负载均衡器切换:健康检查失败时自动从路由表中移除故障节点

upstream backend {
    server 10.0.1.10:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.11:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.12:8080 backup;
    
    # 故障转移配置
    proxy_next_upstream error timeout http_500 http_502 http_503;
}

负载均衡器实现自动故障转移

服务网格流量管理:基于Istio等服务网格实现细粒度流量控制

apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: product-service
spec:
  host: product-service
  trafficPolicy:
    outlierDetection:
      consecutiveErrors: 5
      interval: 10s
      baseEjectionTime: 30s
      maxEjectionPercent: 50

服务网格提供高级故障检测与转移能力

5 三大支柱的协同设计

5.1 协同工作的架构模式

无状态化、水平扩展与故障转移不是孤立技术,而是相互依赖的有机整体:

无状态化赋能水平扩展:只有无状态设计,才能实现真正的无缝水平扩展
水平扩展增强故障转移:多实例为故障转移提供目标节点,使转移成为可能
故障转移保障水平扩展:在扩展过程中,故障转移确保个别实例故障不影响整体

协同架构示例

用户请求 → 负载均衡器(故障检测/转移)
                   ↓
           无状态应用集群(水平扩展)
                   ↓  
          集中式状态存储(Redis集群)
                   ↓
          数据存储层(分片/主从)

5.2 协同设计的反模式与陷阱

伪无状态陷阱:表面无状态但实际存在隐性状态依赖(如本地缓存、文件存储)
不平衡扩展:计算层扩展但数据层成为瓶颈,或相反
过度转移:过于敏感的故障检测导致频繁转移,反而影响稳定性
单点转移:故障转移机制本身存在单点故障

5.3 协同效能的度量体系

三大支柱的协同效果需要可度量的指标验证:

无状态化程度指标

  • 实例启动时间(应小于30秒)
  • 请求路由一致性(任意实例处理结果相同)
  • 状态外部化比例(超过90%状态外部化)

水平扩展效能指标

  • 线性扩展比(实例增加与性能提升比例)
  • 扩展速度(从触发到完成扩展的时间)
  • 资源利用率(避免过度或不足扩展)

故障转移质量指标

  • 故障检测时间(秒级检测)
  • 转移恢复时间(分钟级恢复)
  • 转移成功率(超过99%的转移成功)

6 实战案例:电商平台高可用架构演进

6.1 单体架构的高可用改造

初始状态:单体应用,会话绑定,数据库单点

改造步骤

  1. 无状态化改造:用户会话外置到Redis集群
  2. 水平扩展准备:应用容器化,配置负载均衡
  3. 故障转移基础:数据库主从分离,读写分离
  4. 渐进式迁移:先读流量,后写流量;先非核心功能,后核心功能

改造效果:可用性从99.9%提升至99.95%,扩展时间从小时级降至分钟级

6.2 微服务架构的高可用深化

架构特点:服务拆分,分布式依赖,复杂调用链

深化措施

  • 精细化无状态:API网关无状态化,业务服务按需无状态
  • 弹性扩展策略:基于业务优先级差异化扩展策略
  • 智能故障转移:基于调用链分析的精准故障定位和隔离

深化效果:可用性提升至99.99%,故障恢复时间从30分钟降至5分钟以内

总结

高可用架构的本质是通过无状态化、水平扩展、故障转移三大支柱的协同设计,构建能够容忍故障、快速恢复的弹性系统。

核心洞察

  1. 无状态化是基础:只有解耦状态与计算,才能实现真正的弹性
  2. 水平扩展是手段:通过分布式架构将集中式风险分解为可管理单元
  3. 故障转移是保障:在故障发生时快速隔离和恢复,最小化业务影响
  4. 协同设计是关键:三大支柱必须统一设计,相互配合,而非孤立优化

成功的高可用架构不是追求零故障,而是确保在故障发生时:

  • 系统能够快速检测定位问题
  • 故障影响被有效隔离,防止扩散
  • 业务流量被无缝转移到健康实例
  • 系统能够自动恢复,减少人工干预

在云原生时代,随着Kubernetes、服务网格等技术的成熟,高可用能力已经日益平台化、标准化。然而,技术选型只是起点,真正的挑战在于根据业务特点合理运用这些能力,构建既可靠又经济的高可用体系。


📚 下篇预告
《CDN与边缘缓存策略——静态、动态与签名鉴权的组合拳》—— 我们将深入探讨:

  • 🌐 缓存层次体系:浏览器缓存、边缘缓存、中心缓存的协同分工
  • 动态内容加速:边缘计算、智能路由与协议优化技术
  • 🔐 安全缓存挑战:签名URL、权限验证与敏感内容保护
  • 📊 缓存效能优化:命中率提升、失效策略与成本平衡
  • 🚀 边缘架构演进:从内容分发到边缘计算的范式转变

点击关注,构建高效安全的全球内容分发体系!

今日行动建议

  1. 评估现有应用的无状态化程度,制定状态外部化改造路线
  2. 设计水平扩展的容量规划与自动伸缩策略
  3. 建立多层级的故障检测与转移机制,定期进行故障演练
  4. 制定三大支柱协同效能的度量体系,持续优化高可用能力

今天跟大家分享一个etcd的内存大量占用的问题,这是前段时间在我们开源软件Easegress中遇到的问题,问题是比较简单的,但是我还想把前因后果说一下,包括,为什么要用etcd,使用etcd的用户场景,包括etcd的一些导致内存占用比较大的设计,以及最后一些建议。希望这篇文章不仅仅只是让你看到了一个简单的内存问题,还能让你有更多的收获。当然,也欢迎您关注我们的开源软件,给我们一些鼓励。

为什么要用ETCD

先说一下为什么要用etcd。先从一个我们自己做的一个API网关 – Easegress(源码)说起。

Easegress 是我们开发并开源的一个API应用网关产品,这个API应用网关不仅仅只是像nginx那样用来做一个反向代理,这个网关可以做的事很多,比如:API编排、服务发现、弹力设计(熔断、限流、重试等)、认证鉴权(JWT,OAuth2,HMAC等)、同样支持各种Cloud Native的架构如:微服务架构,Service Mesh,Serverless/FaaS的集成,并可以用于扛高并发、灰度发布、全链路压力测试、物联网……等更为高级的企业级的解决方案。所以,为了达到这些目标,在2017年的时候,我们觉得在现有的网关如Nginx上是无法演进出来这样的软件的,必需重新写一个(后来其他人也应该跟我们的想法一样,所以,Lyft写了一个Envoy。只不过,Envoy是用C++写的,而我用了技术门槛更低的Go语言)

另外,Easegress最核心的设计主要有三个:

  • 一是无第三方依赖的自己选主组集群的能力
  • 二是像Linux管道命令行那样pipeline式的插件流式处理(支持Go/WebAssembly)
  • 三是内置一个Data Store用于集群控制和数据共享。

对于任何一个分布式系统,都需要有一个强一制性的基于Paxos/Raft的可以自动选主机制,并且需要在整个集群间同步一些关键的控制/配置和相关的共享数据,以保证整个集群的行为是统一一致的。如果没有这么一个东西的话,就没有办法玩分布式系统的。这就是为什么会有像Zookeeper/etcd这样的组件出现并流行的原因。注意,Zookeeper他们主要不是给你存数据的,而是给你组集群的。

Zookeeper是一个很流行的开源软件,也被用于各大公司的生产线,包括一些开源软件,比如:Kafka。但是,这会让其它软件有一个依赖,并且在运维上带来很大的复杂度。所以,Kafka在最新的版本也通过内置了选主的算法,而抛弃了外挂zookeeper的设计。Etcd是Go语言社区这边的主力,也是kubernetes组建集群的关键组件。Easegress在一开始(5年前)使用了gossip协议同步状态(当时想的过于超前,想做广域网的集群),但是后发现这个协议太过于复杂,而且很难调试,而广域网的API Gateway也没遇到相应的场景。所以,在3年前的时候,为了稳定性的考量,我们把其换成了内嵌版本的etcd,这个设计一直沿用到今天。

Easegress会把所有的配置信息都放到etcd里,还包括一些统计监控数据,以及一些用户的自定义数据(这样用户自己的plugin不但可以在一条pipeline内,还可以在整个集群内共享数据),这对于用户进行扩展来说是非常方便的。软件代码的扩展性一直是我们追求的首要目标,尤其是开源软件更要想方设法降低技术门槛让技术易扩展,这就是为什么Google的很多开源软件都会选使用Go语言的原因,也是为什么Go正在取代C/C++的做PaaS基础组件的原因。

背景问题

好了,在介绍完为什么要用etcd以后,我开始分享一个实际的问题了。我们有个用户在使用 Easegress 的时候,在Easegress内配置了上千条pipeline,导致 Easegress的内存飙升的非常厉害- 10+GB 以上,而且长时间还下不来。

用户报告的问题是——

在Easegress 1.4.1 上创建一个HTTP对象,1000个Pipeline,在Easegres初始化启动完成时的内存占用大概为400M,运行80分钟后2GB,运行200分钟后达到了4GB,这期间什么也没有干,对Easegress没有进行过一次请求。

一般来说,就算是API再多也不应该配置这么多的处理管道pipeline的,通常我们会使用HTTP API的前缀把一组属于一个类别的API配置在一个管道内是比较合理的,就像nginx下的location的配置,一般来说不会太多的。但是,在用户的这个场景下配置了上千个pipeline,我们也是头一次见,应该是用户想做更细粒度的控制。

经过调查后,我们发现内存使用基本全部来自etcd,我们实在没有想到,因为我们往etcd里放的数据也没有多少个key,感觉不会超过10M,但不知道为什么会占用了10GB的内存。这种时候,一般会怀疑etcd有内存泄漏,上etcd上的github上搜了一下,发现etcd在3.2和3.3的版本上都有内存泄露的问题,但都修改了,而 Easegress 使用的是3.5的最新版本,另外,一般来说内存泄漏的问题不会是这么大的,我们开始怀疑是我们哪里误用了etcd。要知道是否误用了etcd,那么只有一条路了,沉下心来,把etcd的设计好好地看一遍。

大概花了两天左右的时间看了一下etcd的设计,我发现了etcd有下面这些消耗内存的设计,老实说,还是非常昂贵的,这里分享出来,避免后面的同学再次掉坑。

首当其冲是——RaftLog。etcd用Raft Log,主要是用于帮助follower同步数据,这个log的底层实现不是文件,而是内存。所以,而且还至少要保留 5000 条最新的请求。如果key的size很大,这 5000条就会产生大量的内存开销。比如,不断更新一个 1M的key,哪怕是同一个key,这 5000 条Log就是 5000MB = 5GB 的内存开销。这个问题在etcd的issue列表中也有人提到过  issue #12548 ,不过,这个问题不了了之了。这个5000还是一个hardcode,无法改。(参看 DefaultSnapshotCatchUpEntries 相关源码

// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
// to catch-up after compacting the raft storage entries.
// We expect the follower has a millisecond level latency with the leader.
// The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up.
DefaultSnapshotCatchUpEntries uint64 = 5000

另外,我们还发现,这个设计在历史上etcd的官方团队把这个默认值从10000降到了5000,我们估计etcd官方团队也意识到10000有点太耗内存了,所以,降了一半,但是又怕follwer同步不上,所以,保留了 5000条……(在这里,我个人感觉还有更好的方法,至少不用全放在内存里吧……)

另外还有下面几项也会导致etcd的内存会增加

  1. 索引。etcd的每一对 key-value 都会在内存中有一个 B-tree 索引。这个索引的开销跟key的长度有关,etcd还会保存版本。所以B-tree的内存跟key的长度以及历史版本号数量也有关系。
  2. mmap。还有,etcd 使用 mmap 这样上古的unix技术做文件映射,会把他的blotdb的内存map到虚拟内存中,所以,db-size越大,内存越大。
  3. Watcher。watch也会占用很大的内存,如果watch很多,连接数多,都会堆积内存。

(很明显,etcd这么做就是为了一个高性能的考虑)

Easegress中的问题更多的应该是Raft Log 的问题。后面三种问题我们觉得不会是用户这个问题的原因,对于索引和mmap,使用 etcd 的 compact 和 defreg (压缩和碎片整理应该可以降低内存,但用户那边不应该是这个问题的核心原因)。

针对用户的问题,大约有1000多条pipeline,因为Easegress会对每一条pipeline进行数据统计(如:M1, M5, M15, P99, P90, P50等这样的统计数据),统计信息可能会有1KB-2KB左右,但Easegress会把这1000条pipeline的统计数据合并起来写到一个key中,这1000多条的统计数据合并后会导致出现一个平均尺寸为2MB的key,而5000个in-memory的RaftLog导致etcd要消耗了10GB的内存。之前没有这么多的pipeline的场景,所以,这个内存问题没有暴露出来。

于是,我们最终的解决方案也很简单,我们修改我们的策略,不再写这么大的Value的数据了,虽然以前只写在一个key上,但是Key的值太大,现在把这个大Key值拆分成多个小的key来写,这样,实际保存的数据没有发生变化,但是RaftLog的每条数据量就小了,所以,以前是5000条 2M(10GB),现在是5000条 1K(500MB),就这样解决了这个问题。相关的PR在这里 PR#542

总结

要用好 etcd,有如下的实践

  • 避免大尺寸的key和value,一方面会通过一个内存级的 Raft Log 占大量内存,另一方面,B-tree的多版本索引也会因为这样耗内存。
  • 避免DB的尺寸太大,并通过 compact和defreg来压缩和碎片整理降低内存。
  • 避免大量的Watch Client 和 Watch数。这个开销也是比较大的。
  • 最后还有一个,就是尽可能使用新的版本,无论是go语言还是etcd,这样会少很多内存问题。比如:golang的这个跟LInux内核心相关的内存问题 —— golang 1.12的版sget的是 MADV_FREE 的内存回收机制,而在1.16的时候,改成了 MADV_DONTNEED ,这两者的差别是,FREE表示,虽然进程标记内存不要了,但是操作系统会保留之,直到需要更多的内存,而 DONTNEED 则是立马回收,你可以看到,在常驻内存RSS 上,前者虽然在golang的进程上回收了内存,但是RSS值不变,而后者会看到RSS直立马变化。Linux下对 MADV_FREE 的实现在某些情况下有一定的问题,所以,在go 1.16的时候,默认值改成了 MADV_DONTNEED 。而 etcd 3.4 是用 来1.12 编译的。

最后,欢迎大家关注我们的开源软件! https://github.com/megaease/ 

Apache DolphinScheduler3.1.9+Minio 海报

目录

这里按照官方提供的文档进行操作:

前提

官方提供的开发手册位置

1、软件要求

在搭建 DolphinScheduler 开发环境之前请确保你已经安装以下软件:

  • Git
  • JDK: v1.8.x (当前暂不支持 jdk 11)
  • Maven: v3.5+
  • Node: v16.13+ (dolphinScheduler 版本低于 3.0, 请安装 node v12.20+)
  • Pnpm: v6.x

2、克隆代码库

通过你 git 管理工具下载 git 代码,下面以 git-core 为例

mkdir dolphinscheduler
cd dolphinscheduler
git clone git@github.com:apache/dolphinscheduler.git

3、编译源码

支持的系统:
* MacOS
* Linux
【这个我没有运行试试】
运行 `mvn clean install -Prelease -Dmaven.test.skip=true`

DolphinScheduler 普通开发模式

上面是官方提供的,我觉得有用就复制下来,

这里开始我就按照自己的操作顺序记录

1、编译问题:

1、git相关
1-1:开启 Windows Git 长路径支持,
管理员 PowerShell 执行,解决 DolphinScheduler 路径太深导致 git add 失败
git config --system core.longpaths true

1-2:先初始化git仓库,只在本地,不涉及账号、不推远程,Spotless 需要 HEAD
git init
git add .
git commit -m "initial commit"

2、Maven 编译 / 格式化(IDEA 里的 Terminal)
2-1:依赖 Git HEAD,自动修复格式问题
mvn spotless:apply
2-2:编译整个项目(跳过测试),确保所有模块已 install
mvn clean install -DskipTests

3、前端相关:

查看 Node.js 是否已安装
node -v

查看 npm 版本
npm -v

安装 pnpm
npm install -g pnpm
pnpm -v

编译都没有问题

2、启动zookeeper

官方内容

下载 ZooKeeper,解压

存储配置

启动脚本

搞个txt编辑完后,后缀该bat即可

@echo off
echo 正在启动 ZooKeeper...
cd /d E:\\install\\ZooKeeper\\zookeeper-3.8.3\\bin
zkServer.cmd
pause

3、workspace.xml 修改

【可以不用,我也是看其他文章有添加的,不过我没添加也能正常运行,这里只做记录】

在其他文章看到说在这里添加这行,说是让 IDEA 在运行时动态使用模块的 classpath,而不是用启动时生成的静态 classpath。

注意点:
这个作用只会影响本地 IDEA 启动,线上环境如果有问题这个是解决不了的。

"dynamic.classpath": "true",

4、数据库

我这里用的是mysql,所以需要修改

4-1:数据初始化
创建名为【dolphinscheduler】的新数据库后,
把这个位置的sql直接拷贝复制执行即可。

如图:

4-2:依赖相关修改
如果使用 MySQL 作为元数据库,需要先修改 `dolphinscheduler/pom.xml`,
将 `mysql-connector-java` 依赖的 `scope` 改为 `compile`,
使用 PostgreSQL 则不需要

test 改成 compile

5、application.yaml 修改数据库配置

5-1:dolphinscheduler-master
如图,配置文件中修改这些数据:三个内容都是一样的

spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
    username: 账户名
    password: 数据库密码

5-2:dolphinscheduler-worker

5-3:dolphinscheduler-api

6、logback-spring.xml 修改日志级别

6-1:dolphinscheduler-master
<appender-ref ref="STDOUT"/>

6-2:dolphinscheduler-worker

6-3:dolphinscheduler-api

7、启动后端三个服务

我们需要启动三个服务,包括 MasterServer,WorkerServer,ApiApplicationServer

* MasterServer:在 Intellij IDEA 中执行 `org.apache.dolphinscheduler.server.master.MasterServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`

* WorkerServer:在 Intellij IDEA 中执行 `org.apache.dolphinscheduler.server.worker.WorkerServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`

* ApiApplicationServer:在 Intellij IDEA 中执行 `org.apache.dolphinscheduler.api.ApiApplicationServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql`。启动完成可以浏览 Open API 文档,地址为 http://localhost:12345/dolphinscheduler/swagger-ui/index.html

> VM Options `-Dspring.profiles.active=mysql` 中 `mysql` 表示指定的配置文件
7-1:MasterServer
配置 VM Options
按照操作配置这个:打开后填入即可

-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql

7-2:WorkerServer
配置 VM Options

跟上面一样操作:

-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql
7-3:ApiApplicationServer
配置 VM Options
-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql

总的就这三个:

8、启动前端服务

命令:
安装前端依赖并运行前端组件

cd dolphinscheduler-ui
pnpm install
pnpm run dev

9、浏览器访问

账号密码:
浏览器访问:
http://localhost:5173/home

默认账号密码:

账号:admin
密码:dolphinscheduler123
成功访问:

相关问题

1、存储未启用、租户\用户 指定

问题:测试能否创建文件夹、上传文件等,提示【存储未启用】

问题:当前登录用户的租户信息未被指定

解决方法:

Minio 安装、启动

我这里直接用minio来尝试:

1、minio 创建 dolphinscheduler 桶

2、commom.properties 修改

配置文件改了这些地方

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=/tmp/dolphinscheduler

# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js

# resource storage type: HDFS, S3, OSS, NONE
# ljh -->   S3 is Minio--------------------------------------
resource.storage.type=S3
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/dolphinscheduler

# ljh --> The account and password of MinIO-------------------------------
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=minioadmin
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=minioadmin
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=cn-north-1
# ljh --> add bucket ------------------------------
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
# ljh --> localhost convert  127.0.0.1
resource.aws.s3.endpoint=http://127.0.0.1:9000

# alibaba cloud access key id, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.id=<your-access-key-id>
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.secret=<your-access-key-secret>
# alibaba cloud region, required if you set resource.storage.type=OSS
resource.alibaba.cloud.region=cn-hangzhou
# oss bucket name, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com

# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://mycluster:8020

# whether to startup kerberos
hadoop.security.authentication.startup.state=false

# java.security.krb5.conf path
java.security.krb5.conf.path=/opt/krb5.conf

# login user from keytab username
login.user.keytab.username=hdfs-mycluster@ESZ.COM

# login user from keytab path
login.user.keytab.path=/opt/hdfs.headless.keytab

# kerberos expire time, the unit is hour
kerberos.expire.time=2

# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s

# datasource encryption enable
datasource.encryption.enable=false

# datasource encryption salt
datasource.encryption.salt=!@#$%^&*

# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar

#data-quality.error.output.path=/tmp/data-quality-error-data

# Network IP gets priority, default inner outer

# Whether hive SQL is executed in the same session
support.hive.oneSession=false

# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
setTaskDirToTenant.enable=false

# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=

# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default

# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh

# development state
development.state=false

# rpc port
alert.rpc.port=50052

# set path of conda.sh
conda.path=/opt/anaconda3/etc/profile.d/conda.sh

# Task resource limit state
task.resource.limit.state=false

# mlflow task plugin preset repository
ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
# mlflow task plugin preset repository version
ml.mlflow.preset_repository_version="main"

# ljh --> minio must open path style
resource.aws.s3.path.style.access=true
3、dolphinscheduler 可视化页面添加租户

安全中心 - 租户管理 - 创建租户

用户添加租户

演示

创建文件夹、上传文件成功

如图,数据已经存放在我指定的minio文件夹里面了