getAttribute from PubsubMessage in Dataflow

I am having a problem when trying to access an attribute of a Pub/Sub message.

The error message is as follows:

Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. 

stackTrace: [org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage.getAttribute(PubsubMessage.java:56),
transform1.processElement(transform1.java:37),
transform1$DoFnInvoker.invokeProcessElement(Unknown Source),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183),
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78),
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:216),
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54),
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
java.util.concurrent.FutureTask.run(FutureTask.java:266),
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
java.lang.Thread.run(Thread.java:748)]

I am running the pipeline locally using the Dataflow Eclipse SDK:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>${beam.version}</version>
  <scope>runtime</scope>
</dependency>

The line of code that produces the error is this:

String fieldId = c.element().getAttribute("evId");

The complete code of the PTransform is as follows:

public class transform1 extends DoFn<PubsubMessage, Event> {

    public static TupleTag<ErrorHandler> failuresTag=new TupleTag<ErrorHandler>(){};
    public static TupleTag<Event> validTag = new TupleTag<Event>(){};

    public static PCollectionTuple process(PCollection<PubsubMessage> logStrings)
    {
        return logStrings.apply("Create PubSub objects", ParDo.of(new DoFn<PubsubMessage, Event>()
        {
            @ProcessElement
            public void processElement(ProcessContext c)
            {
                try 
                {
                    Event event = new Event();
                    String fieldId = c.element().getAttribute("evId");
                    event.evId = "asa"; //this line is just to test to set a value
                    c.output(event);
                    <...>

I have seen a similar question, but I am not sure how to fix it.

Main pipeline code (if needed):

    public static PipelineResult run(Options options) {

        Pipeline pipeline = Pipeline.create(options);

        /*
         * Step 1: Read from PubSub
         */
        PCollection<PubsubMessage> messages = null;
        if (options.getUseSubscription()) {
            messages = pipeline.apply("ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()).withIdAttribute("messageId"));
        } else {
            messages = pipeline.apply("ReadPubSubTopic", PubsubIO.readMessagesWithAttributes()
                    .fromTopic(options.getInputTopic()).withIdAttribute("messageId"));
        }


        /*
         * Step 2: Transform PubSubMessage to Event
         */
        PCollectionTuple eventCollections = transform1.process(messages);

Pub/Sub message:

{ "evId":"id", "payload":"payload" }

I also tried:

"{ "evId":"id", "payload":"payload" }"

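This is how I publish the message to Pub/Sub to test the pipeline: through the Pub/Sub dashboard, with the JSON above entered as the message body.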


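After more testing, the way I publish to Pub/Sub appears to be the root of the error: if I add the value as an attribute instead of putting it in the message body, the problem goes away.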
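
A Pub/Sub message carries its body (the data bytes) and its attributes (string key/value pairs) separately, so a key that appears inside a JSON body is not visible as an attribute. The sketch below models that distinction with plain Java types only. `SimpleMessage` is a hypothetical stand-in written for illustration, not Beam's `PubsubMessage` class, and its `getAttribute` is an assumption about the general behaviour rather than a copy of the library code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class BodyVersusAttributes {

    /** Hypothetical stand-in for a Pub/Sub message: body bytes plus string attributes. */
    static final class SimpleMessage {
        final byte[] body;
        final Map<String, String> attributes;

        SimpleMessage(String body, Map<String, String> attributes) {
            this.body = body.getBytes(StandardCharsets.UTF_8);
            this.attributes = attributes;
        }

        /** Looks the key up in the attributes only; the body is never inspected. */
        String getAttribute(String key) {
            return attributes.get(key);
        }
    }

    public static void main(String[] args) {
        String json = "{ \"evId\":\"id\", \"payload\":\"payload\" }";

        // Published with the JSON as the body and no attributes.
        SimpleMessage bodyOnly =
                new SimpleMessage(json, Collections.<String, String>emptyMap());
        // Published with evId set as an attribute.
        SimpleMessage withAttribute =
                new SimpleMessage(json, Collections.singletonMap("evId", "id"));

        System.out.println(bodyOnly.getAttribute("evId"));      // null
        System.out.println(withAttribute.getAttribute("evId")); // id
    }
}
```

In this model the body-only message has nothing to return for `evId`, which matches the observation that the error disappears once the value is published as an attribute.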

The reason is that I was trying to access an attribute here:

String fieldId = c.element().getAttribute("evId");

But when I sent the message through the Pub/Sub dashboard, I did not add any attributes, which caused the whole pipeline to crash.
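
One way to keep a single attribute-less message from failing the whole pipeline is to check for the attribute before using it and to send incomplete messages to a failure output, in the spirit of the `failuresTag` / `validTag` pair declared in the transform. The sketch below shows only that check, using plain Java collections. `AttributeRouter`, `Routed`, and `route` are hypothetical names; in the real `DoFn` the two lists would presumably correspond to the two tagged outputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class AttributeRouter {

    /** Routing result: attribute values that were present, and messages that lacked the attribute. */
    static final class Routed {
        final List<String> valid = new ArrayList<>();
        final List<Map<String, String>> failures = new ArrayList<>();
    }

    /**
     * Splits messages (represented here only by their attribute maps) by whether
     * the required attribute is present. A null map is treated like an empty one.
     */
    static Routed route(List<Map<String, String>> messages, String requiredKey) {
        Routed out = new Routed();
        for (Map<String, String> attributes : messages) {
            String value = (attributes == null) ? null : attributes.get(requiredKey);
            if (value == null) {
                out.failures.add(attributes); // would go to the failure output
            } else {
                out.valid.add(value);         // would go to the main output
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, String>> messages = Arrays.asList(
                Collections.singletonMap("evId", "id"),  // attribute set at publish time
                Collections.<String, String>emptyMap(),  // body-only message, no attributes
                null);                                   // attribute map missing entirely

        Routed routed = route(messages, "evId");
        System.out.println(routed.valid);           // [id]
        System.out.println(routed.failures.size()); // 2
    }
}
```

The null check on the map itself is a defensive assumption: whether an attribute-less message arrives with an empty map or with no map at all may depend on the Beam version in use.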