1. 简介
本文将通过一个实际的数据处理应用,带你快速上手 Apache Crunch,并基于 MapReduce 框架运行。
我们会先简要介绍 Crunch 的核心概念,然后进入实战环节:实现一个文本词频统计程序,流程如下:
✅ 从文本文件读取每一行
✅ 拆分为单词,并过滤常见停用词(stop words)
✅ 按单词分组,统计每个词的出现次数
✅ 将结果写入输出文件
整个过程简洁清晰,适合快速理解 Crunch 的编程模型。
2. 什么是 Apache Crunch?
MapReduce 是一种用于在服务器集群上并行处理海量数据的分布式编程模型,Hadoop 和 Spark 都实现了该模型。
Crunch 的定位是:为 Java 开发者提供一套高级 API,用于构建、测试和运行 MapReduce 数据流水线(pipeline)。你不需要直接编写 MapReduce 的 Mapper 和 Reducer,而是通过 Crunch 的 API 定义数据处理流程。Crunch 的 Planner 会自动将其编排为底层的 MapReduce 任务,并按需执行。
整个数据流水线由 Pipeline
接口协调。该接口负责:
- 通过
Source
读取输入数据 - 通过
Target
写出结果数据
Crunch 提供了三种核心数据抽象接口:
PCollection<T>
:不可变的、分布式的元素集合PTable<K, V>
:不可变的、分布式的键值多映射(类似 Map<K, List>) PGroupedTable<K, V>
:按键排序的分布式映射,值为可迭代一次的Iterable<V>
⚠️ DoFn
是所有数据处理函数的基类,它对应 MapReduce 中的 Mapper、Reducer 和 Combiner。我们大部分开发工作都集中在编写和测试 DoFn
上。
理解了这些概念后,接下来我们动手实现一个词频统计应用。
3. 项目搭建
使用 Maven 构建 Crunch 项目有两种方式:
- 在已有项目中添加依赖
- 使用官方 Archetype 快速生成脚手架
3.1. Maven 依赖
在 pom.xml
中添加核心依赖:
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-core</artifactId>
<version>1.0.0</version>
</dependency>
同时添加 Hadoop 客户端依赖(版本需与集群一致):
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
建议前往 Maven Central 查看最新版本。
3.2. 使用 Maven Archetype
Crunch 提供了快速生成项目脚手架的 Archetype:
mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype
执行后按提示输入版本号和项目信息即可生成基础项目结构,省去手动配置的麻烦。
4. Pipeline 初始化
项目搭建完成后,首先需要创建 Pipeline
实例。Crunch 提供了三种实现:
MRPipeline
:在 Hadoop MapReduce 上执行SparkPipeline
:在 Spark 上执行MemPipeline
:纯内存执行,非常适合单元测试
开发阶段推荐使用 MemPipeline
快速验证逻辑,生产环境则切换为 MRPipeline
。
创建内存管道用于测试:
Pipeline pipeline = MemPipeline.getInstance();
创建 MapReduce 管道用于实际运行:
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
5. 读取输入数据
通过 Pipeline
接口的 readTextFile
方法可直接读取文本文件:
PCollection<String> lines = pipeline.readTextFile(inputPath);
该方法将文件每一行作为 String
存入 PCollection
。
配套的单元测试示例:
@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile("src/test/resources/input.txt");
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
测试验证了文件行数是否符合预期。
6. 数据处理流程
Crunch 提供了多种 DoFn
的子类来简化常见操作:
FilterFn
:根据条件过滤元素MapFn
:一对一映射CombineFn
:合并多个值为单个值JoinFn
:支持内连接、外连接等
我们将用这些工具实现以下逻辑:
- 拆分行为单词
- 过滤停用词
- 统计词频
6.1. 拆分文本为单词
创建 Tokenizer
类继承 DoFn<String, String>
:
public class Tokenizer extends DoFn<String, String> {
private static final Splitter SPLITTER = Splitter
.onPattern("\\s+")
.omitEmptyStrings();
@Override
public void process(String line, Emitter<String> emitter) {
for (String word : SPLITTER.split(line)) {
emitter.emit(word);
}
}
}
使用 Guava 的 Splitter
按空白字符拆分,并忽略空字符串。
单元测试验证:
@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
@Mock
private Emitter<String> emitter;
@Test
public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
Tokenizer splitter = new Tokenizer();
splitter.process(" hello world ", emitter);
verify(emitter).emit("hello");
verify(emitter).emit("world");
verifyNoMoreInteractions(emitter);
}
}
使用 parallelDo
应用该函数:
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
parallelDo
对 PCollection
中每个元素应用 DoFn
,返回新的 PCollection
。
6.2. 过滤停用词
创建 StopWordFilter
继承 FilterFn<String>
:
public class StopWordFilter extends FilterFn<String> {
private static final Set<String> STOP_WORDS = ImmutableSet
.copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
"for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
"or", "s", "such", "t", "that", "the", "their", "then", "there",
"these", "they", "this", "to", "was", "will", "with" });
@Override
public boolean accept(String word) {
return !STOP_WORDS.contains(word);
}
}
单元测试覆盖正反例:
public class StopWordFilterUnitTest {
@Test
public void givenFilter_whenStopWordPassed_thenFalseReturned() {
FilterFn<String> filter = new StopWordFilter();
assertFalse(filter.accept("the"));
assertFalse(filter.accept("a"));
}
@Test
public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
FilterFn<String> filter = new StopWordFilter();
assertTrue(filter.accept("Hello"));
assertTrue(filter.accept("World"));
}
@Test
public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
PCollection<String> words = MemPipeline
.collectionOf("This", "is", "a", "test", "sentence");
PCollection<String> noStopWords = words.filter(new StopWordFilter());
assertEquals(ImmutableList.of("This", "test", "sentence"),
Lists.newArrayList(noStopWords.materialize()));
}
}
应用过滤器:
PCollection<String> noStopWords = words.filter(new StopWordFilter());
filter
方法返回过滤后的新集合。
6.3. 统计词频
PCollection
提供了 count()
方法直接统计各元素出现次数:
PTable<String, Long> counts = noStopWords.count();
该方法内部会自动编排 MapReduce 任务,输出为 PTable<String, Long>
,即单词到计数的映射。
7. 输出结果
使用 Pipeline
的 writeTextFile
方法将结果写入文件:
pipeline.writeTextFile(counts, outputPath);
该方法会将 PCollection
或 PTable
格式化为文本写入指定路径。
8. 执行流水线
⚠️ 注意:到目前为止的所有操作都只是定义了数据流水线,尚未真正执行。Crunch 采用惰性执行模型。
必须显式调用执行方法才会触发任务:
run()
:同步执行done()
:执行所有剩余任务并清理中间文件runAsync()
:异步执行
我们调用 done()
完成执行:
PipelineResult result = pipeline.done();
该调用会启动底层的 MapReduce 作业,完成读取、处理和写入全过程。
9. 完整流水线整合
将所有步骤整合进主流程:
public int run(String[] args) throws Exception {
String inputPath = args[0];
String outputPath = args[1];
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
PCollection<String> lines = pipeline.readTextFile(inputPath);
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
PCollection<String> noStopWords = words.filter(new StopWordFilter());
PTable<String, Long> counts = noStopWords.count();
pipeline.writeTextFile(counts, outputPath);
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
逻辑清晰,一气呵成,这就是 Crunch 的魅力所在。
10. 启动配置
为了让程序能被 Hadoop 正确启动,主类需实现 Tool
接口:
public class WordCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordCount(), args);
}
ToolRunner.run
会自动解析命令行参数并启动 MapReduce 作业。
11. 运行应用
打包应用:
mvn package
得到 target/crunch-1.0-SNAPSHOT-job.jar
。
提交到 Hadoop 执行:
hadoop jar target/crunch-1.0-SNAPSHOT-job.jar /input/file.txt /output/directory
输出示例:
[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]
此外,也可在 IDE 中直接运行单元测试或作为独立应用调试,开发体验非常友好。
12. 总结
本文通过一个词频统计案例,展示了 Apache Crunch 如何简化 MapReduce 开发。其高级 API 使代码更简洁、易读、易测试,特别适合 Java 技术栈的数据处理场景。
完整源码已托管至 GitHub: https://github.com/eugenp/tutorials/tree/master/libraries-data