如何收集光束变换的所有结果?

How to collect all results from a beam transform?

我有一个光束管道来处理一个相当大的文本文件。管道读取文本,并将行数据提取到字典中。我想将字典写入文本文件,但我只能写入字典键。

字典看起来像这样:

    {'Site_number': '09427500', 'Date': '2019-08-09 10:30:00', 
    'Reservoir_storage': '584900'}

然而,我的result_data.txt文件中写的是:

    Site_number
    Date
    Reservoir_storage

这是我要处理的文本示例:

    # Data provided for site 09427500
#            TS   parameter     Description
#          6385       00054     Reservoir storage, acre feet
#
# Data-value qualification codes included in this output:
#     P  Provisional data subject to revision.
# 
agency_cd   site_no datetime    tz_cd   6385_00054  6385_00054_cd
5s  15s 20d 6s  14n 10s
USGS    09427500    2019-08-09 00:00    MST 580800  P
USGS    09427500    2019-08-09 00:15    MST 581100  P
USGS    09427500    2019-08-09 00:30    MST 581100  P
USGS    09427500    2019-08-09 00:45    MST 581300  P
USGS    09427500    2019-08-09 01:00    MST 581500  P
USGS    09427500    2019-08-09 01:15    MST 581700  P

这是我的代码:

import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from dateutil import parser
import os

class ExtractData(beam.DoFn):

    def process(self, element):
        line = element[1]
        if not line.startswith('#'):
            dict = {}
            item = line.replace('\n', '').split('\t')
            date_item = item[2]
            try:
                 date = parser.parse(item[2]).strftime('%Y-%m-%d %H:%M:%S')
            except:
                date = date_item
            dict['Site_number'] = item[1]
            dict['Date'] = date
            dict['Reservoir_storage'] = item[-2]
            print(dict)
            return dict


def run():
    output = []
    p = beam.Pipeline('DirectRunner')
    data = ( p
        | 'Read text' >> beam.io.ReadFromTextWithFilename('reservoir_data.txt')
        | 'Process lines' >> beam.ParDo(ExtractData())
        | 'Write' >> beam.io.textio.WriteToText('result_data.txt')
    )

    result = p.run()
    result.wait_until_finish()


if __name__=="__main__":
    run()

最终,我想要一个包含所有词典的列表。 我在写字典时做错了什么 file/what 我是不是没看懂?

我正在使用 python 3.6

这与此相似 and test。基本上,您需要 return 一个带有 return [dict]yield dict 而不是 return dict 的迭代,它会起作用:

$ cat result_data.txt-00000-of-00001 
{'Date': '2019-08-09 00:00:00', 'Reservoir_storage': u'2019-08-09 00:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:15:00', 'Reservoir_storage': u'2019-08-09 00:15', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:30:00', 'Reservoir_storage': u'2019-08-09 00:30', 'Site_number': u'09427500'}
{'Date': '2019-08-09 00:45:00', 'Reservoir_storage': u'2019-08-09 00:45', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:00:00', 'Reservoir_storage': u'2019-08-09 01:00', 'Site_number': u'09427500'}
{'Date': '2019-08-09 01:15:00', 'Reservoir_storage': u'2019-08-09 01:15', 'Site_number': u'09427500'}