数据流作业在更新后保留旧的错误状态

Dataflow job retains old error state after updating

当我使用 DataflowRunner 提交我的数据流作业时(我正在使用带有 Pub/Sub 源的流作业),我在定义 BQ table 名称的执行参数时犯了一个错误(假设错误的 table 名称是 project-A) 并且作业抛出了一些错误。然后我用正确的 table 名称使用 --update 命令更新了作业,但是作业又抛出了一些错误,即错误告诉我我仍在使用项目-A 作为 BQ table 名称.

简而言之,这就是我所做的样子:

  1. 我提交了一个 Dataflow 作业
python main.py \
 --job_name=dataflow-job1 \
 --runner=DataflowRunner \
 --staging_location=gs://project-B-bucket/staging \
 --temp_location=gs://project-B-bucket/temp \
 --dataset=project-A:table-A
  1. 我遇到错误,因为项目-A:table-A 不是正确的数据集
{
  "error": {
    "code": 403,
    "message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
    "errors": [
      {
        "message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
        "domain": "global",
        "reason": "accessDenied"
      }
    ],
    "status": "PERMISSION_DENIED"
  }
}
  1. 我使用 --update
  2. 更新了作业
python main.py \
 --job_name=dataflow-job1 \
 --runner=DataflowRunner \
 --staging_location=gs://project-B-bucket/staging \
 --temp_location=gs://project-B-bucket/temp \
 --dataset=project-B:table-B \
 --update
  1. 然后我得到了和之前一样的错误(第2点)

为什么工作好像还保留着原来的状态?我认为如果 Dataflow 在工作中检测到​​错误,它不会处理管道并且 Pub/Sub 不会被确认并且管道将重新启动。

2020-12-08 更新: 这就是我传递参数参数的方式:

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--dataset')
...
class WriteToBigQuery(beam.PTransform):
    def __init__(self, name):
         self.name = name

    def expand(self, pcoll):
        return (pcoll
                | 'WriteBQ' >> beam.io.WriteToBigQuery(
            '{0}.my_table'.format(self.name),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
def run(argv=None, save_main_session=True):
    pipeline_options = PipelineOptions(flags=argv)
    pipeline_options.view_as(StandardOptions).streaming = True
    my_args = pipeline_options.view_as(MyOptions)
    ...
    with beam.Pipeline(options=pipeline_options) as p:
        ...
        # I wrapped the BQ write component inside a PTransform class
        output | 'WriteBQ' >> WriteToBigQuery(my_args.dataset)

您无法在更新数据流流作业时更改管道参数。您只能更新管道的转换。