1. 概述

Apache Spark 是一个快速、分布式的通用数据处理系统,它支持内存计算,通过内存缓存和优化执行机制,实现极高的性能。Spark 提供了面向多种主流编程语言(如 Scala、Python、Java 和 R)的高级 API。

在本篇文章中,我们将深入探讨 Spark 中三个核心概念:DataFrame、Dataset 和 RDD

2. DataFrame

从 Spark 1.3 版本开始,Spark SQL 引入了一种名为 DataFrame 的结构化数据抽象。自那以后,它就成为了 Spark 中最重要的特性之一。当我们需要处理结构化或半结构化的分布式数据时,DataFrame 是一个非常合适的选择。

在第 3 节中我们会讨论 RDD(Resilient Distributed Dataset)。DataFrame 在存储效率上优于 RDD,因为它不仅继承了 RDD 的不可变性、内存计算、容错性和分布式并行处理能力,还为数据添加了 schema(结构),使得 Spark 可以更好地优化执行计划。

此外,DataFrame 会将 SQL 查询翻译为优化后的低级 RDD 操作。

我们可以用以下三种方式创建 DataFrame:

  • 将已有的 RDD 转换而来
  • 通过 SQL 查询生成
  • 从外部数据源加载

在 Spark 2.0 中,官方引入了 SparkSession,统一了之前多个上下文的使用方式,开发者不再需要手动管理不同的上下文对象:

SparkSession session = SparkSession.builder()
  .appName("TouristDataFrameExample")
  .master("local[*]")
  .getOrCreate();

DataFrameReader dataFrameReader = session.read();

我们接下来将分析 Tourist.csv 文件中的数据:

Dataset<Row> data = dataFrameReader.option("header", "true")
  .csv("data/Tourist.csv");

⚠️ 注意:从 Spark 2.0 开始,DataFrame 实际上是类型为 Row 的 Dataset,即 Dataset<Row> 的别名。

我们可以选择特定列进行展示、过滤或分组操作:

data.select(col("country"), col("year"), col("value"))
  .show();

data.filter(col("country").equalTo("Mexico"))
  .show();

data.groupBy(col("country"))
  .count()
  .show();

3. Dataset

Dataset 是一组强类型、结构化的数据集合,它结合了面向对象编程风格和类型安全的优势,可以在编译期进行语法检查和错误捕获。

Dataset 可以看作是 DataFrame 的扩展。从这个角度来说,DataFrame 可以被视作一个无类型的 Dataset。

Dataset API 是在 Spark 1.6 中引入的,正如 Spark 团队所说:

“Spark Dataset 的目标是提供一个既能方便表达对象域上的转换操作,又能充分利用 Spark SQL 执行引擎性能和稳定性的 API。”

首先,我们需要定义一个 Java 类 TouristData 来映射数据:

public class TouristData {
    private String region;
    private String country;
    private String year;
    private String series;
    private Double value;
    private String footnotes;
    private String source;
    // ... getters and setters
}

为了将记录映射到指定类型,我们需要使用 Encoder。Encoder 负责在 Java 对象和 Spark 内部的二进制格式之间进行转换:

// SparkSession 初始化与数据加载
Dataset<Row> responseWithSelectedColumns = data.select(
  col("region"), 
  col("country"), 
  col("year"), 
  col("series"), 
  col("value").cast("double"), 
  col("footnotes"), 
  col("source")
);

Dataset<TouristData> typedDataset = responseWithSelectedColumns
  .as(Encoders.bean(TouristData.class));

像 DataFrame 一样,我们也可以对 Dataset 进行过滤和分组操作:

typedDataset.filter((FilterFunction) record -> record.getCountry()
  .equals("Norway"))
  .show();

typedDataset.groupBy(typedDataset.col("country"))
  .count()
  .show();

我们还可以实现更复杂的操作,例如筛选某个年份范围内的数据,或者计算某列的总和:

typedDataset.filter((FilterFunction) record -> record.getYear() != null 
  && (Long.valueOf(record.getYear()) > 2010 
  && Long.valueOf(record.getYear()) < 2017)).show();

typedDataset.filter((FilterFunction) record -> record.getValue() != null 
  && record.getSeries()
    .contains("expenditure"))
    .groupBy("country")
    .agg(sum("value"))
    .show();

4. RDD

RDD(Resilient Distributed Dataset)是 Spark 最核心的编程抽象。它表示一个不可变、容错、分布式的元素集合

RDD 封装了大量数据,Spark 会自动将 RDD 中的数据分布到集群节点上,并并行执行操作

我们只能通过以下两种方式创建 RDD:

  • 从稳定存储中读取数据
  • 从其他 RDD 上执行转换操作生成

在处理大规模分布式数据时,容错性至关重要。RDD 的容错能力来自于 Spark 内置的恢复机制。Spark 会记录 RDD 的血缘关系(lineage),以便在发生故障时能快速重建丢失的分区

对 RDD 的操作主要分为两类:转换(Transformations)和动作(Actions)

4.1. 转换(Transformations)

转换操作用于对 RDD 中的数据进行处理,每次转换操作都会生成一个新的 RDD,因为 RDD 是不可变的

我们以 Map 和 Filter 为例,这是最常用的两个转换操作。

首先创建一个 JavaSparkContext 并从 Tourist.csv 文件中加载数据为 RDD:

SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
  .setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");

然后我们使用 map 操作提取每条记录中的国家名,并将其转换为大写,再保存到磁盘:

JavaRDD<String> upperCaseCountries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1].toUpperCase();
}).distinct();

upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

如果我们只想筛选出特定国家的数据,可以使用 filter

JavaRDD<String> touristsInMexico = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));

touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. 动作(Actions)

动作操作用于触发实际的计算,并返回结果或将结果写入存储系统。

在 Spark 中,最常用的动作包括 countreduce

我们先统计 CSV 文件中不同国家的数量:

// Spark Context 初始化与数据加载
JavaRDD<String> countries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1];
}).distinct();

Long numberOfCountries = countries.count();

接下来,我们计算各国旅游支出总和。为此,我们需要筛选出描述中包含 “expenditure” 的记录。

这里我们使用 JavaPairRDD 来处理键值对数据。JavaPairRDD 是一种可以存储键值对的特殊 RDD 类型

JavaRDD<String> touristsExpenditure = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));

JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
  .mapToPair(line -> {
      String[] columns = line.split(COMMA_DELIMITER);
      return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});

List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
  .reduceByKey((x, y) -> x + y)
  .collect();

5. 总结

简单总结一下

  • 当我们需要结构化数据处理、高级聚合操作或 SQL 查询能力时,推荐使用 DataFrame 或 Dataset
  • 如果你需要编译时类型安全,Dataset 是更好的选择。
  • 如果数据是非结构化的,或者你需要进行低级别的转换和控制,RDD 仍然是不可替代的。

一如既往,本文所有示例代码都可以在 GitHub 上找到。


原始标题:Apache Spark: Differences between Dataframes, Datasets and RDDs