1. 引言

本文将探讨在Java中使用Apache Avro序列化和反序列化日期对象的多种方法。Avro是一个数据序列化系统,提供紧凑、快速的二进制数据格式,以及基于模式的数据定义。

在Avro中处理日期时,我们面临一个挑战:Avro的类型系统本身并不原生支持Java的Date类。 下面我们详细分析这个序列化难题。

2. 日期序列化的挑战

首先,在Maven项目中添加Avro依赖:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.12.0</version>
</dependency>

Avro的类型系统包含原始类型:null, boolean, int, long, float, double, bytes, string,以及复杂类型:record, enum, array, map, union, fixed

看个例子,为什么日期序列化在Avro中可能是个坑:

public class DateContainer {
    private Date date;
    
    // 构造方法、getter和setter
}

当我们尝试使用Avro的反射序列化直接处理这个类时,默认行为会将Date对象内部转换为long值(自纪元以来的毫秒数)。

不幸的是,这个过程可能导致精度问题。比如反序列化后的值可能与原始值相差几毫秒。

3. 实现日期序列化

接下来,我们将通过两种方法实现日期的序列化和反序列化:使用GenericRecord的逻辑类型,以及使用Avro的Conversion API。

3.1. 使用GenericRecord的逻辑类型

自Avro 1.8起,框架提供了逻辑类型。这些类型为底层原始类型添加了必要的语义。

对于日期,我们有三种逻辑类型:

  1. date:表示不含时间的日期,存储为int(自纪元以来的天数)
  2. timestamp-millis:表示毫秒级精度的时间戳,存储为long
  3. timestamp-micros:表示微秒级精度的时间戳,存储为long

现在看如何在Avro模式中使用这些逻辑类型:

public static Schema createDateSchema() {
    String schemaJson = 
        "{"
        + "\"type\": \"record\","
        + "\"name\": \"DateRecord\","
        + "\"fields\": ["
        + "  {\"name\": \"date\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
        + "  {\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}"
        + "]"
        + "}";
    return new Schema.Parser().parse(schemaJson);
}

注意:我们将逻辑类型应用于底层原始类型,而不是直接应用于字段。

下面是使用逻辑类型实现日期序列化的方法:

public static byte[] serializeDateWithLogicalType(LocalDate date, Instant timestamp) {
    Schema schema = createDateSchema();
    GenericRecord record = new GenericData.Record(schema);
    
    record.put("date", (int) date.toEpochDay());
    
    record.put("timestamp", timestamp.toEpochMilli());
    
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
    
    datumWriter.write(record, encoder);
    encoder.flush();
    
    return baos.toByteArray();
}

关键步骤: ✅ 将LocalDate转换为自纪元以来的天数 ✅ 将时间戳转换为自纪元以来的毫秒数 ✅ 通过逻辑类型实现精确转换

反序列化方法实现:

public static Pair<LocalDate, Instant> deserializeDateWithLogicalType(byte[] bytes) {
    Schema schema = createDateSchema();
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    
    GenericRecord record = datumReader.read(null, decoder);
    
    LocalDate date = LocalDate.ofEpochDay((int) record.get("date"));
    
    Instant timestamp = Instant.ofEpochMilli((long) record.get("timestamp"));
    
    return Pair.of(date, timestamp);
}

最后测试我们的实现:

@Test
void whenSerializingDateWithLogicalType_thenDeserializesCorrectly() {

    LocalDate expectedDate = LocalDate.now();
    Instant expectedTimestamp = Instant.now();

    byte[] serialized = serializeDateWithLogicalType(expectedDate, expectedTimestamp);
    Pair<LocalDate, Instant> deserialized = deserializeDateWithLogicalType(serialized);

    assertEquals(expectedDate, deserialized.getLeft());

    assertEquals(expectedTimestamp.toEpochMilli(), deserialized.getRight().toEpochMilli(),
            "时间戳在毫秒精度上应完全匹配");
}

从测试结果看,timestamp-millis逻辑类型保持了精度,时间戳完全匹配。 此外,使用逻辑类型使数据格式在模式定义中更明确,这对模式开发和文档很有价值。

3.2. 使用Avro的Conversion API

Avro提供了能自动处理逻辑类型的Conversion API。 这不是独立的方法,而是建立在逻辑类型之上的优化方案,能加速转换过程。

它帮我们省去了手动转换Java类型和Avro内部表示的麻烦,同时增加了类型安全性。

下面是自动处理逻辑类型的实现:

public static byte[] serializeWithConversionApi(LocalDate date, Instant timestamp) {
    Schema schema = createDateSchema();
    GenericRecord record = new GenericData.Record(schema);
    
    Conversion<LocalDate> dateConversion = new org.apache.avro.data.TimeConversions.DateConversion();
    LogicalTypes.date().addToSchema(schema.getField("date").schema());
    
    Conversion<Instant> timestampConversion = 
                                new org.apache.avro.data.TimeConversions.TimestampMillisConversion();
    LogicalTypes.timestampMillis().addToSchema(schema.getField("timestamp").schema());
    
    record.put("date", dateConversion.toInt(date, 
                                            schema.getField("date").schema(), 
                                            LogicalTypes.date()));
    record.put("timestamp", 
                timestampConversion.toLong(timestamp, schema.getField("timestamp").schema(), 
                LogicalTypes.timestampMillis()));
    
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
    
    datumWriter.write(record, encoder);
    encoder.flush();
    
    return baos.toByteArray();
}

与前一种方法不同,这里我们使用LogicalTypes.date()LogicalTypes.timestampMillis()进行转换。

反序列化方法实现:

public static Pair<LocalDate, Instant> deserializeWithConversionApi(byte[] bytes) {
    Schema schema = createDateSchema();
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    
    GenericRecord record = datumReader.read(null, decoder);
    
    Conversion<LocalDate> dateConversion = new DateConversion();
    LogicalTypes.date().addToSchema(schema.getField("date").schema());
    
    Conversion<Instant> timestampConversion = new TimestampMillisConversion();
    LogicalTypes.timestampMillis().addToSchema(schema.getField("timestamp").schema());
    
    int daysSinceEpoch = (int) record.get("date");
    long millisSinceEpoch = (long) record.get("timestamp");
    
    LocalDate date = dateConversion.fromInt(
        daysSinceEpoch, 
        schema.getField("date").schema(), 
        LogicalTypes.date()
    );
    
    Instant timestamp = timestampConversion.fromLong(
        millisSinceEpoch, 
        schema.getField("timestamp").schema(), 
        LogicalTypes.timestampMillis()
    );
    
    return Pair.of(date, timestamp);
}

验证实现:

@Test
void whenSerializingWithConversionApi_thenDeserializesCorrectly() {

    LocalDate expectedDate = LocalDate.now();
    Instant expectedTimestamp = Instant.now();

    byte[] serialized = serializeWithConversionApi(expectedDate, expectedTimestamp);
    Pair<LocalDate, Instant> deserialized = deserializeWithConversionApi(serialized);

    assertEquals(expectedDate, deserialized.getLeft());
    assertEquals(expectedTimestamp.toEpochMilli(), deserialized.getRight().toEpochMilli(),
            "时间戳在毫秒精度上应匹配");
}

4. 处理使用Date的遗留代码

许多现有Java应用仍在使用遗留的java.util.Date类。对于这类代码库,我们需要策略来处理这些对象的序列化。

一个简单粗暴的方案是:在序列化前将遗留日期转换为现代Java时间API:

public static byte[] serializeLegacyDateAsModern(Date legacyDate) {
    Instant instant = legacyDate.toInstant();
    
    LocalDate localDate = instant.atZone(ZoneId.systemDefault()).toLocalDate();
    
    return serializeDateWithLogicalType(localDate, instant);
}

然后使用前述方法之一序列化日期。这种方法让我们能利用Avro的逻辑类型,同时仍能处理遗留的Date对象。

测试实现:

@Test
void whenSerializingLegacyDate_thenConvertsCorrectly() {

    Date legacyDate = new Date();
    LocalDate expectedLocalDate = legacyDate.toInstant()
      .atZone(ZoneId.systemDefault())
      .toLocalDate();

    byte[] serialized = serializeLegacyDateAsModern(legacyDate);
    LocalDate deserialized = deserializeDateWithLogicalType(serialized).getKey();
    
    assertEquals(expectedLocalDate, deserialized);
}

5. 结论

本文探讨了使用Avro序列化日期对象的多种方法。我们学习了如何使用Avro的逻辑类型正确表示日期和时间戳值。

对于大多数现代应用,使用Avro的Conversion API配合java.time类处理逻辑类型是最佳方案。 这种组合提供了类型安全、正确的语义保持,以及与Avro模式扩展能力的兼容性。

完整代码可在GitHub获取。


原始标题:How to Serialize and Deserialize Dates in Avro | Baeldung