使用 Apache Beam 笔记本启动数据流作业时处理名称错误
Handle nameerrors when launching Dataflow jobs with Apache Beam notebooks
当我运行 Google Cloud Platform 网站上提供的示例笔记本 Dataflow_Word_count.ipynb 时,我可以使用 Apache Beam 笔记本启动 Dataflow 作业,并且作业成功完成。管道定义如下。
class ReadWordsFromText(beam.PTransform):
    """Composite transform: read a text file and emit one word per element.

    NOTE(review): the tokenizing lambda below is self-contained (it only
    references the stdlib `re` module), which is presumably why this version
    runs on Dataflow workers without --save_main_session — confirm.
    """

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s), e.g. a gs:// pattern.
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                # Split each line into word tokens (letters, digits, apostrophes).
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
# Build the word-count pipeline on the interactive runner.
p = beam.Pipeline(InteractiveRunner())

words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')

# Per-word occurrence counts (case-sensitive).
counts = (words
          | 'count' >> beam.combiners.Count.PerElement())

# Case-insensitive counts: lower-case each word before counting.
# BUG FIX: the original snippet was missing the closing parenthesis,
# which is a SyntaxError as written.
lower_counts = (words
                | "lower" >> beam.Map(lambda word: word.lower())
                | "lower_count" >> beam.combiners.Count.PerElement())
如果我使用新函数重构提取单词的部分,如下所示
def extract_words(line):
    """Return the list of word tokens (apostrophes included) found in *line*."""
    stripped = line.strip()
    return re.findall(r'[\w\']+', stripped, re.UNICODE)
class ReadWordsFromText(beam.PTransform):
    """Composite transform: read a text file and emit one word per element.

    NOTE(review): the lambda below captures the module-level name
    `extract_words`. On Dataflow, __main__ globals are not shipped to the
    workers by default, so this reference is what produces the NameError
    discussed here.
    """

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s).
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                # `extract_words` is resolved at worker run time ->
                # NameError unless the main session is saved.
                | beam.FlatMap(lambda line: extract_words(line)))
然后运行笔记本时,我收到以下错误消息:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined [while running '[3]: read/FlatMap(<lambda at <ipython-input-3-d48b3d7d5e4f>:12>)']
Note: imports, functions and other variables defined in the global context of your __main__ file of your Dataflow pipeline are, by default, not available in the worker execution environment, and such references will cause a NameError, unless the --save_main_session pipeline option is set to True. Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors for additional documentation on configuring your worker execution environment.
为了处理名称错误,我遵循 the instructions 并添加以下行
# Pickle the notebook's __main__ session so global definitions reach the workers.
options.view_as(SetupOptions).save_main_session=True
但是当我运行笔记本时,出现以下错误:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 760, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 501, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 307, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
module = unpickler.load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'IPython'
有解决这个问题的简单方法吗?
可以不使用 save_main_session,而是把提取单词的步骤移到 ReadWordsFromText 复合转换之外。示例如下:
def extract_words(line):
    """Tokenize *line* into words; apostrophes are kept inside tokens."""
    pattern = r'[\w\']+'
    return re.findall(pattern, line.strip(), re.UNICODE)
class ReadWordsFromText(beam.PTransform):
    """Read the lines of a text file.

    NOTE(review): despite the name, this version emits raw lines; the
    word-splitting step is applied outside the transform so that nothing
    inside it references a __main__ global.
    """

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s).
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                )
# Read lines, then split into words outside the composite transform.
# NOTE(review): passing the named function directly to FlatMap presumably
# lets Beam serialize the function itself, avoiding the main-session
# NameError — confirm against the Beam pickling docs.
words = (p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
         | 'Extract' >> beam.FlatMap(extract_words)
         )

# Per-word occurrence counts (case-sensitive).
counts = (words
          | 'count' >> beam.combiners.Count.PerElement())
不修改管道结构的另一种解决方法,是将该函数定义为 PTransform 类的方法。
class ReadWordsFromText(beam.PTransform):
    """Read a text file and emit one word per element.

    The word-splitting helper is a method of this class, so it travels
    with the serialized transform instance instead of being looked up as
    a __main__ global on the workers (avoids the NameError without
    --save_main_session).
    """

    def extract_words(self, line):
        # Word tokens: letters, digits and apostrophes.
        return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s).
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                # The lambda captures `self`, so the bound method is
                # serialized along with it.
                | beam.FlatMap(lambda line: self.extract_words(line)))
当我运行 Google Cloud Platform 网站上提供的示例笔记本 Dataflow_Word_count.ipynb 时,我可以使用 Apache Beam 笔记本启动 Dataflow 作业,并且作业成功完成。管道定义如下。
class ReadWordsFromText(beam.PTransform):
    """Composite transform: read a text file and emit one word per element.

    NOTE(review): the tokenizing lambda below is self-contained (it only
    references the stdlib `re` module), which is presumably why this version
    runs on Dataflow workers without --save_main_session — confirm.
    """

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s), e.g. a gs:// pattern.
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                # Split each line into word tokens (letters, digits, apostrophes).
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
# Build the word-count pipeline on the interactive runner.
p = beam.Pipeline(InteractiveRunner())

words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')

# Per-word occurrence counts (case-sensitive).
counts = (words
          | 'count' >> beam.combiners.Count.PerElement())

# Case-insensitive counts: lower-case each word before counting.
# BUG FIX: the original snippet was missing the closing parenthesis,
# which is a SyntaxError as written.
lower_counts = (words
                | "lower" >> beam.Map(lambda word: word.lower())
                | "lower_count" >> beam.combiners.Count.PerElement())
如果我使用新函数重构提取单词的部分,如下所示
def extract_words(line):
    """Split *line* (stripped of surrounding whitespace) into word tokens."""
    cleaned = line.strip()
    return re.findall(r'[\w\']+', cleaned, re.UNICODE)
class ReadWordsFromText(beam.PTransform):
    """Composite transform: read a text file and emit one word per element.

    NOTE(review): the lambda below captures the module-level name
    `extract_words`. On Dataflow, __main__ globals are not shipped to the
    workers by default, so this reference is what produces the NameError
    discussed here.
    """

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s).
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                # `extract_words` is resolved at worker run time ->
                # NameError unless the main session is saved.
                | beam.FlatMap(lambda line: extract_words(line)))
然后运行笔记本时,我收到以下错误消息:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined [while running '[3]: read/FlatMap(<lambda at <ipython-input-3-d48b3d7d5e4f>:12>)']
Note: imports, functions and other variables defined in the global context of your __main__ file of your Dataflow pipeline are, by default, not available in the worker execution environment, and such references will cause a NameError, unless the --save_main_session pipeline option is set to True. Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors for additional documentation on configuring your worker execution environment.
为了处理名称错误,我遵循 the instructions 并添加以下行
# Pickle the notebook's __main__ session so global definitions reach the workers.
options.view_as(SetupOptions).save_main_session=True
但是当我运行笔记本时,出现以下错误:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 760, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 501, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 307, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
module = unpickler.load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'IPython'
有解决这个问题的简单方法吗?
可以不使用 save_main_session,而是把提取单词的步骤移到 ReadWordsFromText 复合转换之外。示例如下:
def extract_words(line):
    """Return all word tokens in *line*, keeping apostrophes inside tokens."""
    word_pattern = r'[\w\']+'
    stripped = line.strip()
    return re.findall(word_pattern, stripped, re.UNICODE)
class ReadWordsFromText(beam.PTransform):
    """Read the lines of a text file.

    NOTE(review): despite the name, this version emits raw lines; the
    word-splitting step is applied outside the transform so that nothing
    inside it references a __main__ global.
    """

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s).
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                )
# Read lines, then split into words outside the composite transform.
# NOTE(review): passing the named function directly to FlatMap presumably
# lets Beam serialize the function itself, avoiding the main-session
# NameError — confirm against the Beam pickling docs.
words = (p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
         | 'Extract' >> beam.FlatMap(extract_words)
         )

# Per-word occurrence counts (case-sensitive).
counts = (words
          | 'count' >> beam.combiners.Count.PerElement())
不修改管道结构的另一种解决方法,是将该函数定义为 PTransform 类的方法。
class ReadWordsFromText(beam.PTransform):
    """Read a text file and emit one word per element.

    The word-splitting helper is a method of this class, so it travels
    with the serialized transform instance instead of being looked up as
    a __main__ global on the workers (avoids the NameError without
    --save_main_session).
    """

    def extract_words(self, line):
        # Word tokens: letters, digits and apostrophes.
        return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

    def __init__(self, file_pattern):
        # Glob/path of the input text file(s).
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                # The lambda captures `self`, so the bound method is
                # serialized along with it.
                | beam.FlatMap(lambda line: self.extract_words(line)))