在 Java Beam 管道中的 date/timestamps 上使用 LogicalType 'timestamp-millis' 写入 avro 文件
Write avro files with LogicalType 'timestamp-millis' on date/timestamps in Java Beam pipeline
我有几个管道从流式 JSON 记录写入 avro 文件,但我在将它们导入 BigQuery 时遇到问题,因为日期字段的 logicalType 未在 avro 架构中定义。
考虑以下简单的 PoJo:
@DefaultCoder(AvroCoder.class)
public class SampleClass {
@AvroEncode(using=DateAsLongEncoding.class)
private Date updateTime;
public SampleClass() {
}
// Getters and setters
}
使用这个,字段被正确地保存到 avro 中。但是,LogicalType 未在架构中设置,当您希望它是 TIMESTAMP
或 DATE
而不是 long.
时,在导入到 BigQuery 时会导致问题
我希望能够对字段进行注释,就像 @AvroEncode
一样。最好设置 @LogicalType('timestamp-millis')
.
有没有人做过类似的事情,或者有任何其他简单的方法来为字段指定 LogicalType?
您可以尝试指定 @AvroSchema
,如 this test
所以你的例子看起来像
@DefaultCoder(AvroCoder.class)
public class SampleClass {
@AvroEncode(using=DateAsLongEncoding.class)
@AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
private Date updateTime;
public SampleClass() {
}
// Getters and setters
}
你也考虑过使用BigQueryIO to write directly? It has two write methods,一个写出文件并加载它们,另一个使用流式插入API。
这已通过使用 gson typeAdapters 反序列化传入 json 解决,如下所示:
GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(Sample.class, new JsonDeserializer() {
@Override
public Sample deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
try {
JsonObject jObj = json.getAsJsonObject();
return new Sample(
jObj.get("timestamp").getAsString()
);
}
catch (Exception e) {
log.error("Sample parser failed" + e.toString());
return null;
}
}
});
builder.create();
示例 class,使用 java.time.Instant 从 ISO 日期字符串创建毫秒:
@DefaultCoder(SnappyCoder.class)
public class Sample implements Serializable {
@AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
private long updateTime;
public Sample(String timestamp) {
this.updateTime = Instant.parse(timestamp).toEpochMilli();
}
}
我有几个管道从流式 JSON 记录写入 avro 文件,但我在将它们导入 BigQuery 时遇到问题,因为日期字段的 logicalType 未在 avro 架构中定义。
考虑以下简单的 PoJo:
@DefaultCoder(AvroCoder.class)
public class SampleClass {
@AvroEncode(using=DateAsLongEncoding.class)
private Date updateTime;
public SampleClass() {
}
// Getters and setters
}
使用这个,字段被正确地保存到 avro 中。但是,LogicalType 未在架构中设置,当您希望它是 TIMESTAMP
或 DATE
而不是 long.
我希望能够对字段进行注释,就像 @AvroEncode
一样。最好设置 @LogicalType('timestamp-millis')
.
有没有人做过类似的事情,或者有任何其他简单的方法来为字段指定 LogicalType?
您可以尝试指定 @AvroSchema
,如 this test
所以你的例子看起来像
@DefaultCoder(AvroCoder.class)
public class SampleClass {
@AvroEncode(using=DateAsLongEncoding.class)
@AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
private Date updateTime;
public SampleClass() {
}
// Getters and setters
}
你也考虑过使用BigQueryIO to write directly? It has two write methods,一个写出文件并加载它们,另一个使用流式插入API。
这已通过使用 gson typeAdapters 反序列化传入 json 解决,如下所示:
GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(Sample.class, new JsonDeserializer() {
@Override
public Sample deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
try {
JsonObject jObj = json.getAsJsonObject();
return new Sample(
jObj.get("timestamp").getAsString()
);
}
catch (Exception e) {
log.error("Sample parser failed" + e.toString());
return null;
}
}
});
builder.create();
示例 class,使用 java.time.Instant 从 ISO 日期字符串创建毫秒:
@DefaultCoder(SnappyCoder.class)
public class Sample implements Serializable {
@AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
private long updateTime;
public Sample(String timestamp) {
this.updateTime = Instant.parse(timestamp).toEpochMilli();
}
}