如何将单个 csv 文件转换为 apache beam 的多个 pcollection
how do you turn a single csv file into multiple pcollections for apache beam
我有一个 csv 文件,其中前几行作为 ID 和标签,其余几行是实际数据。将前几行与 map 函数共享以使用实际数据转换后续行的最佳方法是什么?总的来说,我正在做类似于 的事情,但我不只是在顶部添加标签,我还有一行额外的 ID。
数据看起来像这样:
-- ,id1 , id1 , id1 , id2 , id2 , id2
-- ,label,label,label,label,label,label
time1,data, data, data, data, data, data
time2,data, data, data, data, data, data
然后我想为每个唯一 ID 写入一条 id/time/dataobject 记录到 bigquery。
基本上我假设我需要一个中间管道步骤将文件转换为多个 pcollections,我可以让下一步根据顶行的值实际转换所有文件行。如果是这种情况,实现该目标的最佳方法是什么?如果没有,我还能将前几行的值提供给其他行项的映射函数吗?
一个可能的解决方案是修改上一个问题中的自定义源。否则,您可以对数据进行初始传递以将 headers 保存为主要处理步骤的辅助输入:
input = p | 'Read CSV file' >> ReadFromText("input.csv")
headers = input | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
rows = input | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))
其中 ParseHeadersFn
检查以 --
开头的行是否符合 header 的条件,如果为真则丢弃第一个字段,因为它不需要:
class ParseHeadersFn(beam.DoFn):
"""ParDo to output only the headers"""
def process(self, element):
if '--' in element.split(',')[0]:
yield [','.join(element.split(',')[1:])]
然后,在ParseRowsFn
内,我们可以访问headers
侧输入:
class ParseRowsFn(beam.DoFn):
"""ParDo to process data rows according to header metadata"""
def process(self, element, headers):
if 'time1' in element.split(',')[0]:
for id in headers[0]:
print 'ids: ' + id
for label in headers[1]:
print 'labels: ' + label
请注意,我假设 id 行将出现在标签一之前,但由于 Dataflow 是一个分布式系统,这可能不是真的。做一些更严格的检查会更好。
如果我们的input.csv
是:
--,id1,id1,id1,id2,id2,id2
--,label1,label2,label3,label1,label2,label3
time1,data1,data2,data3,data4,data5,data6
time2,data7,data8,data9,data10,data11,data12
示例输出:
ids: id1 , id1 , id1 , id2 , id2 , id2
labels: label1,label2,label3,label1,label2,label3
使用的代码:script.py gist
ParseRowsFn
可以用 dict(zip(...))
修改以获得所需的输出,但我不确定我是否理解输出结构。你需要这样的东西吗?
id1,time1,data1,data2,data3
id1,time2,data7,data8,data9
id2,time1,data4,data5,data6
id2,time2,data10,data11,data12
如果是这种情况,我们可以使用此 answer 中的技巧来确定 ID 更改的位置并采取相应行动:
class ParseRowsFn(beam.DoFn):
"""ParDo to process data rows according to header metadata"""
def process(self, element, headers):
# changing ids as per https://whosebug.com/a/28242076/6121516
fields = element.split(',')
if '--' not in fields[0]:
ids = headers[0][0].split(',')
labels = headers[1][0].split(',')
id_changes = [i for i in range(1,len(ids)) if ids[i]!=ids[i-1]]
id_changes.append(len(ids))
for idx, change in enumerate(id_changes):
row = {'timestamp': fields[0], 'id': ids[change - 1]}
low = max(idx - 1, 0)
row.update(dict(zip(labels[low:change], fields[low+1:change+1])))
print row
yield [row]
示例输出:
{'timestamp': u'time1', u'label2': u'data2', u'label3': u'data3', 'id': u'id1', u'label1': u'data1'}
{'timestamp': u'time1', u'label2': u'data5', u'label3': u'data6', 'id': u'id2', u'label1': u'data4'}
{'timestamp': u'time2', u'label2': u'data8', u'label3': u'data9', 'id': u'id1', u'label1': u'data7'}
{'timestamp': u'time2', u'label2': u'data11', u'label3': u'data12', 'id': u'id2', u'label1': u'data10'}
使用的代码:output.py 同 gist
我有一个 csv 文件,其中前几行作为 ID 和标签,其余几行是实际数据。将前几行与 map 函数共享以使用实际数据转换后续行的最佳方法是什么?总的来说,我正在做类似于
数据看起来像这样:
-- ,id1 , id1 , id1 , id2 , id2 , id2
-- ,label,label,label,label,label,label
time1,data, data, data, data, data, data
time2,data, data, data, data, data, data
然后我想为每个唯一 ID 写入一条 id/time/dataobject 记录到 bigquery。
基本上我假设我需要一个中间管道步骤将文件转换为多个 pcollections,我可以让下一步根据顶行的值实际转换所有文件行。如果是这种情况,实现该目标的最佳方法是什么?如果没有,我还能将前几行的值提供给其他行项的映射函数吗?
一个可能的解决方案是修改上一个问题中的自定义源。否则,您可以对数据进行初始传递以将 headers 保存为主要处理步骤的辅助输入:
input = p | 'Read CSV file' >> ReadFromText("input.csv")
headers = input | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
rows = input | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))
其中 ParseHeadersFn
检查以 --
开头的行是否符合 header 的条件,如果为真则丢弃第一个字段,因为它不需要:
class ParseHeadersFn(beam.DoFn):
"""ParDo to output only the headers"""
def process(self, element):
if '--' in element.split(',')[0]:
yield [','.join(element.split(',')[1:])]
然后,在ParseRowsFn
内,我们可以访问headers
侧输入:
class ParseRowsFn(beam.DoFn):
"""ParDo to process data rows according to header metadata"""
def process(self, element, headers):
if 'time1' in element.split(',')[0]:
for id in headers[0]:
print 'ids: ' + id
for label in headers[1]:
print 'labels: ' + label
请注意,我假设 id 行将出现在标签一之前,但由于 Dataflow 是一个分布式系统,这可能不是真的。做一些更严格的检查会更好。
如果我们的input.csv
是:
--,id1,id1,id1,id2,id2,id2
--,label1,label2,label3,label1,label2,label3
time1,data1,data2,data3,data4,data5,data6
time2,data7,data8,data9,data10,data11,data12
示例输出:
ids: id1 , id1 , id1 , id2 , id2 , id2
labels: label1,label2,label3,label1,label2,label3
使用的代码:script.py gist
ParseRowsFn
可以用 dict(zip(...))
修改以获得所需的输出,但我不确定我是否理解输出结构。你需要这样的东西吗?
id1,time1,data1,data2,data3
id1,time2,data7,data8,data9
id2,time1,data4,data5,data6
id2,time2,data10,data11,data12
如果是这种情况,我们可以使用此 answer 中的技巧来确定 ID 更改的位置并采取相应行动:
class ParseRowsFn(beam.DoFn):
"""ParDo to process data rows according to header metadata"""
def process(self, element, headers):
# changing ids as per https://whosebug.com/a/28242076/6121516
fields = element.split(',')
if '--' not in fields[0]:
ids = headers[0][0].split(',')
labels = headers[1][0].split(',')
id_changes = [i for i in range(1,len(ids)) if ids[i]!=ids[i-1]]
id_changes.append(len(ids))
for idx, change in enumerate(id_changes):
row = {'timestamp': fields[0], 'id': ids[change - 1]}
low = max(idx - 1, 0)
row.update(dict(zip(labels[low:change], fields[low+1:change+1])))
print row
yield [row]
示例输出:
{'timestamp': u'time1', u'label2': u'data2', u'label3': u'data3', 'id': u'id1', u'label1': u'data1'}
{'timestamp': u'time1', u'label2': u'data5', u'label3': u'data6', 'id': u'id2', u'label1': u'data4'}
{'timestamp': u'time2', u'label2': u'data8', u'label3': u'data9', 'id': u'id1', u'label1': u'data7'}
{'timestamp': u'time2', u'label2': u'data11', u'label3': u'data12', 'id': u'id2', u'label1': u'data10'}
使用的代码:output.py 同 gist