I get 504 Deadline Exceeded in Apache Beam using ReadFromSpanner
I am building an application in Apache Beam and Python that runs in Google Dataflow. I am using the ReadFromSpanner method from apache_beam.io.gcp.experimental.spannerio. This works for most of my Spanner tables, but the really large ones (>16m rows) tend to fail with the following error:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
for row in read_action(element['partitions']):
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
self._consume_next()
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
response = six.next(self._response_iterator)
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
for item in iterator:
File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
From my understanding, this error comes from the ReadFromSpanner operation because its workers have timed out.
To fix this I have tried the following:
- Changed num_workers and disk_size_gb and added the --experiments=shuffle_mode=service flag, as suggested in Google's Common error guidance (see the sketch after this list)
- Changed the machine type from n1-standard-1 to n1-standard-2, as suggested here
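As a side note, the experiments flag can also be set programmatically; a minimal sketch, assuming the PipelineOptions keyword maps to the --experiments flag:

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: enabling Dataflow Shuffle via PipelineOptions instead of the
# command line; the other options are omitted here (see the full code below).
options = PipelineOptions(
    experiments=["shuffle_mode=service"],
)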
My latest code is attached below. I include the Transformation step in the pipeline for simple data wrangling.
"""Set pipeline arguments."""
options = PipelineOptions(
region=RUNNER_REGION,
project=RUNNER_PROJECT_ID,
runner=RUNNER,
temp_location=TEMP_LOCATION,
job_name=JOB_NAME,
service_account_email=SA_EMAIL,
setup_file=SETUP_FILE_PATH,
disk_size_gb=500,
num_workers=10,
machine_type="n1-standard-2",
save_main_session=True)
"""Build and run the pipeline."""
with beam.Pipeline(options=options) as p:
(p
| "Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
| "Transform elements into dictionary" >> beam.ParDo(Transformation)
| "Write new records to BQ" >> WriteToBigQuery(
BIGQUERY_TABLE,
schema=SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
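For context, Transformation itself is not shown here; as a rough sketch (the field names are purely hypothetical), it just maps each Spanner row to a dictionary matching the BigQuery schema:

# Purely hypothetical sketch of the Transformation step; the real column
# names are not part of this post.
def Transformation(element):
    # element is one row yielded by ReadFromSpanner; emit a dict keyed by
    # the BigQuery column names.
    yield {
        "id": element[0],
        "name": element[1],
    }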
A possible solution would be to edit the timeout control; I have seen that this is available in Java but not in Python. How can I edit the timeout control in Python, or is there some other way to solve this issue?
I filed this issue in the googleapis/python-spanner repository. The maintainers of the library were able to help me resolve the problem, and retry and timeout options have since been included for reads and queries.
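With a sufficiently recent google-cloud-spanner release, those options can (to my understanding) be passed straight to the query call; a minimal sketch, reusing the constants from the question and assuming execute_sql accepts retry and timeout keyword arguments:

from google.api_core.retry import Retry
from google.cloud import spanner

# Sketch only: assumes a google-cloud-spanner version in which execute_sql
# accepts retry/timeout keyword arguments.
client = spanner.Client(project=SPANNER_PROJECT_ID)
database = client.instance(SPANNER_INSTANCE_ID).database(SPANNER_DB)

with database.snapshot() as snapshot:
    results = snapshot.execute_sql(
        QUERY,
        timeout=3600,                # per-call gRPC deadline in seconds
        retry=Retry(deadline=3600),  # overall retry budget in seconds
    )
    for row in results:
        pass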
To work around the problem in my pipeline, I reverse-engineered the Apache Beam Spanner connector, apache_beam.io.gcp.experimental.spannerio, and specifically rewrote _ReadFromPartitionFn to include a timeout option.
The code below first creates read-partition objects from Spanner and then reads from those partitions. Note the timeout argument passed to process_query_batch inside readSpannerPartitions.
import apache_beam as beam
from google.cloud import spanner
from google.cloud.spanner_v1.database import BatchSnapshot


class createSpannerReadPartitions(beam.DoFn):
    def __init__(self, SPANNER_CONFIG):
        self.project = SPANNER_CONFIG['spanner_project']
        self.instance = SPANNER_CONFIG['spanner_instance']
        self.db = SPANNER_CONFIG['spanner_database']
        self.query = SPANNER_CONFIG['query']

    def setup(self):
        # Open a batch snapshot once per worker and serialise its transaction
        # info so the read workers can re-create the same snapshot.
        spanner_client = spanner.Client(self.project)
        spanner_instance = spanner_client.instance(self.instance)
        spanner_db = spanner_instance.database(self.db)
        self.snapshot = spanner_db.batch_snapshot()
        self.snapshot_dict = self.snapshot.to_dict()

    def process(self, element):
        # Emit one element per query batch (partition).
        partitioning_action = self.snapshot.generate_query_batches
        for p in partitioning_action(self.query):
            yield {
                "partitions": p,
                "transaction_info": self.snapshot_dict}


class readSpannerPartitions(beam.DoFn):
    def __init__(self, SPANNER_CONFIG):
        self.project = SPANNER_CONFIG['spanner_project']
        self.instance = SPANNER_CONFIG['spanner_instance']
        self.db = SPANNER_CONFIG['spanner_database']
        self.query = SPANNER_CONFIG['query']

    def setup(self):
        spanner_client = spanner.Client(self.project)
        spanner_instance = spanner_client.instance(self.instance)
        self.spanner_db = spanner_instance.database(self.db)
        self.snapshot = self.spanner_db.batch_snapshot()
        self.snapshot_dict = self.snapshot.to_dict()

    def process(self, element):
        # Rebuild the shared batch snapshot from the partitioning step and
        # read the assigned partition with an explicit timeout (in seconds).
        self.snapshot = BatchSnapshot.from_dict(
            self.spanner_db, element['transaction_info'])
        read_action = self.snapshot.process_query_batch
        for row in read_action(element['partitions'], timeout=86400):
            yield row

    def teardown(self):
        self.snapshot.close()
Then I build the pipeline like this:
with beam.Pipeline(options=options) as p:
    p_read = (p
              | beam.Create(["Start pipeline"])
              | 'Generate Partitions' >> beam.ParDo(createSpannerReadPartitions(SPANNER_CONFIG))
              | 'Reshuffle' >> beam.Reshuffle()
              | 'Read From Partitions' >> beam.ParDo(readSpannerPartitions(SPANNER_CONFIG)))
    return p_read
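For completeness, the downstream steps from the original pipeline (the Transformation ParDo and the BigQuery write) can be chained onto p_read in the same way; a sketch reusing the names from the question:

(p_read
 | "Transform elements into dictionary" >> beam.ParDo(Transformation)
 | "Write new records to BQ" >> WriteToBigQuery(
     BIGQUERY_TABLE,
     schema=SCHEMA,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))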
Credit for this goes to the maintainers of the googleapis/python-spanner repository.