如果 GCloud Dataflow 在作业 运行 期间被删除,则重新创建 BigQuery table
GCloud Dataflow recreate BigQuery table if it gets deleted during job run
我已经设置了一个 GCloud 数据流管道,它使用来自 Pub/Sub 订阅的消息,将它们转换为 table 行并将这些行写入相应的 BigQuery table。
Table 目的地是根据Pub/Sub 消息的内容决定的,偶尔会导致 table 还不存在,必须先创建的情况。为此,我使用 create disposition CREATE_IF_NEEDED
,效果很好。
但是,我注意到如果我在 BigQuery 中手动删除新创建的 table 而 Dataflow 作业仍在 运行ning,Dataflow 将卡住并且不会重新创建 table.相反,我得到一个错误:
Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429) at
java.util.concurrent.FutureTask.get(FutureTask.java:191) at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816) at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
如果我返回 BigQuery 并手动重新创建此 table,Dataflow 作业将继续工作。
但是,我想知道是否有一种方法可以指示 Dataflow 管道 重新创建 table 如果它在作业期间被删除 运行 ?
这在当前的 BigqueryIO
连接器中是不可能的。从存在的连接器 here 的 github link 中,您将观察到对于 StreamingWriteFn
这是您的代码,table 创建过程在 getOrCreateTable
,这在 finishBundle
中调用。有一个 createdTables
的映射被维护并且在 finishBundle
中创建 table 如果它不存在,一旦它存在并存储在 hashmap 中就不会重新创建如下图:-
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws IOException {
TableReference tableReference = parseTableSpec(tableSpec);
if (!createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
// every thread from attempting a create and overwhelming our BigQuery quota.
if (!createdTables.contains(tableSpec)) {
TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_IF_NEEDED, tableSchema);
createdTables.add(tableSpec);
}
}
}
return tableReference;
}
为了满足您的要求,您可能必须维护自己的 BigqueryIO,其中您不执行此特定检查
if (!createdTables.contains(tableSpec)) {
但更重要的问题是为什么 table 会在生产系统中自行删除?应该解决此问题,而不是尝试从 Dataflow 重新创建 table。
我已经设置了一个 GCloud 数据流管道,它使用来自 Pub/Sub 订阅的消息,将它们转换为 table 行并将这些行写入相应的 BigQuery table。
Table 目的地是根据Pub/Sub 消息的内容决定的,偶尔会导致 table 还不存在,必须先创建的情况。为此,我使用 create disposition CREATE_IF_NEEDED
,效果很好。
但是,我注意到如果我在 BigQuery 中手动删除新创建的 table 而 Dataflow 作业仍在 运行ning,Dataflow 将卡住并且不会重新创建 table.相反,我得到一个错误:
Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429) at
java.util.concurrent.FutureTask.get(FutureTask.java:191) at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816) at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115) at
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
如果我返回 BigQuery 并手动重新创建此 table,Dataflow 作业将继续工作。
但是,我想知道是否有一种方法可以指示 Dataflow 管道 重新创建 table 如果它在作业期间被删除 运行 ?
这在当前的 BigqueryIO
连接器中是不可能的。从存在的连接器 here 的 github link 中,您将观察到对于 StreamingWriteFn
这是您的代码,table 创建过程在 getOrCreateTable
,这在 finishBundle
中调用。有一个 createdTables
的映射被维护并且在 finishBundle
中创建 table 如果它不存在,一旦它存在并存储在 hashmap 中就不会重新创建如下图:-
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws IOException {
TableReference tableReference = parseTableSpec(tableSpec);
if (!createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
// every thread from attempting a create and overwhelming our BigQuery quota.
if (!createdTables.contains(tableSpec)) {
TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_IF_NEEDED, tableSchema);
createdTables.add(tableSpec);
}
}
}
return tableReference;
}
为了满足您的要求,您可能必须维护自己的 BigqueryIO,其中您不执行此特定检查
if (!createdTables.contains(tableSpec)) {
但更重要的问题是为什么 table 会在生产系统中自行删除?应该解决此问题,而不是尝试从 Dataflow 重新创建 table。