使用 Cloud Dataflow 从 PubSub 将数据流式传输到 Google Cloud Storage

Streaming data to Google Cloud Storage from PubSub using Cloud Dataflow

我正在使用数据流中的流数据收听来自 pub-sub 的数据。 然后我需要上传到存储,处理数据并上传到bigquery。

这是我的代码:

public class BotPipline {

public static void main(String[] args) {

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject(MY_PROJECT);
    options.setStagingLocation(MY_STAGING_LOCATION);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline.apply(PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION));

    input.apply(TextIO.Write.to(MY_STORAGE_LOCATION));

    input
    .apply(someDataProcessing(...)).named("update json"))
    .apply(convertToTableRow(...)).named("convert json to table row"))
            .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema)
    );
    pipeline.run();
}

}

当我 运行 注释代码写入存储时,代码运行良好。 但是当我尝试上传到大查询时我得到这个错误(这是预期的..):

Write can only be applied to a Bounded PCollection

我没有使用 bound,因为我一直需要 运行 这个,而且我需要立即上传数据。 任何解决方案?

编辑: 这是我想要的行为:

我正在通过 pubsub 接收消息。 每条消息都应作为粗略数据存储在 GCS 中自己的文件中, 对数据执行一些处理,然后将其保存到大查询 - 在数据中具有文件名。

数据在BQ中收到后应该立即可见 示例:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing :  {a:1, b:2} -> 
                   {a:11, b: 22} -> 
                   {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 

想法是处理后的数据存储在 BQ 中,与存储在 GCS

中的粗略数据具有 link

目前我们不支持在 TextIO.Write 中编写无限集合。参见

您能否阐明您希望 unbounded TextIO.Write 的行为是什么?例如。你想要一个不断增长的文件,还是每个 window 一个文件,在 window 关闭时关闭,或者其他什么,或者只对你重要的是写入的文件的总内容将最终包含所有 PubSub 消息,但文件的结构等并不重要?

作为一种解决方法,您可以将写入 GCS 作为您自己的 DoFn,使用 IOChannelFactory 与 GCS 交互(实际上,TextIO.Write 在幕后只是用户可以自己从头开始编写的复合转换)。

您可以使用 @ProcessElement 上的可选 BoundedWindow 参数访问数据的 window。如果您解释所需的行为,我将能够提供更多建议。