使用 TableProvider 在 Apache Beam 中生成一个 table 和 运行 一个 SQL 查询

Use TableProvider to generate a table and run an SQL query in Apache Beam

我想使用 Apache Beam Calcite SQL 方言和 Apache Flink 运行ner 生成一个无限制的行集合和 运行 一个 SQL 查询.根据 Apache Beam 的源代码和文档,可以使用 table 提供程序来做这样的事情:GenerateSequenceTableProvider。但我不明白如何在 Beam SQL CLI 之外使用它。我想在我的常规 Java 代码中使用它。

我正在尝试做这样的事情:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
tableProvider.createTable(Table.builder()
        .name("sequence")
        .schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
        .type(tableProvider.getTableType())
        .build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline).apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5").withTableProvider("sequenceSchema", tableProvider));

pipeline.run().waitUntilFinish();

但是我收到 Object 'sequence' not found within 'sequenceSchema' 错误,所以我想我实际上并没有创建 table。那么如何创建 table?如果我理解正确,这些值应该由 table 提供商自动提供。

基本上,如果我想对 table 这些提供商应该(我认为?)生成的查询执行查询,如何使用 Beam SQL table 提供商?

如果您无法让 TableProviders 工作,您可以将其作为普通 PCollection 阅读,然后对结果应用 SqlTransform

TableProvider 接口有点难以直接使用。您 运行 遇到的问题是 GenerateSquenceTableProvider 与许多其他 TableProvider 一样,没有任何方法可以自行存储 table 元数据。所以调用它的 createTable 方法实际上是空操作!您要做的是将其包装在 InMemoryMetaStore 中,如下所示:

GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
InMemoryMetaStore metaStore = new InMemoryMetaStore();

metaStore.registerProvider(tableProvider);

metaStore.createTable(Table.builder()
        .name("sequence")
        .schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
        .type(tableProvider.getTableType())
        .build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline)
  .apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5")
           .withTableProvider("sequenceSchema", metaStore));

(注意我还没有测试过这个,但我认为类似的东西应该可以工作)

正如 robertwb 指出的那样,另一种选择是避免使用 TableProvider 接口并直接使用 GenerateSequence。您只需要确保您的 PCollection 具有 schema。然后你可以用 SqlTransform 处理它,像这样:

pc.apply(SqlTransform.query("select * from PCOLLECTION limit 5"))