1. 简介

本文将通过一个实际的数据处理应用,带你快速上手 Apache Crunch,并基于 MapReduce 框架运行。

我们会先简要介绍 Crunch 的核心概念,然后进入实战环节:实现一个文本词频统计程序,流程如下:

✅ 从文本文件读取每一行
✅ 拆分为单词,并过滤常见停用词(stop words)
✅ 按单词分组,统计每个词的出现次数
✅ 将结果写入输出文件

整个过程简洁清晰,适合快速理解 Crunch 的编程模型。

2. 什么是 Apache Crunch?

MapReduce 是一种用于在服务器集群上并行处理海量数据的分布式编程模型,Hadoop 和 Spark 都实现了该模型。

Crunch 的定位是:为 Java 开发者提供一套高级 API,用于构建、测试和运行 MapReduce 数据流水线(pipeline)。你不需要直接编写 MapReduce 的 Mapper 和 Reducer,而是通过 Crunch 的 API 定义数据处理流程。Crunch 的 Planner 会自动将其编排为底层的 MapReduce 任务,并按需执行。

整个数据流水线由 Pipeline 接口协调。该接口负责:

  • 通过 Source 读取输入数据
  • 通过 Target 写出结果数据

Crunch 提供了三种核心数据抽象接口:

  1. PCollection<T>:不可变的、分布式的元素集合
  2. PTable<K, V>:不可变的、分布式的键值多映射(类似 Map<K, List>)
  3. PGroupedTable<K, V>:按键排序的分布式映射,值为可迭代一次的 Iterable<V>

⚠️ DoFn 是所有数据处理函数的基类,它对应 MapReduce 中的 Mapper、Reducer 和 Combiner。我们大部分开发工作都集中在编写和测试 DoFn 上。

理解了这些概念后,接下来我们动手实现一个词频统计应用。

3. 项目搭建

使用 Maven 构建 Crunch 项目有两种方式:

  1. 在已有项目中添加依赖
  2. 使用官方 Archetype 快速生成脚手架

3.1. Maven 依赖

pom.xml 中添加核心依赖:

<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>1.0.0</version>
</dependency>

同时添加 Hadoop 客户端依赖(版本需与集群一致):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

建议前往 Maven Central 查看最新版本。

3.2. 使用 Maven Archetype

Crunch 提供了快速生成项目脚手架的 Archetype:

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype

执行后按提示输入版本号和项目信息即可生成基础项目结构,省去手动配置的麻烦。

4. Pipeline 初始化

项目搭建完成后,首先需要创建 Pipeline 实例。Crunch 提供了三种实现:

  • MRPipeline:在 Hadoop MapReduce 上执行
  • SparkPipeline:在 Spark 上执行
  • MemPipeline:纯内存执行,非常适合单元测试

开发阶段推荐使用 MemPipeline 快速验证逻辑,生产环境则切换为 MRPipeline

创建内存管道用于测试:

Pipeline pipeline = MemPipeline.getInstance();

创建 MapReduce 管道用于实际运行:

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

5. 读取输入数据

通过 Pipeline 接口的 readTextFile 方法可直接读取文本文件:

PCollection<String> lines = pipeline.readTextFile(inputPath);

该方法将文件每一行作为 String 存入 PCollection

配套的单元测试示例:

@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile("src/test/resources/input.txt");

    assertEquals(21, lines.asCollection()
      .getValue()
      .size());
}

测试验证了文件行数是否符合预期。

6. 数据处理流程

Crunch 提供了多种 DoFn 的子类来简化常见操作:

  • FilterFn:根据条件过滤元素
  • MapFn:一对一映射
  • CombineFn:合并多个值为单个值
  • JoinFn:支持内连接、外连接等

我们将用这些工具实现以下逻辑:

  1. 拆分行为单词
  2. 过滤停用词
  3. 统计词频

6.1. 拆分文本为单词

创建 Tokenizer 类继承 DoFn<String, String>

public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter
      .onPattern("\\s+")
      .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}

使用 Guava 的 Splitter 按空白字符拆分,并忽略空字符串。

单元测试验证:

@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
 
    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
        Tokenizer splitter = new Tokenizer();
        splitter.process("  hello  world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}

使用 parallelDo 应用该函数:

PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

parallelDoPCollection 中每个元素应用 DoFn,返回新的 PCollection

6.2. 过滤停用词

创建 StopWordFilter 继承 FilterFn<String>

public class StopWordFilter extends FilterFn<String> {

    private static final Set<String> STOP_WORDS = ImmutableSet
      .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
        "or", "s", "such", "t", "that", "the", "their", "then", "there",
        "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}

单元测试覆盖正反例:

public class StopWordFilterUnitTest {

    @Test
    public void givenFilter_whenStopWordPassed_thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
          .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
         Lists.newArrayList(noStopWords.materialize()));
    }
}

应用过滤器:

PCollection<String> noStopWords = words.filter(new StopWordFilter());

filter 方法返回过滤后的新集合。

6.3. 统计词频

PCollection 提供了 count() 方法直接统计各元素出现次数:

PTable<String, Long> counts = noStopWords.count();

该方法内部会自动编排 MapReduce 任务,输出为 PTable<String, Long>,即单词到计数的映射。

7. 输出结果

使用 PipelinewriteTextFile 方法将结果写入文件:

pipeline.writeTextFile(counts, outputPath);

该方法会将 PCollectionPTable 格式化为文本写入指定路径。

8. 执行流水线

⚠️ 注意:到目前为止的所有操作都只是定义了数据流水线,尚未真正执行。Crunch 采用惰性执行模型。

必须显式调用执行方法才会触发任务:

  • run():同步执行
  • done():执行所有剩余任务并清理中间文件
  • runAsync():异步执行

我们调用 done() 完成执行:

PipelineResult result = pipeline.done();

该调用会启动底层的 MapReduce 作业,完成读取、处理和写入全过程。

9. 完整流水线整合

将所有步骤整合进主流程:

public int run(String[] args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

    PCollection<String> lines = pipeline.readTextFile(inputPath);

    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    PTable<String, Long> counts = noStopWords.count();

    pipeline.writeTextFile(counts, outputPath);

    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}

逻辑清晰,一气呵成,这就是 Crunch 的魅力所在。

10. 启动配置

为了让程序能被 Hadoop 正确启动,主类需实现 Tool 接口:

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

ToolRunner.run 会自动解析命令行参数并启动 MapReduce 作业。

11. 运行应用

打包应用:

mvn package

得到 target/crunch-1.0-SNAPSHOT-job.jar

提交到 Hadoop 执行:

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar /input/file.txt /output/directory

输出示例:

[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]

此外,也可在 IDE 中直接运行单元测试或作为独立应用调试,开发体验非常友好。

12. 总结

本文通过一个词频统计案例,展示了 Apache Crunch 如何简化 MapReduce 开发。其高级 API 使代码更简洁、易读、易测试,特别适合 Java 技术栈的数据处理场景。

完整源码已托管至 GitHub: https://github.com/eugenp/tutorials/tree/master/libraries-data


原始标题:A Guide to Apache Crunch