Dataflow - Function not being called - Error - name not defined
I'm using Apache Beam on Google Dataflow and calling a function, sentiment, through a lambda, but I get an error saying the function name is not defined.
output_tweets = (lines
    | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
    | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
    | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
)
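This is my Apache Beam pipeline. The last step calls the sentiment function, and that is where the problem occurs.
The function's code is below (I don't think it should matter):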
import json
from googleapiclient import discovery

def sentiment(messages):
    # Accept either a single message or a batch (list) of messages.
    if not isinstance(messages, list):
        messages = [messages]
    # Each message is a JSON string; parse it into a dict.
    instances = [json.loads(message) for message in messages]
    lservice = discovery.build('language', 'v1beta1', developerKey=APIKEY)
    # instances is a list of dicts, so iterate over it and read each dict's 'text'.
    for instance in instances:
        response = lservice.documents().analyzeSentiment(
            body={
                'document': {
                    'type': 'PLAIN_TEXT',
                    'content': instance['text']
                }
            }
        ).execute()
        instance['polarity'] = response['documentSentiment']['polarity']
        instance['magnitude'] = response['documentSentiment']['magnitude']
    return instances
I get the following traceback:
File "stream.py", line 97, in <lambda>
NameError: name 'sentiment' is not defined [while running 'generatedPtransform-441']
Any ideas?
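This can happen for several reasons. Check the following:
- Is the definition of sentiment in the same Python file as the Beam pipeline?
- Is sentiment defined before the Beam pipeline is invoked?
I ran a quick test, shown below. When both conditions hold, it works as expected: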
def testing(messages):
    return messages.lower()

windowed_lower_word_counts = (windowed_words
    | beam.Map(lambda word: testing(word))
    | "count" >> beam.combiners.Count.PerElement())
ib.show(windowed_lower_word_counts, include_window_info=True)
0 b'have' 3 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
1 b'ransom' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
2 b'let' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
3 b'me' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
If the function is defined after the call, we get the error shown below:
windowed_lower_word_counts = (windowed_words
    | beam.Map(lambda word: testing_after(word))
    | "count" >> beam.combiners.Count.PerElement())
ib.show(windowed_lower_word_counts, include_window_info=True)
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f478f344820>, due to an exception.
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 954, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 552, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/core.py", line 1482, in <lambda>
wrapper = lambda x: [fn(x)]
File "<ipython-input-19-f34e29a17836>", line 2, in <lambda>
| beam.Map(lambda word: testing_after_new(word))
NameError: name 'testing_after' is not defined
def testing_after(messages):
    return messages.lower()
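Update
Instead of passing the function as beam.FlatMap(lambda x: fn(x)),
pass the function directly as beam.FlatMap(fn).
I think that in the first case Beam tries to look up fn on the worker machines and cannot find it. The implementation details can be found here - https://github.com/apache/beam/blob/fa4f4183a315f061e035d38ba2c5d4b837b371e0/sdks/python/apache_beam/transforms/core.py#L780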