Python/Apache-Beam: 如何将文本文件解析为CSV?

Python/Apache-Beam: How to Parse Text File To CSV?

我还是 Beam 的新手,但是您究竟如何读取 GCS 存储桶中的 CSV 文件?我本质上是使用 Beam 将这些文件转换为 pandas 数据帧,然后将 sklearn 模型应用于 "train" 此数据。我见过 pre-define 和 header 的大多数示例,我希望这个 Beam 管道能够推广到任何 header 肯定会不同的文件。有一个名为 beam_utils 的库可以完成我想做的事情,但后来我 运行 遇到了这个错误:AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'

代码示例:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The error occurs in this import
from beam_utils.sources import CsvFileSource

options = {
    'project': 'my-project',
    'runner:': 'DirectRunner',
    'streaming': False
}

pipeline_options = PipelineOptions(flags=[], **options)

class Printer(beam.DoFn):
    def process(self, element):
        print(element)

with beam.Pipeline(options=pipeline_options) as p:  # Create the Pipeline with the specified options.

    data = (p
            | 'Read File From GCS' >> beam.io.textio.ReadFromText('gs://my-csv-files')
            )

    _ = (data | "Print the data" >> beam.ParDo(Printer()))

result = p.run()
result.wait_until_finish()

Apache Beam 模块 fileio 最近被修改为向后不兼容的更改,并且库 beam_utils 尚未更新。

我完成了 suggested by @Pablo and the source code of beam_utils (also written by Pablo) to replicate the behavior using the filesystems 模块。

下面是使用 pandas 生成 DataFrame 的代码的两个版本。

csv 用于示例:

a,b
1,2
3,4
5,6

读取 csv 并创建包含其所有内容的 DataFrame

import apache_beam as beam
import pandas as pd
import csv
import io

def create_dataframe(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read it as csv, you can also use csv.reader
    csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file))

    # Create the DataFrame
    dataFrame = pd.DataFrame(csv_dict)
    print(dataFrame.to_string())

p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
   | beam.FlatMap(create_dataframe)
)

p.run()

结果数据帧

   a  b
0  1  2
1  3  4
2  5  6

读取 csv 并在其他转换中创建数据帧

def get_csv_reader(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Return the csv reader
    return  csv.DictReader(io.TextIOWrapper(gcs_file))

p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
   | beam.FlatMap(get_csv_reader)
   | beam.Map(lambda x: pd.DataFrame([x])) # Create the DataFrame from each csv row
   | beam.Map(lambda x: print(x.to_string()))
)

结果数据帧

   a  b
0  1  2
   a  b
0  3  4
   a  b
0  5  6