从 Pubsub 每隔 X 条消息写入 Cloud Storage
Write to Cloud Storage every X messages from Pubsub
我是 Cloud Dataflow / Apache Beam 的新手,所以 concept/programming 对我来说还是很模糊。
我想做的是Dataflow监听Pubsub,在JSON:
中获取这种格式的消息
{
"productId": "...",
"productName": "..."
}
并将其转换为:
{
"productId": "...",
"productName": "...",
"sku": "...",
"inventory": {
"revenue": <some Double>,
"stocks": <some Integer>
}
}
所以需要的步骤是:
(IngestFromPubsub) 通过监听一个topic从Pubsub获取记录(1条Pubsub消息=1条记录)
(EnrichDataFromAPI)
一个。将负载的 JSON 字符串反序列化为 Java 对象
b。通过调用外部 API,使用 sku
,我可以通过添加 inventory
属性来丰富每条记录的数据。
c。再次序列化记录。
(WriteToGCS) 然后每x
条(可以参数化)记录,我需要把这些写到云存储里。
还请考虑 x=1
的简单情况。
(x=1
,好主意吗?怕云盘写入太多)
即使我是 Python 人,我在 Python 中已经很难做到这一点,更重要的是我需要在 Java 中写作。阅读 Java 中 Beam 的示例让我感到头疼,它太冗长且难以理解。我所了解的是,每一步都是 PCollection 的 .apply
。
到目前为止,这是我微不足道的努力的结果:
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
// I don't really understand the next part, I just copied from official documentation and filled in some values
.apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
.withAllowedLateness(Duration.millis(5000))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
.discardingFiredPanes()
)
.apply("EnrichDataFromAPI", ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.element();
// help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
// ... deserialize, call API, serialize again ...
c.output(enrichedJSONString);
}
}
))
.apply("WriteToGCS",
TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
;
PipelineResult result = pipeline.run();
}
请填写缺失的部分,并给我一些关于窗口的提示(例如什么是合适的配置等)以及我应该在哪些步骤中 insert/apply 它。
我认为您的 IngestFromPubsub
和 EnrichDataFromAPI
中不需要任何窗口。开窗的目的是将时间附近的记录分组到 windows 中,以便您可以计算它们的聚合计算。但是由于您没有进行任何聚合计算,并且有兴趣独立处理每条记录,因此您不需要 windows.
由于您总是将一个输入记录转换为一个输出记录,因此您的 EnrichDataFromAPI
应该是 MapElements
。这应该会使代码更简单。
在 Apache Bean Java 中有用于处理 JSON 的资源:
您不一定需要使用 Jackson 将 JSON 映射到 Java 对象。您也许可以直接操作 JSON。您可以使用 Java 的 native JSON API 到 parse/manipulate/serialize.
我是 Cloud Dataflow / Apache Beam 的新手,所以 concept/programming 对我来说还是很模糊。
我想做的是Dataflow监听Pubsub,在JSON:
中获取这种格式的消息{
"productId": "...",
"productName": "..."
}
并将其转换为:
{
"productId": "...",
"productName": "...",
"sku": "...",
"inventory": {
"revenue": <some Double>,
"stocks": <some Integer>
}
}
所以需要的步骤是:
(IngestFromPubsub) 通过监听一个topic从Pubsub获取记录(1条Pubsub消息=1条记录)
(EnrichDataFromAPI)
一个。将负载的 JSON 字符串反序列化为 Java 对象
b。通过调用外部 API,使用
sku
,我可以通过添加inventory
属性来丰富每条记录的数据。c。再次序列化记录。
(WriteToGCS) 然后每
x
条(可以参数化)记录,我需要把这些写到云存储里。 还请考虑x=1
的简单情况。 (x=1
,好主意吗?怕云盘写入太多)
即使我是 Python 人,我在 Python 中已经很难做到这一点,更重要的是我需要在 Java 中写作。阅读 Java 中 Beam 的示例让我感到头疼,它太冗长且难以理解。我所了解的是,每一步都是 PCollection 的 .apply
。
到目前为止,这是我微不足道的努力的结果:
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
// I don't really understand the next part, I just copied from official documentation and filled in some values
.apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
.withAllowedLateness(Duration.millis(5000))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
.discardingFiredPanes()
)
.apply("EnrichDataFromAPI", ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.element();
// help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
// ... deserialize, call API, serialize again ...
c.output(enrichedJSONString);
}
}
))
.apply("WriteToGCS",
TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
;
PipelineResult result = pipeline.run();
}
请填写缺失的部分,并给我一些关于窗口的提示(例如什么是合适的配置等)以及我应该在哪些步骤中 insert/apply 它。
我认为您的
IngestFromPubsub
和EnrichDataFromAPI
中不需要任何窗口。开窗的目的是将时间附近的记录分组到 windows 中,以便您可以计算它们的聚合计算。但是由于您没有进行任何聚合计算,并且有兴趣独立处理每条记录,因此您不需要 windows.由于您总是将一个输入记录转换为一个输出记录,因此您的
EnrichDataFromAPI
应该是MapElements
。这应该会使代码更简单。在 Apache Bean Java 中有用于处理 JSON 的资源:
您不一定需要使用 Jackson 将 JSON 映射到 Java 对象。您也许可以直接操作 JSON。您可以使用 Java 的 native JSON API 到 parse/manipulate/serialize.