从数据流管道中间读取 Bigtable 数据

Reading Bigtable data from the middle of a Dataflow pipeline

我有一个管道从 pub sub 接收一些数据,进行一些处理,并且需要根据该处理的结果处理 Bigtable 上的所有数据。

例如,我有一个 pub sub 消息,如:{clientId: 10},所以我需要从 Bigtable 中读取 clientId 10 的所有数据(我知道如何根据 clientId 创建扫描)。问题是我们目前对 Bigtable 的两次读取(BigtableIO 和 CloudBigtableIO)都是基于管道以 bigtable 开头的事实,所以我不能(或找不到方法)在中间使用它们管道。我怎样才能实现这种情况?

简单的伪类代码:

Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans()) // I know how to do this
.apply( ReadBigTable()) // How to do this?

更新:

我最近在玩 Bigtable 和 Dataflow,遇到了您在此处描述的相同问题。我认为没有办法在管道中间执行 Read.from(CloudBigtableIO.read(config),因此您必须创建自己的 DoFn。您可以扩展 AbstractCloudBigtableTableDoFn and access the easily reusable and configurable Bigtable connection through getConnection()。这是我放在一起的示例 Dataflow/Beam 作业,展示了如何执行此操作:

public class ReadInMiddleOfPipeline {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);

    Pipeline p = Pipeline.create(options);
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(GenerateSequence.from(0).to(10).withRate(1, new Duration(1000)))
        .apply(ParDo.of(new ReadFromTableFn(bigtableTableConfig)));

    p.run().waitUntilFinish();
  }

  static class ReadFromTableFn extends AbstractCloudBigtableTableDoFn<Long, Void> {
    public ReadFromTableFn(CloudBigtableConfiguration config) {
      super(config);
    }

    @ProcessElement
    public void processElement(@Element Long input, OutputReceiver<Void> out, PipelineOptions po) {
      BigtableOptions options = po.as(BigtableOptions.class);
      try {
        Table table = getConnection().getTable(TableName.valueOf(options.getBigtableTableId()));
        Scan scan = new Scan().setRowPrefixFilter(Bytes.toBytes("#phone"));
        ResultScanner rows = table.getScanner(scan);

        for (Result row : rows) {
          System.out.printf(
              "Reading data for %s%n", Bytes.toString(row.rawCells()[0].getRowArray()));
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

为了补充@Billy 的回答,您还可以尝试在 ParDo 转换中使用 BigtableDataClient class。 数据输入将是PubsubMessage中包含的参数来配置Scan对象,然后在ParDo中设置Scan参数,连接到BigTable并获得过滤结果。

这段代码可能有用:

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out){

        String projectId = "<PROJECT_ID>";
        String instanceId = "<INSTANCE_ID>";
        String tableName = "<TABLENAME>";


        String[] scanParameters = element.split(",");

        try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)){

            Table table = connection.getTable(TableName.valueOf(tableName));

            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(scanParameters[0]));
            scan.withStopRow(Bytes.toBytes(scanParameters[1]));

            ResultScanner scanner = table.getScanner(scan);

            for (Result row : scanner) {
                System.out.println(row);
            }

            catch (Exception e){
                e.printStackTrace();
            }

            out.output("");
        }

我没有直接使用 PubsubMessage 对其进行测试,但是,您可以进行另一次转换以调整消息或直接获取 PubsubMessage 并设置 Scan 对象。