Dataflow输出参数化类型到avro文件

Dataflow output parameterized type to avro file

我有一个成功输出 Avro 文件的管道,如下所示:

@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
  T foo;
  S bar;
  Boolean baz;
  public MyOutput_T_S() {}
}

@DefaultCoder(AvroCoder.class)
class T {
  String id;
  public T() {}
}

@DefaultCoder(AvroCoder.class)
class S {
  String id;
  public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));

除了参数化输出 MyOutput<T, S>(其中 TS 都可以使用反射进行 Avro 编码),我如何才能重现这种确切的行为。

主要问题是 Avro 反射不适用于参数化类型。因此,基于这些回复:

1) 我想我需要写一个自定义 CoderFactory 但是,我很难弄清楚它到底是如何工作的(我很难找到示例)。奇怪的是,一个完全天真的编码器工厂似乎让我 运行 管道并使用 DataflowAssert 检查正确的输出:

cr.RegisterCoder(MyOutput.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? excents Coder<?>> componentCoders) {
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\,"
      + "\"name\":\"MyOutput\","
      + "\"namespace\":\"mypackage"\","
      + "\"fields\":[]}"
    return AvroCoder.of(MyOutput.class, schema);
  }
  @Override
  public List<Object> getInstanceComponents(Object value) {
    MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
    List components = new ArrayList();
    return components;
  }

虽然我现在可以成功断言输出,但我预计这不会因为写入文件而中断它。我还没有弄清楚我应该如何使用提供的 componentCoders 来生成正确的模式,如果我尝试将 TS 的模式推入 fields 我得到:

java.lang.IllegalArgumentException: Unable to get field id from class null

2) 假设我知道如何编码 MyOutput。我将什么传递给 AvroIO.Write.withSchema?如果我通过 MyOutput.class 或架构,我会收到类型不匹配错误。

我觉得有两个问题(有错请指正):

  1. 如何使编码器注册表为 MyOutput<T, S> 的各种参数化提供编码器?
  2. 如何使用 AvroIO.Write.
  3. MyOutput<T, S> 的值写入文件

第一个问题将通过注册一个 CoderFactory 来解决,就像您找到的链接问题一样。

您天真的编码器可能允许您 运行 管道没有问题,因为序列化正在被优化掉。当然,没有字段的 Avro 模式会导致这些字段在序列化+反序列化往返过程中被丢弃。

但是假设您使用字段填充模式,您对 CoderFactory#create 的处理方法看起来是正确的。我不知道消息 java.lang.IllegalArgumentException: Unable to get field id from class null 的确切原因,但对 AvroCoder.of(MyOutput.class, schema) 的调用应该有效,因为适当组装 schema。如果这有问题,更多详细信息(例如堆栈轨道的其余部分)将会有所帮助。

但是,您对 CoderFactory#getInstanceComponents 的覆盖应该 return 一个值列表,每个 MyOutput 的类型参数一个。像这样:

@Override
public List<Object> getInstanceComponents(Object value) {
  MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
  return ImmutableList.of(myOutput.foo, myOutput.bar);
}

第二个问题可以使用与第一个相同的一些支持代码来回答,但在其他方面是独立的。 AvroIO.Write.withSchema 始终明确使用提供的模式。它确实在幕后使用 AvroCoder ,但这实际上是一个实现细节。提供一个兼容的模式是所有必要的——这样的模式将必须为每个要输出 MyOutput<T, S>.

TS 的值组成