通过初始键拆分文件处理

Splitting file processing by initial keys

用例

我有一些 TB 的美国 属性 数据要合并。它分布在两种不同的文件格式和数千个文件中。源数据按地域划分。

我找不到将单个管道分支为多个独立处理流的方法。

这特别困难,因为 Dataframe API 似乎不支持对文件名集合的 PTransform

详细背景

文件分布是这样的:

理想的流水线将分成数千个独立的处理步骤并在几分钟内完成。

目录结构是这样的:

state-data/
 |-AL.zip
 |-AK.zip
 |-...
 |-WY.zip
county-data/
 |-AL/
   |-COUNTY1.csv
   |-COUNTY2.csv
   |-...
   |-COUNTY68.csv
 |-AK/
   |-...
 |-.../
 |-WY/
   |-...

示例数据

这是非常简短的,但想象一下这样的事情:

州级数据 - 其中 51 个(约 200 列宽)

uid census_plot flood_zone
abc121 ACVB-1249575 R50
abc122 ACVB-1249575 R50
abc123 ACVB-1249575 R51
abc124 ACVB-1249599 R51
abc125 ACVB-1249599 R50
... ... ...

县级数据 - 数以千计(约 300 列宽)

uid 细分 tax_id
abc121 04021 罗兰高地 3t4g
abc122 04021 罗兰高地 3g444
abc123 04021 罗兰高地 09udd
... ... ... ...

所以我们将许多县级连接到一个州级,从而拥有一个聚合的、更完整的州级数据集。

然后我们汇总所有州,我们有一个国家级数据集。

期望的结果

我一次可以成功合并一个州(多个县合并到一个州)。我构建了一个管道来执行此操作,但该管道以单个 CountyData CSV 和单个 StateData CSV 开始。问题已经发展到可以加载 CountyData 和 StateData 的地步了。

换句话说:

#
# I need to find a way to generalize this flow to
# dynamically created COUNTY and STATE variables.
#

from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv

COUNTY = "county-data/AL/*.csv"
STATE = "state-data/AL.zip"

def key_by_uid(elem):
    return (elem.uid, elem)

with beam.Pipeline() as p:
    county_df = p | read_csv(COUNTY)
    county_rows_keyed = to_pcollection(county_df) | beam.Map(key_by_uid)

    state_df = pd.read_csv(STATE, compression="zip")
    state_rows_keys = to_pcollection(state_df, pipeline=p) | beam.Map(key_by_uid)

    merged = ({ "state": state_rows_keys, "county": county_rows_keyed } ) | beam.CoGroupByKey() | beam.Map(merge_logic)

    merged | WriteToParquet()
  1. 从状态列表开始
  2. 按状态,生成文件模式到源数据
  3. 按状态加载并合并文件名
  4. 将每个州的输出扁平化为美国数据集。
  5. 写入 Parquet 文件。
with beam.Pipeline(options=pipeline_options) as p:

    merged_data = (
        p
        | beam.Create(cx.STATES)
        | "PathsKeyedByState" >> tx.PathsKeyedByState()
        # ('AL', {'county-data': 'gs://data/county-data/AL/COUNTY*.csv', 'state-data': 'gs://data/state-data/AL.zip'})
        | "MergeSourceDataByState" >> tx.MergeSourceDataByState()
        | "MergeAllStateData" >> beam.Flatten()
    )

    merged_data | "WriteParquet" >> tx.WriteParquet()

我遇到的问题是这样的:

我不确定您在这里进行的到底是哪种合并,但是构造此管道的一种方法可能是使用一个 DoFn,它将县数据作为文件名作为输入元素(即您会有县数据文件名的 PCollection),使用“正常”Python(例如 pandas)打开它,然后读取相关的州数据作为边输入来进行合并。

这是另一个答案。

一次读取一个状态数据并将它们展平,例如

state_dataframe = None
for state in STATES:
  df = p | read_csv('/path/to/state')
  df['state'] = state
  if state_dataframe is None:
    state_dataframe = df
  else:
    state_dataframe = state_dataframe.append(df)

与县数据类似。现在使用数据框操作加入他们。