Apache Beam pipeline with DataFlowRunner runs into _dill.py: "ModuleNotFoundError: No module named 'main'" when deployed from cloud function
Apache Beam pipeline with DataFlowRunner runs into _dill.py: "ModuleNotFoundError: No module named 'main'" when deployed from cloud function
我正在尝试使用 Python SDK 从 GCP 上的云功能执行数据流管道。在笔记本服务器上测试了代码,其中管道与 DataFlowRunner 一起工作。但是,当使用 Cloud Functions 调用管道时,我得到以下信息:
错误
Traceback (most recent call last): File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 346, in run_http_function result = _function_handler.invoke_user_function(flask.request) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 217, in invoke_user_function return call_user_function(request_or_event) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 210, in call_user_function return self._user_function(request_or_event) File "/user_code/main.py",
line 215, in run_main BUCKET=BUCKET) File "/user_code/main.py",
line 143, in dataflow create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED File "/env/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 481, in __exit__ self.run().wait_until_finish() File "/env/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
line 1449, in wait_until_finish (self.state, getattr(self._runner, 'last_error_msg', None)), self) apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 286, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",
line 648, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",
line 176, in execute op.start() File "apache_beam/runners/worker/operations.py",
line 649, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 651, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 652, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 261, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py",
line 266, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py",
line 597, in apache_beam.runners.worker.operations.DoOperation.setup File "apache_beam/runners/worker/operations.py",
line 602, in apache_beam.runners.worker.operations.DoOperation.setup File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 290, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main'
所以在我看来,只有在无服务器调用数据流时才会出现此问题。我尝试添加一个 to make the pipeline install the dependencies in correct versions but this didn't fix it. Seems to me that this question is similar to this one,但我怀疑那里唯一的(未被接受的)答案是否有效,因为云函数代码始终从 main.py.
运行
流水线代码
class getResponse(beam.DoFn):
def process(self, element, urlfield, pfield):
response = requests.get(element[urlfield])
status_code = response.status_code
if status_code >= 200 and status_code < 300:
yield {'id': element[pfield], 'response': response, 'url': element[urlfield]}
class getImageData(beam.DoFn):
def process(self, element, responsefield, urlfield, pfield):
p = element[responsefield]
img = Image.open(BytesIO(p.content)).resize((10, 10), Image.ANTIALIAS).convert("L")
yield {'id': element[pfield], 'url': element[urlfield], 'image_data': list(img.getdata())}
class outputDummies(beam.DoFn):
def process(self, element, dummy_data, image_datafield, urlfield, pfield):
if element[image_datafield] == dummy_data:
yield {'id': element[pfield], 'url': element[urlfield]}
def dataflow(in_test_mode=True,
query=None,
table_schema=None,
table_spec=None,
dummy_data=None,
job_name=None,
PROJECT=None,
REGION=None,
BUCKET=None):
if in_test_mode:
RUNNER = "DirectRunner"
OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)
else:
RUNNER = "DataflowRunner"
OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)
options = {
"job_name": job_name,
"project": PROJECT,
"region": REGION,
"staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
"temp_location": os.path.join(OUTPUT_DIR, "tmp"),
"streaming": False
}
opts = beam.pipeline.PipelineOptions(**options)
# Run Beam
with beam.Pipeline(RUNNER,
options=opts,
argv=['--setup_file', '/tmp/setup.py']) as p:
(p |
"Read data" >> beam.io.Read(beam.io.BigQuerySource(query=query,
use_standard_sql=True)) |
"Get responses" >> beam.ParDo(getResponse(),
urlfield='url',
pfield='id') |
"Process images" >> beam.ParDo(getImageData(),
responsefield='response',
urlfield='url',
pfield='id') |
"Output dummy images" >> beam.ParDo(outputDummies(),
dummy_data=dummy_data,
image_datafield='image_data',
urlfield='url',
pfield='id') |
"Write to BQ" >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
requirements.txt
apache_beam[gcp]==2.19.0
pillow==6.2.1
requests==2.23.0
有人有解决方法吗?
感谢您的所有建议,最后我没有找到解决错误的方法,但我确实找到了工作流程的解决方案。正如 AMargheriti 所指出的,数据流模板总是存在的。通过创建代码的自定义模板,我能够使用云函数触发流程。有用的文档是 the create dataflow template, the running templates page and lastly this solution because the suggested API on the running templates page does not allow setting a region where to run the template whereas dataflow().projects().locations().templates().launch() 允许添加此选项。
我正在尝试使用 Python SDK 从 GCP 上的云功能执行数据流管道。在笔记本服务器上测试了代码,其中管道与 DataFlowRunner 一起工作。但是,当使用 Cloud Functions 调用管道时,我得到以下信息:
错误
Traceback (most recent call last): File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 346, in run_http_function result = _function_handler.invoke_user_function(flask.request) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 217, in invoke_user_function return call_user_function(request_or_event) File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py",
line 210, in call_user_function return self._user_function(request_or_event) File "/user_code/main.py",
line 215, in run_main BUCKET=BUCKET) File "/user_code/main.py",
line 143, in dataflow create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED File "/env/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 481, in __exit__ self.run().wait_until_finish() File "/env/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
line 1449, in wait_until_finish (self.state, getattr(self._runner, 'last_error_msg', None)), self) apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 286, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",
line 648, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",
line 176, in execute op.start() File "apache_beam/runners/worker/operations.py",
line 649, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 651, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 652, in apache_beam.runners.worker.operations.DoOperation.start File "apache_beam/runners/worker/operations.py",
line 261, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py",
line 266, in apache_beam.runners.worker.operations.Operation.start File "apache_beam/runners/worker/operations.py",
line 597, in apache_beam.runners.worker.operations.DoOperation.setup File "apache_beam/runners/worker/operations.py",
line 602, in apache_beam.runners.worker.operations.DoOperation.setup File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 290, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'main'
所以在我看来,只有在无服务器调用数据流时才会出现此问题。我尝试添加一个
流水线代码
class getResponse(beam.DoFn):
def process(self, element, urlfield, pfield):
response = requests.get(element[urlfield])
status_code = response.status_code
if status_code >= 200 and status_code < 300:
yield {'id': element[pfield], 'response': response, 'url': element[urlfield]}
class getImageData(beam.DoFn):
def process(self, element, responsefield, urlfield, pfield):
p = element[responsefield]
img = Image.open(BytesIO(p.content)).resize((10, 10), Image.ANTIALIAS).convert("L")
yield {'id': element[pfield], 'url': element[urlfield], 'image_data': list(img.getdata())}
class outputDummies(beam.DoFn):
def process(self, element, dummy_data, image_datafield, urlfield, pfield):
if element[image_datafield] == dummy_data:
yield {'id': element[pfield], 'url': element[urlfield]}
def dataflow(in_test_mode=True,
query=None,
table_schema=None,
table_spec=None,
dummy_data=None,
job_name=None,
PROJECT=None,
REGION=None,
BUCKET=None):
if in_test_mode:
RUNNER = "DirectRunner"
OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)
else:
RUNNER = "DataflowRunner"
OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)
options = {
"job_name": job_name,
"project": PROJECT,
"region": REGION,
"staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
"temp_location": os.path.join(OUTPUT_DIR, "tmp"),
"streaming": False
}
opts = beam.pipeline.PipelineOptions(**options)
# Run Beam
with beam.Pipeline(RUNNER,
options=opts,
argv=['--setup_file', '/tmp/setup.py']) as p:
(p |
"Read data" >> beam.io.Read(beam.io.BigQuerySource(query=query,
use_standard_sql=True)) |
"Get responses" >> beam.ParDo(getResponse(),
urlfield='url',
pfield='id') |
"Process images" >> beam.ParDo(getImageData(),
responsefield='response',
urlfield='url',
pfield='id') |
"Output dummy images" >> beam.ParDo(outputDummies(),
dummy_data=dummy_data,
image_datafield='image_data',
urlfield='url',
pfield='id') |
"Write to BQ" >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
requirements.txt
apache_beam[gcp]==2.19.0
pillow==6.2.1
requests==2.23.0
有人有解决方法吗?
感谢您的所有建议,最后我没有找到解决错误的方法,但我确实找到了工作流程的解决方案。正如 AMargheriti 所指出的,数据流模板总是存在的。通过创建代码的自定义模板,我能够使用云函数触发流程。有用的文档是 the create dataflow template, the running templates page and lastly this solution because the suggested API on the running templates page does not allow setting a region where to run the template whereas dataflow().projects().locations().templates().launch() 允许添加此选项。