1. 概述
Apache Avro 是一种广泛使用的数据序列化系统,因其高效性和模式演进能力而在大数据应用中特别受欢迎。本教程将介绍如何通过Avro将对象转换为JSON,以及将整个Avro文件转换为JSON文件。这在数据检查和调试场景中特别有用。
在当今数据驱动的世界中,处理不同数据格式的能力至关重要。Apache Avro常用于需要高性能和存储效率的系统,如Apache Hadoop。
2. 配置
首先,在 pom.xml 文件中添加Avro和JSON的依赖。
本教程使用Apache Avro的 1.11.1 版本:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>
3. 将Avro对象转换为JSON
通过Avro将Java对象转换为JSON涉及几个步骤:
- 推断/构建Avro模式
- 将Java对象转换为Avro GenericRecord
- 最后将对象转换为JSON
我们将使用Avro的Reflect API动态推断Java对象的模式,而不是手动定义模式。
为演示这一点,创建一个包含两个整型属性 x 和 y 的 Point 类:
public class Point {
private int x;
private int y;
public Point(int x, int y) {
this.x = x;
this.y = y;
}
// Getters and setters
}
现在推断模式:
public Schema inferSchema(Point p) {
return ReflectData.get().getSchema(p.getClass());
}
我们定义了 inferSchema 方法,使用 ReflectData 类的 getSchema 方法从 point 对象推断模式。该模式描述了字段 x 和 y 及其数据类型。
接下来,从 Point 对象创建 GenericRecord 对象并转换为JSON:
public String convertObjectToJson(Point p, Schema schema) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("x", p.getX());
genericRecord.put("y", p.getY());
Encoder encoder = EncoderFactory.get().jsonEncoder(schema, outputStream);
datumWriter.write(genericRecord, encoder);
encoder.flush();
outputStream.close();
return outputStream.toString();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
convertObjectToJson 方法使用提供的模式将 Point 对象转换为JSON字符串。首先基于模式创建 GenericRecord 对象,用 Point 对象的数据填充它,然后使用 DatumWriter 通过 JsonEncoder 对象将数据写入 ByteArrayOutputStream,最后调用 OutputStream 对象的 toString 方法获取JSON字符串。
验证生成的JSON内容:
private AvroFileToJsonFile avroFileToJsonFile;
private Point p;
private String expectedOutput;
@BeforeEach
public void setup() {
avroFileToJsonFile = new AvroFileToJsonFile();
p = new Point(2, 4);
expectedOutput = "{\"x\":2,\"y\":4}";
}
@Test
public void whenConvertedToJson_ThenEquals() {
String response = avroFileToJsonFile.convertObjectToJson(p, avroFileToJsonFile.inferSchema(p));
assertEquals(expectedOutput, response);
}
4. 将Avro文件转换为JSON文件
将整个Avro文件转换为JSON文件的过程类似,但涉及从文件读取。当磁盘上存储着Avro格式的数据需要转换为更易访问的格式(如JSON)时,这种操作很常见。
首先定义方法 writeAvroToFile,用于将Avro数据写入文件:
public void writeAvroToFile(Schema schema, List<Point> records, File writeLocation) {
try {
if (writeLocation.exists()) {
if (!writeLocation.delete()) {
System.err.println("Failed to delete existing file.");
return;
}
}
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, writeLocation);
for (Point record: records) {
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("x", record.getX());
genericRecord.put("y", record.getY());
dataFileWriter.append(genericRecord);
}
dataFileWriter.close();
} catch (IOException e) {
e.printStackTrace();
System.out.println("Error writing Avro file.");
}
}
该方法根据提供的 Schema 将 Point 对象构造成GenericRecord实例转换为Avro格式。GenericDatumWriter 序列化这些记录,然后使用 DataFileWriter 将它们写入Avro文件。
验证文件是否写入成功:
private File dataLocation;
private File jsonDataLocation;
...
@BeforeEach
public void setup() {
// 从resources文件夹加载文件
ClassLoader classLoader = getClass().getClassLoader();
dataLocation = new File(classLoader.getResource("").getFile(), "data.avro");
jsonDataLocation = new File(classLoader.getResource("").getFile(), "data.json");
...
}
...
@Test
public void whenAvroContentWrittenToFile_ThenExist(){
Schema schema = avroFileToJsonFile.inferSchema(p);
avroFileToJsonFile.writeAvroToFile(schema, List.of(p), dataLocation);
assertTrue(dataLocation.exists());
}
接下来,从存储位置读取文件并以JSON格式写入另一个文件。
创建方法 readAvroFromFileToJsonFile 处理此操作:
public void readAvroFromFileToJsonFile(File readLocation, File jsonFilePath) {
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
try {
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(readLocation, reader);
DatumWriter<GenericRecord> jsonWriter = new GenericDatumWriter<>(dataFileReader.getSchema());
Schema schema = dataFileReader.getSchema();
OutputStream fos = new FileOutputStream(jsonFilePath);
JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, fos);
while (dataFileReader.hasNext()) {
GenericRecord record = dataFileReader.next();
System.out.println(record.toString());
jsonWriter.write(record, jsonEncoder);
jsonEncoder.flush();
}
dataFileReader.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
我们从 readLocation 读取Avro数据,以JSON格式写入 jsonFilePath。使用 DataFileReader 从Avro文件读取 GenericRecord 实例,然后使用 JsonEncoder 和 GenericDatumWriter 将这些记录序列化为JSON格式。
验证写入文件的JSON内容:
@Test
public void whenAvroFileWrittenToJsonFile_ThenJsonContentEquals() throws IOException {
avroFileToJsonFile.readAvroFromFileToJsonFile(dataLocation, jsonDataLocation);
String text = Files.readString(jsonDataLocation.toPath());
assertEquals(expectedOutput, text);
}
5. 总结
本文探讨了如何将Avro内容写入文件、读取并存储为JSON格式的文件,并通过示例说明了整个过程。另外值得注意的是,模式也可以存储在单独的文件中,而不是与数据一起包含。
示例和代码片段的实现可以在 GitHub 上找到。