Writing to BigQuery from Cloud Dataflow: Unable to create a side-input view from input

I am trying to write a Dataflow pipeline that reads a stream from Pub/Sub and writes to BigQuery.

When I try to run it, I get the error "Unable to create a side-input view from input" with the following stack trace:

Exception in thread "main" java.lang.IllegalStateException: Unable to create a side-input view from input
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:277)
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:268)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:366)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:214)
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:79)
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:68)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1738)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1440)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at co.uk.bubblestudent.dataflow.StarterPipeline.main(StarterPipeline.java:116)
Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
at com.google.cloud.dataflow.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:192)
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:275)
... 20 more

My code is:

public class StarterPipeline {

  public static final Duration ONE_DAY = Duration.standardDays(1);
  public static final Duration ONE_HOUR = Duration.standardHours(1);
  public static final Duration TEN_SECONDS = Duration.standardSeconds(10);
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  private static TableSchema schemaGen() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("facebookID").setType("STRING"));
    fields.add(new TableFieldSchema().setName("propertyID").setType("STRING"));
    fields.add(new TableFieldSchema().setName("time").setType("TIMESTAMP"));
    TableSchema schema = new TableSchema().setFields(fields);
    return schema;
  }

  public static void main(String[] args) {
    LOG.info("Starting");
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    LOG.info("Pipeline made");
    // For Cloud execution, set the Cloud Platform project, staging location,
    // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
    options.setProject(<project>);
    options.setStagingLocation(<bucket>);
    options.setTempLocation(<bucket>);
    Pipeline p = Pipeline.create(options);

    TableSchema schema = schemaGen();
    LOG.info("Schema made");
    try {
      LOG.info(schema.toPrettyString());
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    PCollection<String> input = p.apply(PubsubIO.Read.named("ReadFromPubsub").subscription(<subscription>));

    PCollection<TableRow> pardo = input.apply(ParDo.of(new FormatAsTableRowFn()));
    LOG.info("Formatted Row");

    pardo.apply(BigQueryIO.Write.named("Write into BigQuery").to(<table>)
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    LOG.info("about to run");
    p.run();
  }


  static class FormatAsTableRowFn extends DoFn<String, TableRow> {
    @Override
    public void processElement(ProcessContext c) {
      LOG.info("Formatting");
      String json = c.element();

      //HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType());

      // Make a BigQuery row from the JSON object:
      TableRow row = new TableRow()
          .set("facebookID", "324234")
          .set("propertyID", "23423")
          .set("time", "12312313123");

      /*
       *  TableRow row = new TableRow()
       *      .set("facebookID", items.get("facebookID"))
       *      .set("propertyID", items.get("propertyID"))
       *      .set("time", items.get("time"));
       */
      c.output(row);
    }
  }
}

Any suggestions?

The default implementation of BigQueryIO only works with bounded PCollections, and PubsubIO.Read produces an unbounded PCollection.

There are two ways to fix this: you can bound the input by calling maxReadTime or maxNumRecords on the PubsubIO transform, or you can switch to BigQueryIO's streaming inserts by calling setStreaming(true) on your options. Both are sketched below.
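
A minimal sketch of both fixes, reusing p and options from the question's code and assuming the Dataflow 1.x SDK visible in the stack trace (the record limit and read time are illustrative values, not from the question):

// Option 1: bound the input. PubsubIO.Read.Bound exposes maxNumRecords and
// maxReadTime in the 1.x SDK; reading stops once either limit is reached,
// yielding a bounded PCollection that the default batch-load path of
// BigQueryIO.Write accepts.
PCollection<String> input = p.apply(
    PubsubIO.Read.named("ReadFromPubsub")
        .subscription(<subscription>)
        .maxNumRecords(1000));  // or .maxReadTime(Duration.standardMinutes(5))

// Option 2: keep the unbounded read and run in streaming mode, so
// BigQueryIO.Write uses streaming inserts instead of a load job. Note that
// DirectPipelineRunner (seen in the stack trace) does not support streaming
// in the 1.x SDK, so a Dataflow runner must be set as well.
options.setStreaming(true);
options.setRunner(DataflowPipelineRunner.class);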