数据流作业在更新后保留旧的错误状态
Dataflow job retains old error state after updating
当我使用 DataflowRunner 提交我的数据流作业时(我正在使用带有 Pub/Sub 源的流作业),我在定义 BQ table 名称的执行参数时犯了一个错误(假设错误的 table 名称是 project-A) 并且作业抛出了一些错误。然后我用正确的 table 名称使用 --update 命令更新了作业,但是作业又抛出了一些错误,即错误告诉我我仍在使用项目-A 作为 BQ table 名称.
简而言之,这就是我所做的样子:
- 我提交了一个 Dataflow 作业
python main.py \
--job_name=dataflow-job1 \
--runner=DataflowRunner \
--staging_location=gs://project-B-bucket/staging \
--temp_location=gs://project-B-bucket/temp \
--dataset=project-A:table-A
- 我遇到错误,因为项目-A:table-A 不是正确的数据集
{
"error": {
"code": 403,
"message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
"errors": [
{
"message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
"domain": "global",
"reason": "accessDenied"
}
],
"status": "PERMISSION_DENIED"
}
}
- 我使用 --update
更新了作业
python main.py \
--job_name=dataflow-job1 \
--runner=DataflowRunner \
--staging_location=gs://project-B-bucket/staging \
--temp_location=gs://project-B-bucket/temp \
--dataset=project-B:table-B \
--update
- 然后我得到了和之前一样的错误(第2点)
为什么工作好像还保留着原来的状态?我认为如果 Dataflow 在工作中检测到错误,它不会处理管道并且 Pub/Sub 不会被确认并且管道将重新启动。
2020-12-08 更新:
这就是我传递参数参数的方式:
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--dataset')
...
class WriteToBigQuery(beam.PTransform):
def __init__(self, name):
self.name = name
def expand(self, pcoll):
return (pcoll
| 'WriteBQ' >> beam.io.WriteToBigQuery(
'{0}.my_table'.format(self.name),
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(flags=argv)
pipeline_options.view_as(StandardOptions).streaming = True
my_args = pipeline_options.view_as(MyOptions)
...
with beam.Pipeline(options=pipeline_options) as p:
...
# I wrapped the BQ write component inside a PTransform class
output | 'WriteBQ' >> WriteToBigQuery(my_args.dataset)
您无法在更新数据流流作业时更改管道参数。您只能更新管道的转换。
当我使用 DataflowRunner 提交我的数据流作业时(我正在使用带有 Pub/Sub 源的流作业),我在定义 BQ table 名称的执行参数时犯了一个错误(假设错误的 table 名称是 project-A) 并且作业抛出了一些错误。然后我用正确的 table 名称使用 --update 命令更新了作业,但是作业又抛出了一些错误,即错误告诉我我仍在使用项目-A 作为 BQ table 名称.
简而言之,这就是我所做的样子:
- 我提交了一个 Dataflow 作业
python main.py \
--job_name=dataflow-job1 \
--runner=DataflowRunner \
--staging_location=gs://project-B-bucket/staging \
--temp_location=gs://project-B-bucket/temp \
--dataset=project-A:table-A
- 我遇到错误,因为项目-A:table-A 不是正确的数据集
{
"error": {
"code": 403,
"message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
"errors": [
{
"message": "Access Denied: Dataset project-A:table-A: User does not have bigquery.datasets.get permission for dataset project-A:table-A.",
"domain": "global",
"reason": "accessDenied"
}
],
"status": "PERMISSION_DENIED"
}
}
- 我使用 --update 更新了作业
python main.py \
--job_name=dataflow-job1 \
--runner=DataflowRunner \
--staging_location=gs://project-B-bucket/staging \
--temp_location=gs://project-B-bucket/temp \
--dataset=project-B:table-B \
--update
- 然后我得到了和之前一样的错误(第2点)
为什么工作好像还保留着原来的状态?我认为如果 Dataflow 在工作中检测到错误,它不会处理管道并且 Pub/Sub 不会被确认并且管道将重新启动。
2020-12-08 更新: 这就是我传递参数参数的方式:
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--dataset')
...
class WriteToBigQuery(beam.PTransform):
def __init__(self, name):
self.name = name
def expand(self, pcoll):
return (pcoll
| 'WriteBQ' >> beam.io.WriteToBigQuery(
'{0}.my_table'.format(self.name),
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(flags=argv)
pipeline_options.view_as(StandardOptions).streaming = True
my_args = pipeline_options.view_as(MyOptions)
...
with beam.Pipeline(options=pipeline_options) as p:
...
# I wrapped the BQ write component inside a PTransform class
output | 'WriteBQ' >> WriteToBigQuery(my_args.dataset)
您无法在更新数据流流作业时更改管道参数。您只能更新管道的转换。