Beam: ReadAllFromText receives a string or a list from a DoFn; why the different behavior?

I have a pipeline that reads files from GCS via Pub/Sub:

```python
import json
import logging

import apache_beam as beam


class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        # Build a gs:// path from the message "id", dropping its last component.
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return [element]  # returns a one-item list

class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return element  # returns the string itself, not a list

...

    (
        p
        | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
        | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
        | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
        | 'Log Results' >> beam.ParDo(LogFn())
        # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
        | "Read File from GCS" >> beam.io.ReadAllFromText()
    )
```

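The only difference between LogFn and LogPassThroughFn is the type of the return value: LogFn returns a list ([element]), while LogPassThroughFn returns the string itself (element). LogFn works fine in my test code, but LogPassThroughFn makes the pipeline fail. According to an existing answer: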

The Beam Python SDK still tries to interpret the output of that ParDo as if it were a collection of elements, and it does so by interpreting the string you emitted as a collection of characters.
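The quoted behavior can be illustrated with a plain-Python model. This is a simplified sketch under an assumption: the hypothetical helper run_process stands in for the runner and simply iterates over whatever process() returns, emitting each item as one output element. It is not Beam's actual runner code, and the *_process functions only mirror the DoFns above without using apache_beam.

```python
import logging


def run_process(process, element):
    # Simplified model (an assumption, not Beam's runner code):
    # iterate over whatever process() returns and treat every
    # item as one output element.
    return list(process(element))


def log_fn_process(element):
    # Mirrors LogFn.process: wraps the element in a one-item list.
    logging.info(element)
    return [element]


def log_pass_through_process(element):
    # Mirrors LogPassThroughFn.process: returns the bare string,
    # so iterating over it produces single characters.
    logging.info(element)
    return element


def fixed_pass_through_process(element):
    # Hypothetical fix: yield the element, so the generator emits it once.
    logging.info(element)
    yield element


path = "gs://my-bucket/data"
print(run_process(log_fn_process, path))                # ['gs://my-bucket/data']
print(run_process(log_pass_through_process, path)[:4])  # ['g', 's', ':', '/']
print(run_process(fixed_pass_through_process, path))    # ['gs://my-bucket/data']
```

In this model, returning a string turns every character into its own element, which is why a downstream ReadAllFromText would receive "g", "s", ":" and so on instead of a path.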

So we know LogFn should work correctly.

However, I noticed that ExtractFileNameFn yields a string rather than a list. Is that correct? So I tested the following version, ExtractFileNameFn1, which yields a list instead:

```python
class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield [file_name]  # yields a one-item list instead of the string

...

    (
        p
        | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
        | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
        | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
        | "Read File from GCS" >> beam.io.ReadAllFromText()
    )
```

Now the pipeline fails to run...

My question is: what is the difference between returning a string and returning a list from a DoFn? Why can ReadAllFromText receive a string from ExtractFileNameFn, yet receive a list from LogFn?

Beam version: 2.14.0

The ParDo documentation says:

Note that the DoFn must return an iterable for each element of the input PCollection. An easy way to do this is to use the yield keyword in the process method.

https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo

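The reason for returning an iterable is that your input elements may not map 1-to-1 to your output elements: one input may produce zero, one, or several outputs.

You can yield them as you go, or you can collect them into a list and return that list at the end.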
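To make the one-to-many point concrete, here is a plain-Python sketch of two process bodies with identical output: one yields as it goes, the other collects into a list and returns it. The message shape with an "ids" list is a hypothetical assumption for illustration only (the pipeline in the question has a single "id"), and the functions are not DoFn subclasses.

```python
def extract_file_names_yield(message):
    # Hypothetical message shape: {"ids": ["bucket/dir/a.txt", ...]}.
    # Yields one output per id; an empty or missing "ids" yields nothing.
    for object_id in message.get("ids", []):
        yield "gs://" + "/".join(object_id.split("/")[:-1])


def extract_file_names_return(message):
    # Same logic, but collects the outputs into a list and returns it at the end.
    outputs = []
    for object_id in message.get("ids", []):
        outputs.append("gs://" + "/".join(object_id.split("/")[:-1]))
    return outputs


msg = {"ids": ["bucket/dir1/a.txt", "bucket/dir2/b.txt"]}
print(list(extract_file_names_yield(msg)))  # ['gs://bucket/dir1', 'gs://bucket/dir2']
print(extract_file_names_return(msg))       # ['gs://bucket/dir1', 'gs://bucket/dir2']
print(list(extract_file_names_yield({})))   # []
```

Either style is an iterable of strings, so each file name becomes its own output element; the generator version just avoids building the intermediate list.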

So this:

```python
class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name
```

is equivalent to this:

```python
class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        return [file_name]
```

In both cases the output elements are strings, and each output element is one file name.

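When you do yield [file_name], each output element is actually a list containing one string. ReadAllFromText expects each input element to be a file name (file pattern) string, so it fails on those lists.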
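The contrast between yield file_name and yield [file_name] can be sketched in plain Python. The function require_file_pattern is a hypothetical stand-in for ReadAllFromText's input requirement (one file-pattern string per element), not part of Beam, and the sample "id" value assumes a "bucket/object/generation" layout, which is why the last path component is dropped.

```python
def extract_file_name(element):
    # Mirrors ExtractFileNameFn.process: yields the path string itself.
    yield "gs://" + "/".join(element["id"].split("/")[:-1])


def extract_file_name_wrapped(element):
    # Mirrors ExtractFileNameFn1.process: yields a one-item list.
    yield ["gs://" + "/".join(element["id"].split("/")[:-1])]


def require_file_pattern(element):
    # Hypothetical stand-in for what ReadAllFromText needs:
    # each element must be a file-pattern string.
    if not isinstance(element, str):
        raise TypeError("expected a file pattern string, got " + type(element).__name__)
    return element


# Assumed message shape: "id" is "<bucket>/<object>/<generation>".
message = {"id": "my-bucket/data/input.txt/1565000000000000"}

for out in extract_file_name(message):
    print(require_file_pattern(out))  # gs://my-bucket/data/input.txt

for out in extract_file_name_wrapped(message):
    try:
        require_file_pattern(out)
    except TypeError as err:
        print(err)  # expected a file pattern string, got list
```

Yielding the bare string gives exactly one string element per message, which is the shape ReadAllFromText consumes; yielding a list gives one list element, which does not match.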