Max and Min for several fields inside PCollection in apache beam with python
I'm using Apache Beam through the Python SDK and have the following problem:
I have a PCollection of about 1 million entries, where every entry in the PCollection looks like a list of 2-tuples [(key1,value1),(key2,value2),...] of length ~150. I need to find the max and min values across all entries of the PCollection for each key in order to normalize the values.
Ideally, it would be good to obtain a PCollection with a list of tuples [(key,max_value,min_value),...], from which it is then easy to normalize the entries to get [(key1,norm_value1),(key2,norm_value2),...], where norm_value = (value - min) / (max - min).
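For concreteness, this is what that transformation would look like in plain Python on a made-up two-entry example (the data here is purely illustrative):
# Toy stand-in for the PCollection: each entry is a list of (key, value) pairs.
entries = [[('k1', 2), ('k2', 10)], [('k1', 6), ('k2', 20)]]

# First pass: min and max per key across all entries.
stats = {}
for entry in entries:
    for key, value in entry:
        lo, hi = stats.get(key, (float('inf'), float('-inf')))
        stats[key] = (min(lo, value), max(hi, value))

# Second pass: norm_value = (value - min) / (max - min).
normalized = [[(k, (v - stats[k][0]) / (stats[k][1] - stats[k][0]))
               for k, v in entry]
              for entry in entries]
print(normalized)  # [[('k1', 0.0), ('k2', 0.0)], [('k1', 1.0), ('k2', 1.0)]]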
At the moment I can only do this by hand for each key separately, which is neither convenient nor sustainable, so any suggestions would be helpful.
I decided to give this a try using a custom CombineFn to determine the minimum and maximum per key. Then, I join them with the input data using CoGroupByKey and apply the desired mapping to normalize the values.
"""Normalize PCollection values."""
import logging
import argparse
import sys
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
# custom CombineFn that outputs min and max value
class MinMaxFn(beam.CombineFn):
# initialize min and max values (I assumed int type)
def create_accumulator(self):
return (sys.maxint, 0)
# update if current value is a new min or max
def add_input(self, min_max, input):
(current_min, current_max) = min_max
return min(current_min, input), max(current_max, input)
def merge_accumulators(self, accumulators):
return accumulators
def extract_output(self, min_max):
return min_max
def run(argv=None):
"""Main entry point; defines and runs the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument('--output',
dest='output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
# create test data
pc = [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)]
# first run through data to apply custom combineFn and determine min/max per key
minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())
# group input data by key and append corresponding min and max
merged = (pc, minmax) | 'Join Pcollections' >> beam.CoGroupByKey()
# apply mapping to normalize values according to 'norm_value = (value - min) / (max - min)'
normalized = merged | 'Normalize values' >> beam.Map(lambda (a, (b, c)): (a, [float(val - c[0][0][0])/(c[0][0][1] -c[0][0][0]) for val in b]))
# write results to output file
normalized | 'Write results' >> WriteToText(known_args.output)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
The snippet can be run with python SCRIPT_NAME.py --output OUTPUT_FILENAME. My test data, grouped by key, is:
('foo', [1, 5])
('bar', [5, 9, 2])
The CombineFn will return the minimum and maximum per key:
('foo', (1, 5))
('bar', (2, 9))
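As a quick sanity check, the CombineFn methods can also be called directly outside of a pipeline (a runner normally drives them for you; this direct-call usage is just for illustration):
fn = MinMaxFn()
acc = fn.create_accumulator()
for v in [5, 9, 2]:
    acc = fn.add_input(acc, v)
print(fn.extract_output(fn.merge_accumulators([acc])))  # (2, 9)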
The output of the join/cogroup by key step:
('foo', ([1, 5], [(1, 5)]))
('bar', ([5, 9, 2], [(2, 9)]))
And after normalization:
('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])
This was just a simple test, so I'm sure it can be optimized for the amount of data mentioned above, but it seems to work as a starting point. Take into account further considerations that might be needed (e.g., avoiding division by zero if min = max).
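For example, a guarded version of the normalize function could look like this (mapping constant keys to 0.0 is just one possible convention, assumed here for illustration):
def safe_normalize(element):
    key, (values, min_max) = element
    min_value, max_value = min_max[0]
    # all values for this key are identical, so (max - min) would be zero
    if max_value == min_value:
        return key, [0.0 for _ in values]
    return key, [float(v - min_value) / (max_value - min_value) for v in values]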