Apache Beam 似乎正在截断 pub sub 消息有效负载
Apache beam seems to be truncating pub sub message payload
我们已经为 pub sub 事件处理创建了一个非常简单的管道。 pub sub 消息负载本身是制表符分隔的 csv 数据。
读取消息后,负载数据在膨胀回事件对象时被截断。在本地使用直接运行器和 运行 管道端到端工作。
仅当 运行 在 Google 云数据流运行程序中我们看到此消息数据被截断时?
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
LOG.info("Reading from subscription: " + options.getInputSubscription());
//Step #1: Read from a PubSub subscription.
PCollection<PubsubMessage> pubsubMessages = pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithMessageId()
.fromSubscription(options.getInputSubscription())
);
//Step #2: Transform the PubsubMessages into snowplow events.
PCollection<Event> rawEvents = pubsubMessages.apply(
"ConvertMessageToEvent",
ParDo.of(new PubsubMessageEventFn())
);
// other pipeline functions.....
这里是转换函数,其中每个 pub sub 消息都属于错误情况。请注意 Event.parse() 实际上是一个 scala 库,但我看不出这会如何影响它,因为消息数据本身是在管道的两个阶段之间被截断的。
可能存在编码问题?
public static class PubsubMessageEventFn extends DoFn<PubsubMessage, Event> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload()));
Either<ParsingError, Event> condition = event.toEither();
if (condition.isLeft()) {
ParsingError err = condition.left().get();
LOG.error("Event parsing error: " + err.toString() + " for message: " + new String(message.getPayload()));
} else {
Event e = condition.right().get();
context.output(e);
}
}
}
这是日志消息中发出的数据示例:
Event parsing error: FieldNumberMismatch(5) for message: 4f6ec25-67a7-4edf-972a-29e80320f67f web 2020-04-14 21:26:40.034 2020-04-14 21:26:39.884 2020-04-1
请注意,DirectRunner 的 Pub/Sub 实现不同于此处记录的 Dataflow Runner 中的实现 - https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub#integration-features。
我认为问题与编码有关,因为 message.getPayload
属于 bytes
类型,代码可能需要修改为 new String(message.getPayload(), StandardCharsets.UTF_8)
下一行
Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload(), StandardCharsets.UTF_8));
使用 readMessagesWithAttributesAndMessageId 而不是 readMessagesWithMessageId 是根据此错误问题 https://issues.apache.org/jira/browse/BEAM-9483.
的解决方法
似乎还没有修复。
我们已经为 pub sub 事件处理创建了一个非常简单的管道。 pub sub 消息负载本身是制表符分隔的 csv 数据。
读取消息后,负载数据在膨胀回事件对象时被截断。在本地使用直接运行器和 运行 管道端到端工作。
仅当 运行 在 Google 云数据流运行程序中我们看到此消息数据被截断时?
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
LOG.info("Reading from subscription: " + options.getInputSubscription());
//Step #1: Read from a PubSub subscription.
PCollection<PubsubMessage> pubsubMessages = pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithMessageId()
.fromSubscription(options.getInputSubscription())
);
//Step #2: Transform the PubsubMessages into snowplow events.
PCollection<Event> rawEvents = pubsubMessages.apply(
"ConvertMessageToEvent",
ParDo.of(new PubsubMessageEventFn())
);
// other pipeline functions.....
这里是转换函数,其中每个 pub sub 消息都属于错误情况。请注意 Event.parse() 实际上是一个 scala 库,但我看不出这会如何影响它,因为消息数据本身是在管道的两个阶段之间被截断的。
可能存在编码问题?
public static class PubsubMessageEventFn extends DoFn<PubsubMessage, Event> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload()));
Either<ParsingError, Event> condition = event.toEither();
if (condition.isLeft()) {
ParsingError err = condition.left().get();
LOG.error("Event parsing error: " + err.toString() + " for message: " + new String(message.getPayload()));
} else {
Event e = condition.right().get();
context.output(e);
}
}
}
这是日志消息中发出的数据示例:
Event parsing error: FieldNumberMismatch(5) for message: 4f6ec25-67a7-4edf-972a-29e80320f67f web 2020-04-14 21:26:40.034 2020-04-14 21:26:39.884 2020-04-1
请注意,DirectRunner 的 Pub/Sub 实现不同于此处记录的 Dataflow Runner 中的实现 - https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub#integration-features。
我认为问题与编码有关,因为 message.getPayload
属于 bytes
类型,代码可能需要修改为 new String(message.getPayload(), StandardCharsets.UTF_8)
下一行
Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload(), StandardCharsets.UTF_8));
使用 readMessagesWithAttributesAndMessageId 而不是 readMessagesWithMessageId 是根据此错误问题 https://issues.apache.org/jira/browse/BEAM-9483.
的解决方法似乎还没有修复。