通过初始键拆分文件处理
Splitting file processing by initial keys
用例
我有一些 TB 的美国 属性 数据要合并。它分布在两种不同的文件格式和数千个文件中。源数据按地域划分。
我找不到将单个管道分支为多个独立处理流的方法。
这特别困难,因为 Dataframe
API 似乎不支持对文件名集合的 PTransform
。
详细背景
文件分布是这样的:
- StateData - 总共 51 个文件(美国各州 + DC)
- CountyData - ~2000 总文件(特定县,按州分组)
理想的流水线将分成数千个独立的处理步骤并在几分钟内完成。
- 1 -> 51(每个美国州 + DC 开始处理)
- 51 -> 数千(每个美国州然后产生一个合并县的过程,最后合并整个州)
目录结构是这样的:
state-data/
|-AL.zip
|-AK.zip
|-...
|-WY.zip
county-data/
|-AL/
|-COUNTY1.csv
|-COUNTY2.csv
|-...
|-COUNTY68.csv
|-AK/
|-...
|-.../
|-WY/
|-...
示例数据
这是非常简短的,但想象一下这样的事情:
州级数据 - 其中 51 个(约 200 列宽)
uid
census_plot
flood_zone
abc121
ACVB-1249575
R50
abc122
ACVB-1249575
R50
abc123
ACVB-1249575
R51
abc124
ACVB-1249599
R51
abc125
ACVB-1249599
R50
...
...
...
县级数据 - 数以千计(约 300 列宽)
uid
县
细分
tax_id
abc121
04021
罗兰高地
3t4g
abc122
04021
罗兰高地
3g444
abc123
04021
罗兰高地
09udd
...
...
...
...
所以我们将许多县级连接到一个州级,从而拥有一个聚合的、更完整的州级数据集。
然后我们汇总所有州,我们有一个国家级数据集。
期望的结果
我一次可以成功合并一个州(多个县合并到一个州)。我构建了一个管道来执行此操作,但该管道以单个 CountyData CSV 和单个 StateData CSV 开始。问题已经发展到可以加载 CountyData 和 StateData 的地步了。
换句话说:
#
# I need to find a way to generalize this flow to
# dynamically created COUNTY and STATE variables.
#
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
COUNTY = "county-data/AL/*.csv"
STATE = "state-data/AL.zip"
def key_by_uid(elem):
return (elem.uid, elem)
with beam.Pipeline() as p:
county_df = p | read_csv(COUNTY)
county_rows_keyed = to_pcollection(county_df) | beam.Map(key_by_uid)
state_df = pd.read_csv(STATE, compression="zip")
state_rows_keys = to_pcollection(state_df, pipeline=p) | beam.Map(key_by_uid)
merged = ({ "state": state_rows_keys, "county": county_rows_keyed } ) | beam.CoGroupByKey() | beam.Map(merge_logic)
merged | WriteToParquet()
- 从状态列表开始
- 按状态,生成文件模式到源数据
- 按状态加载并合并文件名
- 将每个州的输出扁平化为美国数据集。
- 写入 Parquet 文件。
with beam.Pipeline(options=pipeline_options) as p:
merged_data = (
p
| beam.Create(cx.STATES)
| "PathsKeyedByState" >> tx.PathsKeyedByState()
# ('AL', {'county-data': 'gs://data/county-data/AL/COUNTY*.csv', 'state-data': 'gs://data/state-data/AL.zip'})
| "MergeSourceDataByState" >> tx.MergeSourceDataByState()
| "MergeAllStateData" >> beam.Flatten()
)
merged_data | "WriteParquet" >> tx.WriteParquet()
我遇到的问题是这样的:
- 我在每个州的字典中有两个文件模式。要访问这些,我需要使用
DoFn
来获取元素。
- 为了传达数据流的方式,我需要访问
Pipeline
,这是一个 PTransform
。例如:df = p | read_csv(...)
- 这些似乎是不相容的需求。
我不确定您在这里进行的到底是哪种合并,但是构造此管道的一种方法可能是使用一个 DoFn,它将县数据作为文件名作为输入元素(即您会有县数据文件名的 PCollection),使用“正常”Python(例如 pandas)打开它,然后读取相关的州数据作为边输入来进行合并。
这是另一个答案。
一次读取一个状态数据并将它们展平,例如
state_dataframe = None
for state in STATES:
df = p | read_csv('/path/to/state')
df['state'] = state
if state_dataframe is None:
state_dataframe = df
else:
state_dataframe = state_dataframe.append(df)
与县数据类似。现在使用数据框操作加入他们。
用例
我有一些 TB 的美国 属性 数据要合并。它分布在两种不同的文件格式和数千个文件中。源数据按地域划分。
我找不到将单个管道分支为多个独立处理流的方法。
这特别困难,因为 Dataframe
API 似乎不支持对文件名集合的 PTransform
。
详细背景
文件分布是这样的:
- StateData - 总共 51 个文件(美国各州 + DC)
- CountyData - ~2000 总文件(特定县,按州分组)
理想的流水线将分成数千个独立的处理步骤并在几分钟内完成。
- 1 -> 51(每个美国州 + DC 开始处理)
- 51 -> 数千(每个美国州然后产生一个合并县的过程,最后合并整个州)
目录结构是这样的:
state-data/
|-AL.zip
|-AK.zip
|-...
|-WY.zip
county-data/
|-AL/
|-COUNTY1.csv
|-COUNTY2.csv
|-...
|-COUNTY68.csv
|-AK/
|-...
|-.../
|-WY/
|-...
示例数据
这是非常简短的,但想象一下这样的事情:
州级数据 - 其中 51 个(约 200 列宽)
uid | census_plot | flood_zone |
---|---|---|
abc121 | ACVB-1249575 | R50 |
abc122 | ACVB-1249575 | R50 |
abc123 | ACVB-1249575 | R51 |
abc124 | ACVB-1249599 | R51 |
abc125 | ACVB-1249599 | R50 |
... | ... | ... |
县级数据 - 数以千计(约 300 列宽)
uid | 县 | 细分 | tax_id |
---|---|---|---|
abc121 | 04021 | 罗兰高地 | 3t4g |
abc122 | 04021 | 罗兰高地 | 3g444 |
abc123 | 04021 | 罗兰高地 | 09udd |
... | ... | ... | ... |
所以我们将许多县级连接到一个州级,从而拥有一个聚合的、更完整的州级数据集。
然后我们汇总所有州,我们有一个国家级数据集。
期望的结果
我一次可以成功合并一个州(多个县合并到一个州)。我构建了一个管道来执行此操作,但该管道以单个 CountyData CSV 和单个 StateData CSV 开始。问题已经发展到可以加载 CountyData 和 StateData 的地步了。
换句话说:
#
# I need to find a way to generalize this flow to
# dynamically created COUNTY and STATE variables.
#
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
COUNTY = "county-data/AL/*.csv"
STATE = "state-data/AL.zip"
def key_by_uid(elem):
return (elem.uid, elem)
with beam.Pipeline() as p:
county_df = p | read_csv(COUNTY)
county_rows_keyed = to_pcollection(county_df) | beam.Map(key_by_uid)
state_df = pd.read_csv(STATE, compression="zip")
state_rows_keys = to_pcollection(state_df, pipeline=p) | beam.Map(key_by_uid)
merged = ({ "state": state_rows_keys, "county": county_rows_keyed } ) | beam.CoGroupByKey() | beam.Map(merge_logic)
merged | WriteToParquet()
- 从状态列表开始
- 按状态,生成文件模式到源数据
- 按状态加载并合并文件名
- 将每个州的输出扁平化为美国数据集。
- 写入 Parquet 文件。
with beam.Pipeline(options=pipeline_options) as p:
merged_data = (
p
| beam.Create(cx.STATES)
| "PathsKeyedByState" >> tx.PathsKeyedByState()
# ('AL', {'county-data': 'gs://data/county-data/AL/COUNTY*.csv', 'state-data': 'gs://data/state-data/AL.zip'})
| "MergeSourceDataByState" >> tx.MergeSourceDataByState()
| "MergeAllStateData" >> beam.Flatten()
)
merged_data | "WriteParquet" >> tx.WriteParquet()
我遇到的问题是这样的:
- 我在每个州的字典中有两个文件模式。要访问这些,我需要使用
DoFn
来获取元素。 - 为了传达数据流的方式,我需要访问
Pipeline
,这是一个PTransform
。例如:df = p | read_csv(...)
- 这些似乎是不相容的需求。
我不确定您在这里进行的到底是哪种合并,但是构造此管道的一种方法可能是使用一个 DoFn,它将县数据作为文件名作为输入元素(即您会有县数据文件名的 PCollection),使用“正常”Python(例如 pandas)打开它,然后读取相关的州数据作为边输入来进行合并。
这是另一个答案。
一次读取一个状态数据并将它们展平,例如
state_dataframe = None
for state in STATES:
df = p | read_csv('/path/to/state')
df['state'] = state
if state_dataframe is None:
state_dataframe = df
else:
state_dataframe = state_dataframe.append(df)
与县数据类似。现在使用数据框操作加入他们。