Dataflow输出参数化类型到avro文件
Dataflow output parameterized type to avro file
我有一个成功输出 Avro 文件的管道,如下所示:
@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
T foo;
S bar;
Boolean baz;
public MyOutput_T_S() {}
}
@DefaultCoder(AvroCoder.class)
class T {
String id;
public T() {}
}
@DefaultCoder(AvroCoder.class)
class S {
String id;
public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));
除了参数化输出 MyOutput<T, S>
(其中 T
和 S
都可以使用反射进行 Avro 编码),我如何才能重现这种确切的行为。
主要问题是 Avro 反射不适用于参数化类型。因此,基于这些回复:
- Setting Custom Coders & Handling Parameterized types
- Using Avrocoder for Custom Types with Generics
1) 我想我需要写一个自定义 CoderFactory
但是,我很难弄清楚它到底是如何工作的(我很难找到示例)。奇怪的是,一个完全天真的编码器工厂似乎让我 运行 管道并使用 DataflowAssert 检查正确的输出:
cr.RegisterCoder(MyOutput.class, new CoderFactory() {
@Override
public Coder<?> create(List<? excents Coder<?>> componentCoders) {
Schema schema = new Schema.Parser().parse("{\"type\":\"record\,"
+ "\"name\":\"MyOutput\","
+ "\"namespace\":\"mypackage"\","
+ "\"fields\":[]}"
return AvroCoder.of(MyOutput.class, schema);
}
@Override
public List<Object> getInstanceComponents(Object value) {
MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
List components = new ArrayList();
return components;
}
虽然我现在可以成功断言输出,但我预计这不会因为写入文件而中断它。我还没有弄清楚我应该如何使用提供的 componentCoders
来生成正确的模式,如果我尝试将 T
或 S
的模式推入 fields
我得到:
java.lang.IllegalArgumentException: Unable to get field id from class null
2) 假设我知道如何编码 MyOutput
。我将什么传递给 AvroIO.Write.withSchema
?如果我通过 MyOutput.class
或架构,我会收到类型不匹配错误。
我觉得有两个问题(有错请指正):
- 如何使编码器注册表为
MyOutput<T, S>
的各种参数化提供编码器?
- 如何使用
AvroIO.Write
. 将 MyOutput<T, S>
的值写入文件
第一个问题将通过注册一个 CoderFactory
来解决,就像您找到的链接问题一样。
您天真的编码器可能允许您 运行 管道没有问题,因为序列化正在被优化掉。当然,没有字段的 Avro 模式会导致这些字段在序列化+反序列化往返过程中被丢弃。
但是假设您使用字段填充模式,您对 CoderFactory#create
的处理方法看起来是正确的。我不知道消息 java.lang.IllegalArgumentException: Unable to get field id from class null
的确切原因,但对 AvroCoder.of(MyOutput.class, schema)
的调用应该有效,因为适当组装 schema
。如果这有问题,更多详细信息(例如堆栈轨道的其余部分)将会有所帮助。
但是,您对 CoderFactory#getInstanceComponents
的覆盖应该 return 一个值列表,每个 MyOutput
的类型参数一个。像这样:
@Override
public List<Object> getInstanceComponents(Object value) {
MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
return ImmutableList.of(myOutput.foo, myOutput.bar);
}
第二个问题可以使用与第一个相同的一些支持代码来回答,但在其他方面是独立的。 AvroIO.Write.withSchema
始终明确使用提供的模式。它确实在幕后使用 AvroCoder
,但这实际上是一个实现细节。提供一个兼容的模式是所有必要的——这样的模式将必须为每个要输出 MyOutput<T, S>
.
的 T
和 S
的值组成
我有一个成功输出 Avro 文件的管道,如下所示:
@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
T foo;
S bar;
Boolean baz;
public MyOutput_T_S() {}
}
@DefaultCoder(AvroCoder.class)
class T {
String id;
public T() {}
}
@DefaultCoder(AvroCoder.class)
class S {
String id;
public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));
除了参数化输出 MyOutput<T, S>
(其中 T
和 S
都可以使用反射进行 Avro 编码),我如何才能重现这种确切的行为。
主要问题是 Avro 反射不适用于参数化类型。因此,基于这些回复:
- Setting Custom Coders & Handling Parameterized types
- Using Avrocoder for Custom Types with Generics
1) 我想我需要写一个自定义 CoderFactory
但是,我很难弄清楚它到底是如何工作的(我很难找到示例)。奇怪的是,一个完全天真的编码器工厂似乎让我 运行 管道并使用 DataflowAssert 检查正确的输出:
cr.RegisterCoder(MyOutput.class, new CoderFactory() {
@Override
public Coder<?> create(List<? excents Coder<?>> componentCoders) {
Schema schema = new Schema.Parser().parse("{\"type\":\"record\,"
+ "\"name\":\"MyOutput\","
+ "\"namespace\":\"mypackage"\","
+ "\"fields\":[]}"
return AvroCoder.of(MyOutput.class, schema);
}
@Override
public List<Object> getInstanceComponents(Object value) {
MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
List components = new ArrayList();
return components;
}
虽然我现在可以成功断言输出,但我预计这不会因为写入文件而中断它。我还没有弄清楚我应该如何使用提供的 componentCoders
来生成正确的模式,如果我尝试将 T
或 S
的模式推入 fields
我得到:
java.lang.IllegalArgumentException: Unable to get field id from class null
2) 假设我知道如何编码 MyOutput
。我将什么传递给 AvroIO.Write.withSchema
?如果我通过 MyOutput.class
或架构,我会收到类型不匹配错误。
我觉得有两个问题(有错请指正):
- 如何使编码器注册表为
MyOutput<T, S>
的各种参数化提供编码器? - 如何使用
AvroIO.Write
. 将
MyOutput<T, S>
的值写入文件
第一个问题将通过注册一个 CoderFactory
来解决,就像您找到的链接问题一样。
您天真的编码器可能允许您 运行 管道没有问题,因为序列化正在被优化掉。当然,没有字段的 Avro 模式会导致这些字段在序列化+反序列化往返过程中被丢弃。
但是假设您使用字段填充模式,您对 CoderFactory#create
的处理方法看起来是正确的。我不知道消息 java.lang.IllegalArgumentException: Unable to get field id from class null
的确切原因,但对 AvroCoder.of(MyOutput.class, schema)
的调用应该有效,因为适当组装 schema
。如果这有问题,更多详细信息(例如堆栈轨道的其余部分)将会有所帮助。
但是,您对 CoderFactory#getInstanceComponents
的覆盖应该 return 一个值列表,每个 MyOutput
的类型参数一个。像这样:
@Override
public List<Object> getInstanceComponents(Object value) {
MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
return ImmutableList.of(myOutput.foo, myOutput.bar);
}
第二个问题可以使用与第一个相同的一些支持代码来回答,但在其他方面是独立的。 AvroIO.Write.withSchema
始终明确使用提供的模式。它确实在幕后使用 AvroCoder
,但这实际上是一个实现细节。提供一个兼容的模式是所有必要的——这样的模式将必须为每个要输出 MyOutput<T, S>
.
T
和 S
的值组成