Reading Bigtable data from the middle of a Dataflow pipeline
I have a pipeline that receives some data from Pub/Sub, does some processing, and then needs to process all the data in Bigtable based on the result of that processing.
For example, I receive a Pub/Sub message like {clientId: 10}, so I need to read all the data for clientId 10 from Bigtable (I know how to build the Scan from the clientId). The problem is that both of the Bigtable reads we currently have (BigtableIO and CloudBigtableIO) rely on the pipeline starting with Bigtable, so I can't (or can't find a way to) use them in the middle of a pipeline. How can I implement this scenario?
Simple pseudo-code:
Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
 .apply(PubsubMessageToScans()) // I know how to do this
 .apply(ReadBigTable())         // How to do this?
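As a concrete illustration of the PubsubMessageToScans step (which the question already knows how to do), a minimal helper that turns a payload like {clientId: 10} into a row-key prefix might look as follows. The "client#<id>#" key scheme and the regex-based parsing are assumptions for illustration only; use whatever row-key scheme your table actually has:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derive a Bigtable row-key prefix from a Pub/Sub
// payload such as {"clientId": 10}. The resulting prefix would feed
// new Scan().setRowPrefixFilter(Bytes.toBytes(prefix)) downstream.
public class RowKeyPrefix {
    private static final Pattern CLIENT_ID =
            Pattern.compile("\"?clientId\"?\\s*:\\s*(\\d+)");

    public static String fromMessage(String payload) {
        Matcher m = CLIENT_ID.matcher(payload);
        if (!m.find()) {
            throw new IllegalArgumentException("no clientId in: " + payload);
        }
        // Assumed key scheme: client#<id>#...
        return "client#" + m.group(1) + "#";
    }

    public static void main(String[] args) {
        System.out.println(fromMessage("{clientId: 10}")); // prints "client#10#"
    }
}
```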
Update:
I've recently been playing with Bigtable and Dataflow and ran into the same problem you describe here. I don't think there is a way to perform Read.from(CloudBigtableIO.read(config)) in the middle of a pipeline, so you have to create your own DoFn. You can extend AbstractCloudBigtableTableDoFn and access an easily reusable and configurable Bigtable connection through getConnection(). Here is a sample Dataflow/Beam job I put together that shows how to do this:
import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.joda.time.Duration;

public class ReadInMiddleOfPipeline {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    // Stand-in for the real upstream source (e.g. PubsubIO): emits one
    // element per second, each of which triggers a Bigtable scan.
    p.apply(GenerateSequence.from(0).to(10).withRate(1, new Duration(1000)))
        .apply(ParDo.of(new ReadFromTableFn(bigtableTableConfig)));
    p.run().waitUntilFinish();
  }

  static class ReadFromTableFn extends AbstractCloudBigtableTableDoFn<Long, Void> {

    public ReadFromTableFn(CloudBigtableConfiguration config) {
      super(config);
    }

    @ProcessElement
    public void processElement(@Element Long input, OutputReceiver<Void> out, PipelineOptions po) {
      BigtableOptions options = po.as(BigtableOptions.class);
      try {
        // getConnection() comes from AbstractCloudBigtableTableDoFn and
        // reuses the Bigtable connection across elements.
        Table table = getConnection().getTable(TableName.valueOf(options.getBigtableTableId()));
        // Hardcoded prefix for the demo; in the question's scenario the
        // prefix would be derived from the input element instead.
        Scan scan = new Scan().setRowPrefixFilter(Bytes.toBytes("#phone"));
        ResultScanner rows = table.getScanner(scan);
        for (Result row : rows) {
          // row.getRow() returns exactly the row key (rawCells()[0].getRowArray()
          // would return the whole backing array, not just the key).
          System.out.printf("Reading data for %s%n", Bytes.toString(row.getRow()));
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID; this can be different from your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}
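The example above hardcodes the "#phone" prefix; in the question's scenario the prefix would instead come from the element produced upstream by PubsubMessageToScans. A sketch of such a variant is below. The String element type, the try-with-resources cleanup, and emitting row keys downstream are assumptions for illustration, not part of the original answer:

```java
// Hypothetical variant of ReadFromTableFn: the scan prefix is taken from
// the incoming element rather than hardcoded, and each matching row key
// is emitted downstream instead of printed.
static class ReadRowsByPrefixFn extends AbstractCloudBigtableTableDoFn<String, String> {

  public ReadRowsByPrefixFn(CloudBigtableConfiguration config) {
    super(config);
  }

  @ProcessElement
  public void processElement(
      @Element String prefix, OutputReceiver<String> out, PipelineOptions po) throws Exception {
    BigtableOptions options = po.as(BigtableOptions.class);
    // Table and ResultScanner are Closeable; close them per element,
    // while the underlying connection stays cached by the base class.
    try (Table table =
            getConnection().getTable(TableName.valueOf(options.getBigtableTableId()));
        ResultScanner rows =
            table.getScanner(new Scan().setRowPrefixFilter(Bytes.toBytes(prefix)))) {
      for (Result row : rows) {
        out.output(Bytes.toString(row.getRow()));
      }
    }
  }
}
```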
To complement @Billy's answer, you can also try using the BigtableDataClient class inside a ParDo transform.
The input elements would be the parameters contained in the PubsubMessage used to configure the Scan object; the ParDo then sets the Scan parameters, connects to Bigtable, and retrieves the filtered results.
This code might be helpful:
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> out) {
  String projectId = "<PROJECT_ID>";
  String instanceId = "<INSTANCE_ID>";
  String tableName = "<TABLENAME>";
  // The element carries the scan boundaries, e.g. "startRow,stopRow".
  String[] scanParameters = element.split(",");
  try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)) {
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes(scanParameters[0]));
    scan.withStopRow(Bytes.toBytes(scanParameters[1]));
    ResultScanner scanner = table.getScanner(scan);
    for (Result row : scanner) {
      System.out.println(row);
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
  out.output("");
}
I didn't test it with a PubsubMessage directly; however, you could add another transform to adapt the message, or take the PubsubMessage directly and set up the Scan object from it.
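The snippet above actually connects through the HBase-compatible API (BigtableConfiguration.connect). A sketch of the same prefix read using the BigtableDataClient class from the com.google.cloud.bigtable.data.v2 library is shown below; creating the client per call is a simplification for brevity (in a real DoFn you would create it once in @Setup and close it in @Teardown):

```java
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;

// Sketch: read all rows whose key starts with the given prefix using the
// native Cloud Bigtable client instead of the HBase API.
public class ReadWithDataClient {

  public static void readPrefix(
      String projectId, String instanceId, String tableId, String prefix) throws Exception {
    try (BigtableDataClient client = BigtableDataClient.create(projectId, instanceId)) {
      // The prefix filter is applied server-side, like Scan.setRowPrefixFilter.
      for (Row row : client.readRows(Query.create(tableId).prefix(prefix))) {
        System.out.println(row.getKey().toStringUtf8());
      }
    }
  }
}
```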