Dataflow: string to Pub/Sub message
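I am trying to write a unit test in Dataflow.
For that test, I start from a simple hard-coded string.
The problem is that I need to convert that string into a Pub/Sub message. I have the following code to do that: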
// Create a PCollection from a string and transform it to the Pub/Sub message format
PCollection<PubsubMessage> input = p.apply("input string", Create.of("test"))
    .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(new PubsubMessage(c.element().getBytes(), null));
      }
    }));
But I get the following error:
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
<...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
... 50 more
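How should I create a Pub/Sub message from a string?
You can look at the following link for one way to implement this: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/test/java/com/google/cloud/teleport/templates/PubsubToPubsubTest.java
The Beam Programming Guide, in its section on serializability requirements for user ParDo code, notes: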
Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class’ state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class.
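What is happening is that your anonymous DoFn implicitly holds a pointer to the class in which you are building the pipeline, and that class is not serializable, which causes this failure. You can avoid it by making your DoFn a named subclass (a top-level class or a static nested class) rather than an anonymous one: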
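// Declare this as a top-level class or a static nested class; a non-static
// inner class would still reference the enclosing pipeline class.
public class MyDoFn extends DoFn<String, PubsubMessage> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new PubsubMessage(c.element().getBytes(), null));
  }
}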
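The capture behaviour described above can be reproduced without Beam. The following is only a minimal sketch using plain Java serialization: `Fn`, `AppendFn`, `canSerialize`, and the stand-in `TesterPipeline` are hypothetical names made up for illustration, not Beam APIs. Here the anonymous class reads a field of the enclosing object, so the hidden reference to the non-serializable outer instance is certain to be present.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationDemo {

    /** Hypothetical stand-in for a DoFn: a serializable function object. */
    interface Fn extends Serializable {
        String apply(String input);
    }

    /** Stand-in for the pipeline class in the question; deliberately NOT Serializable. */
    static class TesterPipeline {
        private final String suffix = "!";

        /** Anonymous inner class: carries a hidden reference to this TesterPipeline. */
        Fn anonymousFn() {
            return new Fn() {
                @Override
                public String apply(String input) {
                    return input + suffix; // reads the enclosing instance's field
                }
            };
        }

        /** Named static nested class: receives the state it needs explicitly. */
        Fn namedFn() {
            return new AppendFn(suffix);
        }
    }

    /** Static nested class: no reference to any enclosing instance. */
    static class AppendFn implements Fn {
        private final String suffix;

        AppendFn(String suffix) {
            this.suffix = suffix;
        }

        @Override
        public String apply(String input) {
            return input + suffix;
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TesterPipeline pipeline = new TesterPipeline();
        System.out.println("anonymous inner class serializable: " + canSerialize(pipeline.anonymousFn()));
        System.out.println("static nested class serializable:   " + canSerialize(pipeline.namedFn()));
    }
}
```

In the pipeline from the question, the same idea would mean passing `ParDo.of(new MyDoFn())` in place of the anonymous `new DoFn<String, PubsubMessage>() { ... }`.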