在 Java Beam 管道中的 date/timestamps 上使用 LogicalType 'timestamp-millis' 写入 avro 文件

Write avro files with LogicalType 'timestamp-millis' on date/timestamps in Java Beam pipeline

我有几个管道从流式 JSON 记录写入 avro 文件,但我在将它们导入 BigQuery 时遇到问题,因为日期字段的 logicalType 未在 avro 架构中定义。

考虑以下简单的 PoJo:

@DefaultCoder(AvroCoder.class)
public class SampleClass {
    @AvroEncode(using=DateAsLongEncoding.class)
    private Date updateTime;

    public SampleClass() {
    }

    // Getters and setters
}

使用这个,字段被正确地保存到 avro 中。但是,LogicalType 未在架构中设置,当您希望它是 TIMESTAMPDATE 而不是 long.

时,在导入到 BigQuery 时会导致问题

我希望能够对字段进行注释,就像 @AvroEncode 一样。最好设置 @LogicalType('timestamp-millis').

有没有人做过类似的事情,或者有任何其他简单的方法来为字段指定 LogicalType?

您可以尝试指定 @AvroSchema,如 this test 所以你的例子看起来像

@DefaultCoder(AvroCoder.class)
public class SampleClass {
    @AvroEncode(using=DateAsLongEncoding.class)
    @AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
    private Date updateTime;

    public SampleClass() {
    }

    // Getters and setters
}

你也考虑过使用BigQueryIO to write directly? It has two write methods,一个写出文件并加载它们,另一个使用流式插入API。

这已通过使用 gson typeAdapters 反序列化传入 json 解决,如下所示:

GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(Sample.class, new JsonDeserializer() {
   @Override
   public Sample deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
        try {
           JsonObject jObj = json.getAsJsonObject();
           return new Sample(
               jObj.get("timestamp").getAsString()
           );
        } 
        catch (Exception e) {
           log.error("Sample parser failed" + e.toString());
           return null;
        }
    }
});
builder.create();

示例 class,使用 java.time.Instant 从 ISO 日期字符串创建毫秒:

@DefaultCoder(SnappyCoder.class)
public class Sample implements Serializable {
    @AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
    private long updateTime;

    public Sample(String timestamp) {
        this.updateTime = Instant.parse(timestamp).toEpochMilli();
    }
}