标签 虚拟机 下的文章

已经在虚拟机部署好Apache DolphinScheduler了,想尝试下在Flink新建一个Flink节点,然后用Flink消费Kafka数据。

Apache DolphinScheduler用的是单机部署,具体操作可以参考官方文档:DolphinScheduler | 文档中心(https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/in...).

  • 前置条件:已经安装Java 11、DolphinScheduler 3.3.2、Flink 1.18.1、Kafka 3.6.0,Zookeeper用Kafka内置的。建议这些安装都下载二进制的安装包到虚拟机安装,用命令安装的不可控,我下载的二进制包如下:

配置好Flink的环境变量

1、编辑环境变量:

sudo vim ~/.bashrc

增加Flink的路径

2、使环境变量生效:

#使环境变量生效
source ~/.bashrc
#查看环境变量
echo $Flink_HOME

修改Kafka、Flink以及DolphinScheduler的配置文件

因为用的是虚拟机,为了让外面的主机能够访问到虚拟机的网络,需要修改下配置文件

  1. 修改Kafka配置:找到Kafka安装包下的config文件夹,修改config下的server.properties文件,修改listeners是为了外面的主机能够访问到虚拟机的Kafka,还有把advertised.listeners改成虚拟机地址,写样例的时候能连上虚拟机的Kafka地址,不然默认连localhost
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
#192.168.146.132修改成虚拟机ip
advertised.listeners=PLAINTEXT://192.168.146.132:9092

  1. 修改Flink配置:找到Flink安装包下的conf文件夹,修改conf下的Flink-conf.yaml文件,把里面所有的localhost地址全部改成0.0.0.0,以便主机能访问到虚拟机的Flink。还有增加jobmanager和taskmanager的内存
jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1

  1. 修改Apache DolphinScheduler的配置文件,从Apache DolphinScheduler的启动脚本文件dolphinscheduler-daemon.sh可以看出,配置环境变量用的是bin/env文件夹下的dolphinscheduler_env.sh

查看dolphinscheduler-daemon.sh文件:

修改dolphinscheduler_env.sh文件,新增JAVA、Flink路径:

#修改成自己的JAVA、Flink路径
export JAVA_HOME=/data/jdk-11.0.29
export Flink_HOME=/data/Flink-1.18.1

关闭防火墙,启动应用

启动应用,包括Zookeeper、Kafka、Flink以及Apache DolphinScheduler。

#关闭防火墙
sudo systemctl stop firewalld
 
# 在 Flink 根目录下,执行以下命令启动 Flink 集群
bin/start-cluster.sh
 
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
 
# 启动 Kafka 服务器
bin/Kafka-server-start.sh config/server.properties &
 
#创建 Kafka 主题
bin/Kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
 
#使用命令行生产者发送消息
bin/Kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
 
#消费
bin/Kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# 启动 Standalone Server 服务
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

测试

测试Flink、Apache DolphinScheduler是否能访问成功。

  1. Flink访问地址:http://localhost:8081/,localhost改成自己虚拟机地址

  1. Apache DolphinScheduler访问地址:http://localhost:12345/dolphinscheduler/ui ,localhost改成自己虚拟机地址即可登录系统 UI。默认的用户名和密码是 admin/dolphinscheduler123

编写样例

用Flink消费Kafka数据,然后打包上传到Apache DolphinScheduler,启动Flink任务:

  1. 编写样例:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>Flink-Kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <Flink.version>1.18.1</Flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <Kafka.version>3.6.0</Kafka.version>
    </properties>
 
    <dependencies>
        <!-- Flink核心依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-streaming-java</artifactId>
            <version>${Flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-clients</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- 连接器基础依赖 -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-base</artifactId>
            <version>${Flink.version}</version>
        </dependency>
 
        <!-- Kafka连接器(关键修改点) -->
        <dependency>
            <groupId>org.apache.Flink</groupId>
            <artifactId>Flink-connector-Kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Kafka</groupId>
            <artifactId>Kafka-clients</artifactId>
            <version>${Kafka.version}</version>
        </dependency>
 
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
 
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.Flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkKafkaConsumerExample.java

import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.java.tuple.Tuple2;
import org.apache.Flink.api.java.utils.ParameterTool;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.datastream.DataStream;
import org.apache.Flink.streaming.api.functions.ProcessFunction;
import org.apache.Flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.Flink.util.Collector;
import org.apache.Flink.streaming.connectors.Kafka.FlinkKafkaConsumer;
import org.apache.Flink.api.common.serialization.SimpleStringSchema;
import org.apache.Kafka.clients.consumer.ConsumerConfig;
import org.apache.Kafka.common.serialization.StringDeserializer;
 
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
 
 
public class FlinkKafkaConsumerExample {
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // Kafka 配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker 地址
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> KafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        KafkaConsumer.setStartFromEarliest(); // 从最早的消息开始消费
        DataStream<String> stream = env.addSource(KafkaConsumer);
 
        // 处理数据:分词和计数
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
 
 
        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value);
                messageCount++;
 
                // 检查是否达到停止条件
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 messages, stopping job.");
                    shouldStop = true; // 设置标志位,表示应该停止
                }
            }
        });
 
        // 执行作业并获取 JobClient
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // 启动作业并获取 JobClient
                org.apache.Flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());
 
                // 监测条件并取消作业
                while (!shouldStop) {
                    Thread.sleep(100); // 每100毫秒检查一次
                }
 
                // 达到停止条件时取消作业
                if (shouldStop) {
                    System.out.println("Cancelling the job...");
                    jobClient.cancel().get(); // 取消作业
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
 
        // 在主线程中等待作业结束
        future.join(); // 等待作业完成
    }
 
    // Tokenizer 类用于将输入字符串转化为单词
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
 
}
  1. 打包上传到Apache DolphinScheduler

  1. 新建Flink节点,并启动

在Apache DolphinScheduler的任务实例看启动日志:

在虚拟机启动生产者,输出字符串,然后可以在Flink查看输出Kafka生产的消息:

原文链接:https://blog.csdn.net/Analyze_ing/article/details/156940553

UTM 5.0.1 发布 - 基于 QEMU 的 macOS 虚拟机与模拟器应用

在 iPhone 和 iPad 中虚拟化 Windows、Linux 和 Unix,如此简单!

请访问原文链接:https://sysin.org/blog/utm-5/ 查看最新版。原创作品,转载请保留出处。

作者主页:sysin.org


UTM for Mac 是一款基于 QEMU 的 macOS 虚拟机与模拟器应用,用于在 Mac 上运行和测试 Windows、Linux 等多种操作系统。

UTM 是什么?

UTM 是一款面向 iOS 和 macOS 的全功能系统模拟器和虚拟机宿主,基于 QEMU。简而言之,它可以让你在 Mac、iPhone 和 iPad 上运行 Windows、Linux 等多种操作系统。

QEMU(Quick Emulator)是一款开源的机器模拟器和虚拟化软件,由 Fabrice Bellard 于 2003 年创建。它通过动态二进制转换技术实现跨平台虚拟化,支持 x86、ARM、MIPS 等多种处理器架构。QEMU 既可以作为独立虚拟机运行完整的操作系统,也可与 KVM(基于内核的虚拟机)配合使用实现硬件加速虚拟化 (sysin),这种组合方案能够提供接近原生性能的虚拟化体验。

QEMU 核心特点是跨平台虚拟化支持,支持 x86、ARM、RISC-V、PowerPC 等多种处理器架构,可在 Windows、Linux、macOS 等操作系统上运行,这意味着在 iOS 上也可以运行 Windows x86 系统。

UTM 和 QEMU 的关系可以概括为:UTM 是基于 QEMU 构建的图形化虚拟化与系统模拟工具。

快速入门

快速入门 - 步骤一

1、使用 “+” 按钮打开新的虚拟机向导。

2、 选择一个虚拟机以显示其详细信息;对虚拟机进行右键点击或使用 Force Touch 可打开 操作菜单 (actions menu)。

3、使用启动按钮快速启动虚拟机。

快速入门 - 步骤二

4、通过工具栏最右侧的图标打开 设置 (settings)。

5、在详情视图中,点击 “浏览…” 按钮选择一个 共享目录 ( shared directory)。

6、在详情视图中打开驱动器菜单,选择要挂载(或弹出)的可移动磁盘映像

UTM 5 新增功能

亮点

  • 改进了 Linux 的图形加速:在 Linux 客户机中,通过 Mesa 的 VirtIO Venus 驱动现已支持 Vulkan 1.3;在 macOS(仅限)上,借助全新的 Apple Core OpenGL 后端支持 OpenGL 4.1。

已知问题

  • (macOS)Apple CoreGL 后端不支持 Vulkan。
  • 由于缺少相关特性,DXVK 无法工作(#7575)。
  • KosmicKrisp 目前以 WIP(进行中)形式提供,但上游当前版本尚不完整,因此推荐使用 MoltenVK 驱动。
  • (macOS 26)由于 HV_UNSUPPORTED 错误,虚拟机无法启动。这是一个构建问题,将在下一个版本中修复。临时解决方法:在设置中禁用 Vulkan(#7579)。

更改内容(v5.0.1)

  • CocoaSpice:重构了 Metal 渲染器,以提升性能并降低延迟
  • 修复了当 BIOS 文件名中包含逗号时无法加载的问题
  • 默认 FPS 现在将设置为 macOS 和 iPadOS 显示器的最大刷新率(配备 ProMotion 的 iPhone 默认仍为 60Hz,但可在设置中覆盖为 120Hz)
  • (macOS)修复了在启用 Vulkan 时启动出现 HV_UNSUPPORTED 的问题(#7579)
  • (iOS)修复了外接显示器的分辨率问题(#6040)
  • (iOS)双指缩放将自动关闭来自来宾系统的缩放自动更新(可通过调整大小按钮重置)
  • (iOS)修复了外接显示器菜单未更新的问题 (sysin)
  • (iOS)修复了外接显示器缩放比例不正确的问题,并且在连接外接显示器时将自动缩放以适配屏幕
  • (iOS)修复了关闭虚拟机电源后会显示主屏幕但不会在后台终止 UTM 的问题

下载地址

UTM v5.0.1 Release 2026-01-20

更多:macOS 下载汇总 (系统、应用和教程)