如果 GCloud Dataflow 在作业 运行 期间被删除,则重新创建 BigQuery table

GCloud Dataflow recreate BigQuery table if it gets deleted during job run

我已经设置了一个 GCloud 数据流管道,它使用来自 Pub/Sub 订阅的消息,将它们转换为 table 行并将这些行写入相应的 BigQuery table。

Table 目的地是根据Pub/Sub 消息的内容决定的,偶尔会导致 table 还不存在,必须先创建的情况。为此,我使用 create disposition CREATE_IF_NEEDED,效果很好。

但是,我注意到如果我在 BigQuery 中手动删除新创建的 table 而 Dataflow 作业仍在 运行ning,Dataflow 将卡住并且不会重新创建 table.相反,我得到一个错误:

Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
    java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
    java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429) at
    java.util.concurrent.FutureTask.get(FutureTask.java:191) at
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816) at
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881) at
    org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143) at
    org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115) at
    org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)

如果我返回 BigQuery 并手动重新创建此 table,Dataflow 作业将继续工作。

但是,我想知道是否有一种方法可以指示 Dataflow 管道 重新创建 table 如果它在作业期间被删除 运行

这在当前的 BigqueryIO 连接器中是不可能的。从存在的连接器 here 的 github link 中,您将观察到对于 StreamingWriteFn 这是您的代码,table 创建过程在 getOrCreateTable,这在 finishBundle 中调用。有一个 createdTables 的映射被维护并且在 finishBundle 中创建 table 如果它不存在,一旦它存在并存储在 hashmap 中就不会重新创建如下图:-

    public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
        throws IOException {
      TableReference tableReference = parseTableSpec(tableSpec);
      if (!createdTables.contains(tableSpec)) {
        synchronized (createdTables) {
          // Another thread may have succeeded in creating the table in the meanwhile, so
          // check again. This check isn't needed for correctness, but we add it to prevent
          // every thread from attempting a create and overwhelming our BigQuery quota.
          if (!createdTables.contains(tableSpec)) {
            TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
            Bigquery client = Transport.newBigQueryClient(options).build();
            BigQueryTableInserter inserter = new BigQueryTableInserter(client);
            inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
                CreateDisposition.CREATE_IF_NEEDED, tableSchema);
            createdTables.add(tableSpec);
          }
        }
      }
      return tableReference;
    }

为了满足您的要求,您可能必须维护自己的 BigqueryIO,其中您不执行此特定检查

if (!createdTables.contains(tableSpec)) {

但更重要的问题是为什么 table 会在生产系统中自行删除?应该解决此问题,而不是尝试从 Dataflow 重新创建 table。