使用 TableProvider 在 Apache Beam 中生成一个 table 和 运行 一个 SQL 查询
Use TableProvider to generate a table and run an SQL query in Apache Beam
我想使用 Apache Beam Calcite SQL 方言和 Apache Flink 运行ner 生成一个无限制的行集合和 运行 一个 SQL 查询.根据 Apache Beam 的源代码和文档,可以使用 table 提供程序来做这样的事情:GenerateSequenceTableProvider。但我不明白如何在 Beam SQL CLI 之外使用它。我想在我的常规 Java 代码中使用它。
我正在尝试做这样的事情:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
tableProvider.createTable(Table.builder()
.name("sequence")
.schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
.type(tableProvider.getTableType())
.build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline).apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5").withTableProvider("sequenceSchema", tableProvider));
pipeline.run().waitUntilFinish();
但是我收到 Object 'sequence' not found within 'sequenceSchema'
错误,所以我想我实际上并没有创建 table。那么如何创建 table?如果我理解正确,这些值应该由 table 提供商自动提供。
基本上,如果我想对 table 这些提供商应该(我认为?)生成的查询执行查询,如何使用 Beam SQL table 提供商?
如果您无法让 TableProviders 工作,您可以将其作为普通 PCollection
阅读,然后对结果应用 SqlTransform
。
TableProvider 接口有点难以直接使用。您 运行 遇到的问题是 GenerateSquenceTableProvider
与许多其他 TableProvider 一样,没有任何方法可以自行存储 table 元数据。所以调用它的 createTable
方法实际上是空操作!您要做的是将其包装在 InMemoryMetaStore 中,如下所示:
GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
InMemoryMetaStore metaStore = new InMemoryMetaStore();
metaStore.registerProvider(tableProvider);
metaStore.createTable(Table.builder()
.name("sequence")
.schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
.type(tableProvider.getTableType())
.build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline)
.apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5")
.withTableProvider("sequenceSchema", metaStore));
(注意我还没有测试过这个,但我认为类似的东西应该可以工作)
正如 robertwb 指出的那样,另一种选择是避免使用 TableProvider 接口并直接使用 GenerateSequence。您只需要确保您的 PCollection 具有 schema。然后你可以用 SqlTransform 处理它,像这样:
pc.apply(SqlTransform.query("select * from PCOLLECTION limit 5"))
我想使用 Apache Beam Calcite SQL 方言和 Apache Flink 运行ner 生成一个无限制的行集合和 运行 一个 SQL 查询.根据 Apache Beam 的源代码和文档,可以使用 table 提供程序来做这样的事情:GenerateSequenceTableProvider。但我不明白如何在 Beam SQL CLI 之外使用它。我想在我的常规 Java 代码中使用它。
我正在尝试做这样的事情:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
tableProvider.createTable(Table.builder()
.name("sequence")
.schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
.type(tableProvider.getTableType())
.build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline).apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5").withTableProvider("sequenceSchema", tableProvider));
pipeline.run().waitUntilFinish();
但是我收到 Object 'sequence' not found within 'sequenceSchema'
错误,所以我想我实际上并没有创建 table。那么如何创建 table?如果我理解正确,这些值应该由 table 提供商自动提供。
基本上,如果我想对 table 这些提供商应该(我认为?)生成的查询执行查询,如何使用 Beam SQL table 提供商?
如果您无法让 TableProviders 工作,您可以将其作为普通 PCollection
阅读,然后对结果应用 SqlTransform
。
TableProvider 接口有点难以直接使用。您 运行 遇到的问题是 GenerateSquenceTableProvider
与许多其他 TableProvider 一样,没有任何方法可以自行存储 table 元数据。所以调用它的 createTable
方法实际上是空操作!您要做的是将其包装在 InMemoryMetaStore 中,如下所示:
GenerateSequenceTableProvider tableProvider = new GenerateSequenceTableProvider();
InMemoryMetaStore metaStore = new InMemoryMetaStore();
metaStore.registerProvider(tableProvider);
metaStore.createTable(Table.builder()
.name("sequence")
.schema(Schema.of(Schema.Field.of("sequence", Schema.FieldType.INT64), Schema.Field.of("event_time", Schema.FieldType.DATETIME)))
.type(tableProvider.getTableType())
.build()
);
PCollection<Row> res = PCollectionTuple.empty(pipeline)
.apply(SqlTransform.query("select * from sequenceSchema.sequence limit 5")
.withTableProvider("sequenceSchema", metaStore));
(注意我还没有测试过这个,但我认为类似的东西应该可以工作)
正如 robertwb 指出的那样,另一种选择是避免使用 TableProvider 接口并直接使用 GenerateSequence。您只需要确保您的 PCollection 具有 schema。然后你可以用 SqlTransform 处理它,像这样:
pc.apply(SqlTransform.query("select * from PCOLLECTION limit 5"))