使用 AvroCoder 使用提供的模式序列化通用 class

Serializing generic class with provided schema using AvroCoder

让我们做一个简单的class。

class IntValue {
    private int data;

    IntValue() {}
    IntValue(int data) { this.setData(data); }

    int getData() { return this.data; }
    void setData(int data) { this.data = data; }
}

还有一个薄的通用包装器:

class Snapshot<T> {
    private T value;

    Snapshot<T> () {}

    T getValue() { return value; }
    void setValue(T value) { this.value = value; }
}

接下来让我们制作一个帮助器来获取包装器的架构。

private static Schema buildSnapshotSchema(Schema valueSchema) {
    return SchemaBuilder.record("Snapshot")
            .namespace("com.Whosebug.primer")
            .fields()
            .name("value").type(valueSchema).noDefault()
            .endRecord();
}

最后,我想以与在 Using Avrocoder for Custom Types with Generics

中创建的方式相同的方式创建具有显式模式的 AvroCoder
AvroCoder.of(Snapshot.class, buildSnapshotSchema(AvroCoder.of(IntValue.class).getSchema())

结果……

Exception in thread "main" java.lang.IllegalArgumentException: Unable to get field data from class null
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.getField(AvroCoder.java:710)
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:548)
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477)
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453)
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:567)
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477)
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453)
at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:430)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.<init>(AvroCoder.java:189)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:144)
at com.Whosebug.primer.GenericPipeline.main(GenericPipeline.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

AvroDeterminismChecker 无法从 Object 的实例中获取字段 "data"。这是有道理的,但为什么它不使用提供的模式来重建对象呢?在这种情况下如何创建 AvroCoder 的实例?

更新

发现另一个 post 有类似的问题,但似乎没有解决...

从未使用过 AvroCoder,但这也是其他框架中泛型的常见问题。这通常通过超类型令牌来解决。 AvroCoder 有一个采用类型标记的方法(在这种情况下为 TypeDescriptor)。

我没有尝试过此代码,但它有一定的工作可能性。

AvroCoder.of( new TypeDescriptor<Snapshot<InvValue>>(){}).of( buildSnapshotSchema(AvroCoder.of(IntValue.class).getSchema());

我将您的示例复制并粘贴到 Apache Beam 主分支中 AvroCoderTest 的代码中,测试没有失败并出现以下异常:

static class IntValue {
  private int data;

  IntValue() {}
  IntValue(int data) { this.setData(data); }

  int getData() { return this.data; }
  void setData(int data) { this.data = data; }
}
static class Snapshot<T> {
  private T value;

  Snapshot() {}

  T getValue() { return value; }
  void setValue(T value) { this.value = value; }
}
private static Schema buildSnapshotSchema(Schema valueSchema) {
  return SchemaBuilder.record("Snapshot")
      .namespace("blah")
      .fields()
      .name("value").type(valueSchema).noDefault()
      .endRecord();
}

@Test
public void testWhosebugPost() throws Exception {
  AvroCoder.of(Snapshot.class, buildSnapshotSchema(AvroCoder.of(IntValue.class).getSchema()));
}

也许问题已经解决了?

最后的说明: 版本 1.9 Dataflow SDK 不支持 在泛型上使用 AvroCoder。当 AvroCoder 构造函数试图猜测结果是否确定时会发生错误。

问题是 resolved for Apache Beam in Jule'16. After Apache Beam v0.2-incubating AvroCoder always instantiates not-deterministic coder for generics. For some reasons it was not backported to Dataflow SDK. Following @jkff advice I've created a PR Dataflow。如果维护者决定应用它——Dataflow SDK v1.10 将支持泛型。

好消息是已经有 SDK v.2.01-beta,它是 Apache Beam v0.4.0 的轻型代理,它支持泛型并有许多新增功能。转换需要一些代码更新,但这绝对值得。