Apache Beam 似乎正在截断 pub sub 消息有效负载

Apache beam seems to be truncating pub sub message payload

我们已经为 pub sub 事件处理创建了一个非常简单的管道。 pub sub 消息负载本身是制表符分隔的 csv 数据。

读取消息后,负载数据在膨胀回事件对象时被截断。在本地使用直接运行器和 运行 管道端到端工作。

仅当 运行 在 Google 云数据流运行程序中我们看到此消息数据被截断时?

        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        LOG.info("Reading from subscription: " + options.getInputSubscription());

        //Step #1: Read from a PubSub subscription.
        PCollection<PubsubMessage> pubsubMessages = pipeline.apply(
            "ReadPubSubSubscription",
            PubsubIO.readMessagesWithMessageId()
                    .fromSubscription(options.getInputSubscription())
        );


        //Step #2: Transform the PubsubMessages into snowplow events.
        PCollection<Event> rawEvents = pubsubMessages.apply(
            "ConvertMessageToEvent",
            ParDo.of(new PubsubMessageEventFn())
        );

        // other pipeline functions.....

这里是转换函数,其中每个 pub sub 消息都属于错误情况。请注意 Event.parse() 实际上是一个 scala 库,但我看不出这会如何影响它,因为消息数据本身是在管道的两个阶段之间被截断的。

可能存在编码问题?


    public static class PubsubMessageEventFn extends DoFn<PubsubMessage, Event> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            PubsubMessage message = context.element();
            Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload()));
            Either<ParsingError, Event> condition = event.toEither();
            if (condition.isLeft()) {
                ParsingError err = condition.left().get();
                LOG.error("Event parsing error: " + err.toString() + " for message: " + new String(message.getPayload()));
            } else {
                Event e = condition.right().get();
                context.output(e);
            }
        }
    }

这是日志消息中发出的数据示例:

Event parsing error: FieldNumberMismatch(5) for message: 4f6ec25-67a7-4edf-972a-29e80320f67f web 2020-04-14 21:26:40.034 2020-04-14 21:26:39.884 2020-04-1

请注意,DirectRunner 的 Pub/Sub 实现不同于此处记录的 Dataflow Runner 中的实现 - https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub#integration-features

我认为问题与编码有关,因为 message.getPayload 属于 bytes 类型,代码可能需要修改为 new String(message.getPayload(), StandardCharsets.UTF_8) 下一行

Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload(), StandardCharsets.UTF_8));

使用 readMessagesWithAttributesAndMessageId 而不是 readMessagesWithMessageId 是根据此错误问题 https://issues.apache.org/jira/browse/BEAM-9483.

的解决方法

似乎还没有修复。