NameError: name 'pvalue' is not defined
NameError: name 'pvalue' is not defined
在此处 (https://beam.apache.org/documentation/programming-guide/#additional-outputs) 的 4.5.2 文档中,有一个 pvalue.TaggedOutput()
yielded。
pvalue
似乎很难导入,我从 apache 文档中复制了导入行,我使用 --save_main_session
选项以及 save_main_session=True
def run()
以及 pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
在我开始管道之前。所有导入都适用于所有功能,所有 类 适用于所有功能。但不是 pvalue
。我还尝试了所有这些可能的组合,并将它们排除在外。 pvalue
始终未知。
我从这里的食谱中获取所有代码:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
然而,没有pvalue。
NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']
此错误仅在我使用 Dataflow运行ner 时出现,而在我使用 Direct运行ner 时不会出现。
我的 DoFn 示例
class Splitter(beam.DoFn):
TAG1 = 'kleintje'
TAG2 = 'grootje'
def process(self, element):
splittertid = element.get('id')
if splittertid < 100:
yield pvalue.TaggedOutput(self.TAG1, element)
else:
yield pvalue.TaggedOutput(self.TAG2, element)
我的例子 运行()
def run(argv=None, save_main_session=True):
sources = [
json.loads('{"id":72234,"value":1'),
json.loads('{"id":23,"value":2}')
]
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
| beam.Create(sources)
| beam.ParDo(Splitter()).with_outputs(Splitter.TAG1,Splitter.TAG2,main=Splitter.TAG1)
** 我的进口 **
from __future__ import absolute_import
import argparse
import logging
import re
import json
import datetime
from jsonschema import validate
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
您应该尝试在 class 拆分器中导入 pvalue,因为在使用 Apache beam 时应在 classes 和函数中声明依赖项。
你的代码应该是这样的
class Splitter(beam.DoFn):
from apache_beam import pvalue
TAG1 = 'kleintje'
TAG2 = 'grootje'
def process(self, element):
splittertid = element.get('id')
if splittertid < 100:
yield pvalue.TaggedOutput(self.TAG1, element)
else:
yield pvalue.TaggedOutput(self.TAG2, element)
您可以通过 Directrunner 正常使用 from apache_beam import pvalue
,因为本地代码是 运行;但是,在使用 Dataflowrunner 时,代码应遵循正确处理 dependencies.
的结构
Dataflowrunner 的依赖关系不知何故搞砸了。通过加载一组错误的依赖项,然后再次删除它们,事情突然开始起作用了。
像 from apache_beam import pvalue
这样的导入似乎毕竟是正确的。
也许这里吸取的教训是可能存在损坏的依赖项,您可以通过安装和卸载旧的或错误的 apache_beam
包触发强制重新安装来修复这些依赖项。
在此处 (https://beam.apache.org/documentation/programming-guide/#additional-outputs) 的 4.5.2 文档中,有一个 pvalue.TaggedOutput()
yielded。
pvalue
似乎很难导入,我从 apache 文档中复制了导入行,我使用 --save_main_session
选项以及 save_main_session=True
def run()
以及 pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
在我开始管道之前。所有导入都适用于所有功能,所有 类 适用于所有功能。但不是 pvalue
。我还尝试了所有这些可能的组合,并将它们排除在外。 pvalue
始终未知。
我从这里的食谱中获取所有代码:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
然而,没有pvalue。
NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']
此错误仅在我使用 Dataflow运行ner 时出现,而在我使用 Direct运行ner 时不会出现。
我的 DoFn 示例
class Splitter(beam.DoFn):
TAG1 = 'kleintje'
TAG2 = 'grootje'
def process(self, element):
splittertid = element.get('id')
if splittertid < 100:
yield pvalue.TaggedOutput(self.TAG1, element)
else:
yield pvalue.TaggedOutput(self.TAG2, element)
我的例子 运行()
def run(argv=None, save_main_session=True):
sources = [
json.loads('{"id":72234,"value":1'),
json.loads('{"id":23,"value":2}')
]
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
| beam.Create(sources)
| beam.ParDo(Splitter()).with_outputs(Splitter.TAG1,Splitter.TAG2,main=Splitter.TAG1)
** 我的进口 **
from __future__ import absolute_import
import argparse
import logging
import re
import json
import datetime
from jsonschema import validate
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
您应该尝试在 class 拆分器中导入 pvalue,因为在使用 Apache beam 时应在 classes 和函数中声明依赖项。
你的代码应该是这样的
class Splitter(beam.DoFn):
from apache_beam import pvalue
TAG1 = 'kleintje'
TAG2 = 'grootje'
def process(self, element):
splittertid = element.get('id')
if splittertid < 100:
yield pvalue.TaggedOutput(self.TAG1, element)
else:
yield pvalue.TaggedOutput(self.TAG2, element)
您可以通过 Directrunner 正常使用 from apache_beam import pvalue
,因为本地代码是 运行;但是,在使用 Dataflowrunner 时,代码应遵循正确处理 dependencies.
Dataflowrunner 的依赖关系不知何故搞砸了。通过加载一组错误的依赖项,然后再次删除它们,事情突然开始起作用了。
像 from apache_beam import pvalue
这样的导入似乎毕竟是正确的。
也许这里吸取的教训是可能存在损坏的依赖项,您可以通过安装和卸载旧的或错误的 apache_beam
包触发强制重新安装来修复这些依赖项。