getAttribute from PubsubMessage in Dataflow
I am having trouble accessing the attributes of a Pub/Sub message.
The error message is as follows:
Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal.
stackTrace: [org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage.getAttribute(PubsubMessage.java:56),
transform1.processElement(transform1.java:37),
transform1$DoFnInvoker.invokeProcessElement(Unknown Source),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183),
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78),
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:216),
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54),
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
java.util.concurrent.FutureTask.run(FutureTask.java:266),
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
java.lang.Thread.run(Thread.java:748)]
I am running the pipeline locally using the Dataflow Eclipse SDK:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
The line of code that produces the error is this one:
String fieldId = c.element().getAttribute("evId");
The full code of the PTransform is as follows:
public class transform1 extends DoFn<PubsubMessage, Event> {
    public static TupleTag<ErrorHandler> failuresTag = new TupleTag<ErrorHandler>(){};
    public static TupleTag<Event> validTag = new TupleTag<Event>(){};
    public static PCollectionTuple process(PCollection<PubsubMessage> logStrings)
    {
        return logStrings.apply("Create PubSub objects", ParDo.of(new DoFn<PubsubMessage, Event>()
        {
            @ProcessElement
            public void processElement(ProcessContext c)
            {
                try
                {
                    Event event = new Event();
                    String fieldId = c.element().getAttribute("evId");
                    event.evId = "asa"; //this line is just to test to set a value
                    c.output(event);
                    <...>
I have seen a similar question, but I am not sure how to fix it.
Main pipeline code (if needed):
public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    /*
     * Step 1: Read from PubSub
     */
    PCollection<PubsubMessage> messages = null;
    if (options.getUseSubscription()) {
        messages = pipeline.apply("ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes()
            .fromSubscription(options.getInputSubscription()).withIdAttribute("messageId"));
    } else {
        messages = pipeline.apply("ReadPubSubTopic", PubsubIO.readMessagesWithAttributes()
            .fromTopic(options.getInputTopic()).withIdAttribute("messageId"));
    }
    /*
     * Step 2: Transform PubSubMessage to Event
     */
    PCollectionTuple eventCollections = transform1.process(messages);
PubSub message:
{ "evId":"id", "payload":"payload" }
I also tried:
"{ "evId":"id", "payload":"payload" }"
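This is how I publish the message to Pub/Sub to test the pipeline:
After more testing, the way I was publishing to Pub/Sub turned out to be the source of the error: if I add the value as an attribute instead of putting it in the message body, the problem goes away.
The reason is that I was trying to access an attribute here: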
String fieldId = c.element().getAttribute("evId");
But when I sent the message through the Pub/Sub dashboard, I did not add any attributes, and that is what crashed the whole pipeline.
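To keep a message without attributes from taking the pipeline down, one option is to read the attribute defensively. Below is a minimal sketch of that idea in plain Java, with no Beam dependency so it can be run on its own. The class name AttributeLookup and the helper attributeOrDefault are hypothetical names made up for this illustration. It assumes the attribute map can be null when a message is published with a body only, which is what the failure at PubsubMessage.getAttribute above suggests for this SDK version.

```java
import java.util.HashMap;
import java.util.Map;

public class AttributeLookup {

    /**
     * Returns the value stored under key, or fallback when the map itself
     * is null or has no entry for key. (Hypothetical helper, not part of Beam.)
     */
    static String attributeOrDefault(Map<String, String> attributes, String key, String fallback) {
        if (attributes == null) {
            // Assumption: a message published with only a body carries no attribute map.
            return fallback;
        }
        String value = attributes.get(key);
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // Case 1: message published with a body only, so there are no attributes.
        Map<String, String> noAttributes = null;
        System.out.println(attributeOrDefault(noAttributes, "evId", "missing"));

        // Case 2: message published with evId set as an attribute.
        Map<String, String> withAttributes = new HashMap<>();
        withAttributes.put("evId", "id");
        System.out.println(attributeOrDefault(withAttributes, "evId", "missing"));
    }
}
```

Inside the DoFn, the map would presumably come from c.element().getAttributeMap(), and a message that falls back to the default could be routed to failuresTag rather than thrown. If the id is only ever sent in the JSON body, it would have to be parsed out of c.element().getPayload() instead.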