从 Pubsub 每隔 X 条消息写入 Cloud Storage

Write to Cloud Storage every X messages from Pubsub

我是 Cloud Dataflow / Apache Beam 的新手,所以 concept/programming 对我来说还是很模糊。

我想做的是Dataflow监听Pubsub,在JSON:

中获取这种格式的消息
{
  "productId": "...",
  "productName": "..."
}

并将其转换为:

{
  "productId": "...",
  "productName": "...",
  "sku": "...",
  "inventory": {
    "revenue": <some Double>,
    "stocks":  <some Integer>
  }
}

所以需要的步骤是:

  1. (IngestFromPubsub) 通过监听一个topic从Pubsub获取记录(1条Pubsub消息=1条记录)

  2. (EnrichDataFromAPI)

    一个。将负载的 JSON 字符串反序列化为 Java 对象

    b。通过调用外部 API,使用 sku,我可以通过添加 inventory 属性来丰富每条记录的数据。

    c。再次序列化记录。

  3. (WriteToGCS) 然后每x条(可以参数化)记录,我需要把这些写到云存储里。 还请考虑 x=1 的简单情况。 (x=1,好主意吗?怕云盘写入太多)

即使我是 Python 人,我在 Python 中已经很难做到这一点,更重要的是我需要在 Java 中写作。阅读 Java 中 Beam 的示例让我感到头疼,它太冗长且难以理解。我所了解的是,每一步都是 PCollection 的 .apply

到目前为止,这是我微不足道的努力的结果:

public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // I don't really understand the next part, I just copied from official documentation and filled in some values
        .apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
            .withAllowedLateness(Duration.millis(5000))
            .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
            .discardingFiredPanes()
        )
        .apply("EnrichDataFromAPI", ParDo.of(
            new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.element();
                    // help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
                    // ... deserialize, call API, serialize again ...
                    c.output(enrichedJSONString);
                }
            }
        ))
        .apply("WriteToGCS", 
            TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
    ;


    PipelineResult result = pipeline.run();
}

请填写缺失的部分,并给我一些关于窗口的提示(例如什么是合适的配置等)以及我应该在哪些步骤中 insert/apply 它。

  • 我认为您的 IngestFromPubsubEnrichDataFromAPI 中不需要任何窗口。开窗的目的是将时间附近的记录分组到 windows 中,以便您可以计算它们的聚合计算。但是由于您没有进行任何聚合计算,并且有兴趣独立处理每条记录,因此您不需要 windows.

  • 由于您总是将一个输入记录转换为一个输出记录,因此您的 EnrichDataFromAPI 应该是 MapElements。这应该会使代码更简单。

  • 在 Apache Bean Java 中有用于处理 JSON 的资源:

  • 您不一定需要使用 Jackson 将 JSON 映射到 Java 对象。您也许可以直接操作 JSON。您可以使用 Java 的 native JSON API 到 parse/manipulate/serialize.