NameError: name 'pvalue' is not defined

NameError: name 'pvalue' is not defined

在此处 (https://beam.apache.org/documentation/programming-guide/#additional-outputs) 的 4.5.2 文档中,有一个 pvalue.TaggedOutput() yielded。

pvalue 似乎很难导入,我从 apache 文档中复制了导入行,我使用 --save_main_session 选项以及 save_main_session=True def run() 以及 pipeline_options.view_as(SetupOptions).save_main_session = save_main_session 在我开始管道之前。所有导入都适用于所有功能,所有 类 适用于所有功能。但不是 pvalue。我还尝试了所有这些可能的组合,并将它们排除在外。 pvalue 始终未知。

我从这里的食谱中获取所有代码:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py

然而,没有pvalue。
NameError: name 'pvalue' is not defined [while running 'generatedPtransform-1725']

此错误仅在我使用 Dataflow运行ner 时出现,而在我使用 Direct运行ner 时不会出现。

我的 DoFn 示例

class Splitter(beam.DoFn):

    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

我的例子 运行()

def run(argv=None, save_main_session=True):
    sources = [
        json.loads('{"id":72234,"value":1'),
        json.loads('{"id":23,"value":2}')
        ]

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
           | beam.Create(sources)
           | beam.ParDo(Splitter()).with_outputs(Splitter.TAG1,Splitter.TAG2,main=Splitter.TAG1)

** 我的进口 **

from __future__ import absolute_import

import argparse
import logging
import re
import json
import datetime
from jsonschema import validate

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

您应该尝试在 class 拆分器中导入 pvalue,因为在使用 Apache beam 时应在 classes 和函数中声明依赖项。

你的代码应该是这样的

class Splitter(beam.DoFn):
    from apache_beam import pvalue
    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

您可以通过 Directrunner 正常使用 from apache_beam import pvalue,因为本地代码是 运行;但是,在使用 Dataflowrunner 时,代码应遵循正确处理 dependencies.

的结构

Dataflowrunner 的依赖关系不知何故搞砸了。通过加载一组错误的依赖项,然后再次删除它们,事情突然开始起作用了。 像 from apache_beam import pvalue 这样的导入似乎毕竟是正确的。

也许这里吸取的教训是可能存在损坏的依赖项,您可以通过安装和卸载旧的或错误的 apache_beam 包触发强制重新安装来修复这些依赖项。