java.lang.IllegalArgumentException: calling sideInput() with unknown view

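I am trying to move data from one table to another, using a side input to filter records while transforming the data. The side input is also a KV collection, and it loads its data from another table.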

When I run my pipeline, I get the error "java.lang.IllegalArgumentException: calling sideInput() with unknown view".

Here is the complete code I tried:

public static void main(String[] args) {
PipelineOptionsFactory.register(OptionPipeline.class);

OptionPipeline options = PipelineOptionsFactory.fromArgs(args).withValidation().as(OptionPipeline.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> sideInputData = p.apply("ReadSideInput",BigQueryIO.readTableRows().from(options.getOrgRegionMapping()));
PCollection<KV<String,String>> sideInputMap = sideInputData.apply(ParDo.of(new getSideInputDataFn()));
final PCollectionView<Map<String,String>> sideInputView = sideInputMap.apply(View.<String,String>asMap());



PCollection<TableRow> orgMaster = p.apply("ReadOrganization",BigQueryIO.readTableRows().from(options.getOrgCodeMaster()));
PCollection<TableRow> orgCode = orgMaster.apply(ParDo.of(new gnGetOrgMaster()));


@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection =  orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>() 
{
      @ProcessElement
      public void processElement(ProcessContext c) {

          TableRow outputRow = new TableRow();

          TableRow orgCodeRow = c.element();
          String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

          String region = c.sideInput(sideInputView).get(orgCodefromMaster);

          outputRow.set("orgCode", orgCodefromMaster);
          outputRow.set("orgName", orgCodeRow.get("orgName"));
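          outputRow.set("region", region); // assumes the output schema has a "region" column
          DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // "S" is milliseconds; "SSSSSS" would only zero-pad them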
          Date dateobj = new Date();
          outputRow.set("updatedDate",df.format(dateobj));

          c.output(outputRow);
      }
}));


finalResultCollection.apply(BigQueryIO.writeTableRows()
                     .withSchema(schema)
                     .to(options.getOrgCodeTable())
                     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run().waitUntilFinish();
}
@SuppressWarnings("serial")
static class getSideInputDataFn extends DoFn<TableRow,KV<String, String>>
{
    @ProcessElement
    public void processElement(ProcessContext c)
    {
        TableRow row = c.element();
        c.output(KV.of((String) row.get("orgcode"), (String) row.get("region")));
    }
}

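It looks like the runner is complaining because you never told it about the side input when you defined the graph. The runner only makes available the views that are declared on the ParDo, so every PCollectionView that a DoFn reads with c.sideInput(...) must be registered there: call .withSideInputs(...) on the result of ParDo.of(...), passing a reference to the PCollectionView<T> you defined earlier.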

@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection =  orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
{
    @ProcessElement
    public void processElement(ProcessContext c) {

        TableRow outputRow = new TableRow();

        TableRow orgCodeRow = c.element();
        String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

        String region = c.sideInput(sideInputView).get(orgCodefromMaster);

        outputRow.set("orgCode", orgCodefromMaster);
        outputRow.set("orgName", orgCodeRow.get("orgName"));
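        outputRow.set("region", region); // assumes the output schema has a "region" column
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // "S" is milliseconds; "SSSSSS" would only zero-pad them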
        Date dateobj = new Date();
        outputRow.set("updatedDate",df.format(dateobj));

        c.output(outputRow);
    }
}).withSideInputs(sideInputView));

I haven't tested this code, but that's what stood out when I looked at it.
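To check the fix in isolation, here is a minimal local sketch. It assumes the Beam Java SDK and the Direct Runner (`beam-runners-direct-java`) are on the classpath, and it replaces both BigQuery reads with in-memory `Create.of(...)` data. The class name `SideInputFilterSketch`, the helper `lookupRegion`, and the sample org codes are hypothetical. It also drops elements that have no mapping, which is the filtering the question describes.

```java
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputFilterSketch {

    // Plain-Java lookup used inside the DoFn (hypothetical helper); returns null when the org code is unknown.
    static String lookupRegion(Map<String, String> regions, String orgCode) {
        return orgCode == null ? null : regions.get(orgCode);
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // In-memory stand-in for the mapping table (hypothetical sample data): orgCode -> region.
        PCollectionView<Map<String, String>> regionView = p
            .apply("MappingRows",
                Create.of(KV.of("ORG1", "EMEA"), KV.of("ORG2", "APAC"))
                    .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
            .apply(View.<String, String>asMap());

        // In-memory stand-in for the org master table.
        PCollection<String> orgCodes = p.apply("OrgCodes", Create.of("ORG1", "ORG2", "ORG3"));

        PCollection<String> joined = orgCodes.apply("Process",
            ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String region = lookupRegion(c.sideInput(regionView), c.element());
                    if (region != null) { // filter: drop org codes with no mapping (ORG3 here)
                        c.output(c.element() + " -> " + region);
                    }
                }
            }).withSideInputs(regionView)); // declares the view on this ParDo; omitting it reproduces the error

        // Print each surviving element (order is not guaranteed).
        joined.apply("Print", ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println(c.element());
            }
        }));

        p.run().waitUntilFinish();
    }
}
```

Note that `View.asMap()` expects each key at most once; if the mapping table can contain the same `orgcode` more than once, the view fails at runtime, and `View.asMultimap()` is the usual alternative.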