Catch BigQuery HttpBadRequestError on Dataflow streaming pipeline
Recently my Dataflow streaming job started throwing an HttpBadRequestError from the BigQuery API because the request payload exceeded the size limit:
Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
return self._flush_all_batches()
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
*[
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
self._flush_batch(destination)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
result, errors = self._insert_all_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
response = self.client.tabledata.InsertAll(request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
return self._RunMethod(
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 400,
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"errors": [
{
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"domain": "global",
"reason": "badRequest"
}
],
"status": "INVALID_ARGUMENT"
}
}
>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 990, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 730, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 732, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 733, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/common.py", line 1267, in apache_beam.runners.common.DoFnRunner.finish
File "apache_beam/runners/common.py", line 1248, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
return self._flush_all_batches()
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
*[
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
self._flush_batch(destination)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
result, errors = self._insert_all_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
response = self.client.tabledata.InsertAll(request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
return self._RunMethod(
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 400,
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"errors": [
{
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"domain": "global",
"reason": "badRequest"
}
],
"status": "INVALID_ARGUMENT"
}
}
> [while running 'WriteBqTables/WriteBQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-25875']
passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service.cc:631
I'd like to use the BigQuery dead letter pattern to mitigate this kind of error in case it happens again.
Does the dead letter pattern also work when an HttpBadRequestError occurs, or does it only catch rows that fail to insert because of a schema mismatch? I'm using the Apache Beam SDK for Python, version 2.27.0.
Thanks in advance.
Update 2021-02-24: I've added more of the stack trace to show where the error occurs.
Yes, that pattern will work. In general, it catches any failure that can be caught (sometimes things fail so catastrophically that processing stops entirely).
In your particular case, the stack trace goes through this region of BigQueryIO, and you can see the failed rows being output to the dead letter PCollection just below, here.
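For reference, here is a minimal sketch of that dead letter pattern with the Python SDK (the table spec, bucket path, and choice of RETRY_ON_TRANSIENT_ERROR are illustrative placeholders, not taken from the question): rows the streaming insert rejects come out of WriteToBigQuery on the BigQueryWriteFn.FAILED_ROWS output, where you can route them to a side sink such as GCS or a dead letter table.

```python
import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn
from apache_beam.io.gcp.bigquery_tools import RetryStrategy


def run(pipeline_options, rows):
    with beam.Pipeline(options=pipeline_options) as p:
        result = (
            p
            | "Create" >> beam.Create(rows)
            | "WriteBQ" >> beam.io.WriteToBigQuery(
                "my-project:my_dataset.my_table",  # placeholder table spec
                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
                # Don't retry non-transient failures forever; let them
                # fall through to the FAILED_ROWS output instead.
                insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
            )
        )
        # Rejected rows are emitted as (table, row) tuples on the
        # FAILED_ROWS output; send them to a dead letter sink.
        _ = (
            result[BigQueryWriteFn.FAILED_ROWS]
            | "FormatFailures" >> beam.Map(lambda kv: json.dumps(kv[1]))
            | "DeadLetter" >> beam.io.WriteToText("gs://my-bucket/dead-letter")  # placeholder path
        )
```

RetryStrategy also provides RETRY_ALWAYS and RETRY_NEVER; only rows whose failures are not retried end up on FAILED_ROWS, so the strategy is worth choosing deliberately for a streaming job.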