1. 简介

Spring Cloud Data Flow 是一个用于构建数据集成和实时数据处理流水线的工具集

这些流水线本质上是基于 Spring Boot 构建的应用,通常使用 Spring Cloud StreamSpring Cloud Task 框架开发。

在本教程中,我们将演示如何将 Spring Cloud Data Flow 与 Apache Spark 结合使用,完成本地 Spark Job 的调度与执行。


2. 启动本地 Data Flow Server

要部署任务,首先需要启动 Spring Cloud Data Flow Server。

2.1 添加依赖

pom.xml 中添加以下依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

2.2 启动 Server

主类添加 @EnableDataFlowServer 注解:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowServerApplication.class, args);
    }
}

启动后,本地 Data Flow Server 将运行在 localhost:9393


3. 创建 Spark Job 项目

我们将创建一个简单的 Spark 应用用于估算 π 值,并打包为 fat jar 供后续部署使用。

3.1 添加 Spark 依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.4.0</version>
</dependency>

3.2 编写 Job 代码

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        List<Integer> xs = IntStream.rangeClosed(0, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y ) < 1 ? 1: 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        System.out.println("The pi was estimated as:" + count / n);

        context.stop();
    }
}

✅ 请确保使用 Maven 或 Gradle 构建出可执行的 fat jar,路径如 /apache-spark-job-0.0.1-SNAPSHOT.jar


4. 使用 Data Flow Shell 管理任务

Data Flow Shell 是一个用于与 Server 交互的命令行工具,支持 DSL 命令操作流水线。

4.1 添加 Shell 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

4.2 启动 Shell 应用

主类添加 @EnableDataFlowShell 注解:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

启动后,会进入交互式命令行界面。


5. 部署与执行 Spark Job

Spring Cloud Data Flow 提供了用于运行 Spark 任务的 Task Runner,支持三种模式:cluster、yarn、client。本例使用本地 client 模式。

5.1 注册 Spark Client Task

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

5.2 创建任务

task create spark1 --definition "spark-client \
  --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
  --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

5.3 执行任务

task launch spark1

✅ 任务执行后,控制台将输出类似如下内容:

The pi was estimated as: 3.14159265

6. 总结

本教程演示了如何通过 Spring Cloud Data Flow 集成 Apache Spark,完成本地 Spark Job 的注册、部署与执行。

  • ✅ Spring Cloud Data Flow 提供了完整的任务调度能力
  • ✅ Spark Job 可以通过简单的参数配置进行部署
  • ⚠️ 注意 jar 包路径和类名的正确性,否则任务会执行失败

更多资料可参考 Spring Cloud Data Flow 官方文档


原始标题:Spring Cloud Data Flow With Apache Spark | Baeldung