How to collect all results from a Beam transform?
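I have a Beam pipeline that processes a fairly large text file. The pipeline reads the text and extracts the data from each line into a dictionary. I want to write the dictionaries to a text file, but only the dictionary keys get written.
Each dictionary looks like this: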
{'Site_number': '09427500', 'Date': '2019-08-09 10:30:00',
'Reservoir_storage': '584900'}
However, this is what ends up in my result_data.txt file:
Site_number
Date
Reservoir_storage
Here is a sample of the text I am processing:
# Data provided for site 09427500
# TS parameter Description
# 6385 00054 Reservoir storage, acre feet
#
# Data-value qualification codes included in this output:
# P Provisional data subject to revision.
#
agency_cd site_no datetime tz_cd 6385_00054 6385_00054_cd
5s 15s 20d 6s 14n 10s
USGS 09427500 2019-08-09 00:00 MST 580800 P
USGS 09427500 2019-08-09 00:15 MST 581100 P
USGS 09427500 2019-08-09 00:30 MST 581100 P
USGS 09427500 2019-08-09 00:45 MST 581300 P
USGS 09427500 2019-08-09 01:00 MST 581500 P
USGS 09427500 2019-08-09 01:15 MST 581700 P
Here is my code:
import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from dateutil import parser
import os

class ExtractData(beam.DoFn):
    def process(self, element):
        line = element[1]
        if not line.startswith('#'):
            dict = {}
            item = line.replace('\n', '').split('\t')
            date_item = item[2]
            try:
                date = parser.parse(item[2]).strftime('%Y-%m-%d %H:%M:%S')
            except:
                date = date_item
            dict['Site_number'] = item[1]
            dict['Date'] = date
            dict['Reservoir_storage'] = item[-2]
            print(dict)
            return dict

def run():
    output = []
    p = beam.Pipeline('DirectRunner')
    data = ( p
        | 'Read text' >> beam.io.ReadFromTextWithFilename('reservoir_data.txt')
        | 'Process lines' >> beam.ParDo(ExtractData())
        | 'Write' >> beam.io.textio.WriteToText('result_data.txt')
    )
    result = p.run()
    result.wait_until_finish()

if __name__=="__main__":
    run()
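Ultimately, I want a list containing all of the dictionaries.
What am I doing wrong when writing the dictionaries to the file, or what am I misunderstanding?
I am using Python 3.6.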
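This is similar to another question (and test). Basically, process needs to return an iterable: use return [dict] or yield dict instead of return dict. Beam iterates over whatever process returns and emits each item as a separate element, and iterating over a dict yields its keys, which is why only the keys were written. With that change it works: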
$ cat result_data.txt-00000-of-00001
{'Date': '2019-08-09 00:00:00', 'Reservoir_storage': u'2019-08-09 00:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:15:00', 'Reservoir_storage': u'2019-08-09 00:15', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:30:00', 'Reservoir_storage': u'2019-08-09 00:30', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:45:00', 'Reservoir_storage': u'2019-08-09 00:45', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:00:00', 'Reservoir_storage': u'2019-08-09 01:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:15:00', 'Reservoir_storage': u'2019-08-09 01:15', 'Site_number': u'09427500'}
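To make the difference concrete, here is a minimal sketch in plain Python that does not use Beam at all. The helper emulate_pardo is hypothetical and only imitates the one relevant behaviour (iterating over whatever process returns, and treating None as no output); the two process functions are simplified versions of the question's ExtractData, and the sample line assumes tab-separated fields.

```python
# Hypothetical stand-in for how a ParDo consumes a DoFn's process result:
# the return value is iterated and each item becomes one output element.
def emulate_pardo(process_fn, elements):
    for element in elements:
        result = process_fn(element)
        if result is None:  # a None return means "no output" for this element
            continue
        for item in result:
            yield item


def process_returning_dict(element):
    """Mirrors the question: returns the dict itself."""
    _filename, line = element
    if not line.startswith('#'):
        fields = line.rstrip('\n').split('\t')
        return {'Site_number': fields[1], 'Reservoir_storage': fields[-2]}


def process_yielding_dict(element):
    """Mirrors the fix: yields the dict, so it is emitted as one element."""
    _filename, line = element
    if not line.startswith('#'):
        fields = line.rstrip('\n').split('\t')
        yield {'Site_number': fields[1], 'Reservoir_storage': fields[-2]}


# (filename, line) pairs, the shape produced by ReadFromTextWithFilename.
elements = [
    ('reservoir_data.txt', '# Data provided for site 09427500'),
    ('reservoir_data.txt', 'USGS\t09427500\t2019-08-09 00:00\tMST\t580800\tP'),
]

keys_only = list(emulate_pardo(process_returning_dict, elements))
whole_dicts = list(emulate_pardo(process_yielding_dict, elements))

print(keys_only)    # ['Site_number', 'Reservoir_storage']
print(whole_dicts)  # [{'Site_number': '09427500', 'Reservoir_storage': '580800'}]
```

In a real pipeline the results live in a PCollection rather than a Python list. If a single list of all dictionaries is needed inside the pipeline, one option is to apply beam.combiners.ToList() after the ParDo, which produces a one-element PCollection holding that list; this is only practical when the combined data fits in memory.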