1. 概述

Apache Avro 是一种广泛使用的数据序列化系统,因其高效性和模式演进能力而在大数据应用中特别受欢迎。本教程将介绍如何通过Avro将对象转换为JSON,以及将整个Avro文件转换为JSON文件。这在数据检查和调试场景中特别有用。

在当今数据驱动的世界中,处理不同数据格式的能力至关重要。Apache Avro常用于需要高性能和存储效率的系统,如Apache Hadoop。

2. 配置

首先,在 pom.xml 文件中添加Avro和JSON的依赖。

本教程使用Apache Avro的 1.11.1 版本:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.1</version>
</dependency>

3. 将Avro对象转换为JSON

通过Avro将Java对象转换为JSON涉及几个步骤:

  • 推断/构建Avro模式
  • 将Java对象转换为Avro GenericRecord
  • 最后将对象转换为JSON

我们将使用Avro的Reflect API动态推断Java对象的模式,而不是手动定义模式。

为演示这一点,创建一个包含两个整型属性 xyPoint 类:

public class Point {
    private int x;
    private int y;

    public Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    // Getters and setters
}

现在推断模式:

public Schema inferSchema(Point p) {
    return ReflectData.get().getSchema(p.getClass());
}

我们定义了 inferSchema 方法,使用 ReflectData 类的 getSchema 方法从 point 对象推断模式。该模式描述了字段 xy 及其数据类型。

接下来,从 Point 对象创建 GenericRecord 对象并转换为JSON:

public String convertObjectToJson(Point p, Schema schema) {
    try {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        GenericRecord genericRecord = new GenericData.Record(schema);
        genericRecord.put("x", p.getX());
        genericRecord.put("y", p.getY());
        Encoder encoder = EncoderFactory.get().jsonEncoder(schema, outputStream);
        datumWriter.write(genericRecord, encoder);
        encoder.flush();
        outputStream.close();
        return outputStream.toString();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

convertObjectToJson 方法使用提供的模式将 Point 对象转换为JSON字符串。首先基于模式创建 GenericRecord 对象,用 Point 对象的数据填充它,然后使用 DatumWriter 通过 JsonEncoder 对象将数据写入 ByteArrayOutputStream,最后调用 OutputStream 对象的 toString 方法获取JSON字符串。

验证生成的JSON内容:

private AvroFileToJsonFile avroFileToJsonFile;
private Point p;
private String expectedOutput;

@BeforeEach
public void setup() {
    avroFileToJsonFile = new AvroFileToJsonFile();
    p = new Point(2, 4);
    expectedOutput = "{\"x\":2,\"y\":4}";
}

@Test
public void whenConvertedToJson_ThenEquals() {
    String response = avroFileToJsonFile.convertObjectToJson(p, avroFileToJsonFile.inferSchema(p));
    assertEquals(expectedOutput, response);
}

4. 将Avro文件转换为JSON文件

将整个Avro文件转换为JSON文件的过程类似,但涉及从文件读取。当磁盘上存储着Avro格式的数据需要转换为更易访问的格式(如JSON)时,这种操作很常见。

首先定义方法 writeAvroToFile,用于将Avro数据写入文件:

public void writeAvroToFile(Schema schema, List<Point> records, File writeLocation) {
    try {
        if (writeLocation.exists()) {
            if (!writeLocation.delete()) {
                System.err.println("Failed to delete existing file.");
                return;
            }
        }
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, writeLocation);
        for (Point record: records) {
            GenericRecord genericRecord = new GenericData.Record(schema);
            genericRecord.put("x", record.getX());
            genericRecord.put("y", record.getY());
            dataFileWriter.append(genericRecord);
        }
        dataFileWriter.close();
    } catch (IOException e) {
        e.printStackTrace();
        System.out.println("Error writing Avro file.");
    }
}

该方法根据提供的 SchemaPoint 对象构造成GenericRecord实例转换为Avro格式。GenericDatumWriter 序列化这些记录,然后使用 DataFileWriter 将它们写入Avro文件。

验证文件是否写入成功:

private File dataLocation;
private File jsonDataLocation;
...

@BeforeEach
public void setup() {
    // 从resources文件夹加载文件
    ClassLoader classLoader = getClass().getClassLoader();
    dataLocation = new File(classLoader.getResource("").getFile(), "data.avro");
    jsonDataLocation = new File(classLoader.getResource("").getFile(), "data.json");
    ...
}

...

@Test
public void whenAvroContentWrittenToFile_ThenExist(){
    Schema schema = avroFileToJsonFile.inferSchema(p);
    avroFileToJsonFile.writeAvroToFile(schema, List.of(p), dataLocation);
    assertTrue(dataLocation.exists());
}

接下来,从存储位置读取文件并以JSON格式写入另一个文件。

创建方法 readAvroFromFileToJsonFile 处理此操作:

public void readAvroFromFileToJsonFile(File readLocation, File jsonFilePath) {
    DatumReader<GenericRecord> reader = new GenericDatumReader<>();
    try {
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(readLocation, reader);
        DatumWriter<GenericRecord> jsonWriter = new GenericDatumWriter<>(dataFileReader.getSchema());
        Schema schema = dataFileReader.getSchema();
        OutputStream fos = new FileOutputStream(jsonFilePath);
        JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, fos);
        while (dataFileReader.hasNext()) {
            GenericRecord record = dataFileReader.next();
            System.out.println(record.toString());
            jsonWriter.write(record, jsonEncoder);
            jsonEncoder.flush();
        }
        dataFileReader.close();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

我们从 readLocation 读取Avro数据,以JSON格式写入 jsonFilePath。使用 DataFileReader 从Avro文件读取 GenericRecord 实例,然后使用 JsonEncoderGenericDatumWriter 将这些记录序列化为JSON格式。

验证写入文件的JSON内容:

@Test
public void whenAvroFileWrittenToJsonFile_ThenJsonContentEquals() throws IOException {
    avroFileToJsonFile.readAvroFromFileToJsonFile(dataLocation, jsonDataLocation);
    String text = Files.readString(jsonDataLocation.toPath());
    assertEquals(expectedOutput, text);
}

5. 总结

本文探讨了如何将Avro内容写入文件、读取并存储为JSON格式的文件,并通过示例说明了整个过程。另外值得注意的是,模式也可以存储在单独的文件中,而不是与数据一起包含。

示例和代码片段的实现可以在 GitHub 上找到。


原始标题:Convert Avro File to JSON File in Java | Baeldung