Aggregating data in a window in Apache Beam
I'm receiving a stream of complex, nested JSON objects as the input to my pipeline.
My goal is to create small batches to feed to another Pub/Sub topic for downstream
processing. I'm struggling with the beam.GroupByKey() function - from what I've read,
this is the right way to attempt the aggregation.
A simplified example, input events:
{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a'], url: 'websiteB.com' }
What I'm trying to produce:
{
'websiteA.com': {a:2, b:2, c:2},
'websiteB.com': {a:1},
}
My problem is that trying to group anything beyond the simplest of tuples throws
ValueError: too many values to unpack.
I could run this in two steps, but from what I've read, using beam.GroupByKey()
is expensive and should therefore be minimized.
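For comparison, the two-step version alluded to above could look roughly like the sketch below (hard-coded input instead of Pub/Sub, names are illustrative only): key on the URL, shuffle everything with beam.GroupByKey(), and only then count. The shuffle of every raw element is exactly the cost a combiner avoids.

import collections

import apache_beam as beam

# Hypothetical two-step version: group first, then count.
def count_data(kv):
    url, data_lists = kv
    c = collections.Counter()
    for data in data_lists:
        c.update(data)
    return url, dict(c)

with beam.Pipeline() as p:
    counts = (p
        | beam.Create([
            {'data': ['a', 'b', 'c'], 'url': 'websiteA.com'},
            {'data': ['a', 'b', 'c'], 'url': 'websiteA.com'},
            {'data': ['a'], 'url': 'websiteB.com'},
        ])
        | beam.Map(lambda x: (x['url'], x['data']))
        | beam.GroupByKey()        # step 1: expensive shuffle of raw elements
        | beam.Map(count_data))    # step 2: count per URL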
Edit following @Cubez's answer.
Here is my combine function, which only seems to be half working :(
class MyCustomCombiner(beam.CombineFn):
  def create_accumulator(self):
    logging.info('accum_created')  # Logs OK!
    return {}

  def add_input(self, counts, input):
    counts = {}
    for i in input:
      counts[i] = 1
    logging.info(counts)  # Logs OK!
    return counts

  def merge_accumulators(self, accumulators):
    logging.info('accumcalled')  # never logs anything
    c = collections.Counter()
    for d in accumulators:
      c.update(d)
    logging.info('accum: %s', accumulators)  # never logs anything
    return dict(c)

  def extract_output(self, counts):
    logging.info('Counts2: %s', counts)  # never logs anything
    return counts
It seems like nothing past add_input is being called?
Adding the pipeline code:
with beam.Pipeline(argv=pipeline_args) as p:
  raw_loads_dict = (p
    | 'ReadPubsubLoads' >> ReadFromPubSub(topic=PUBSUB_TOPIC_NAME).with_output_types(bytes)
    | 'JSONParse' >> beam.Map(lambda x: json.loads(x))
  )
  fixed_window_events = (raw_loads_dict
    | 'KeyOnUrl' >> beam.Map(lambda x: (x['client_id'], x['events']))
    | '1MinWindow' >> beam.WindowInto(window.FixedWindows(60))
    | 'CustomCombine' >> beam.CombinePerKey(MyCustomCombiner())
  )
  fixed_window_events | 'LogResults2' >> beam.ParDo(LogResults())
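For reference, one way to see which CombineFn methods actually run is to exercise the same combiner locally on a couple of hard-coded elements. A minimal DirectRunner sketch (not the real Pub/Sub pipeline; it assumes the MyCustomCombiner class above is importable):

import logging

import apache_beam as beam

# Send INFO-level logging to the console so the logging calls inside
# each CombineFn method are visible when running locally.
logging.basicConfig(level=logging.INFO)

with beam.Pipeline() as p:
    _ = (p
        | beam.Create([
            ('websiteA.com', ['a', 'b', 'c']),
            ('websiteA.com', ['a', 'b', 'c']),
            ('websiteB.com', ['a']),
        ])
        | beam.CombinePerKey(MyCustomCombiner())
        | beam.Map(lambda kv: logging.info('combined: %s', kv)))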
This is a perfect example of needing to use combiners. These are transforms that are used to aggregate or combine collections across multiple workers. As the doc says, CombineFns work by reading in your element (beam.CombineFn.add_input), merging multiple elements (beam.CombineFn.merge_accumulators), then finally outputting the final combined value (beam.CombineFn.extract_output). See the Python docs for the parent class here.
For example, a combiner that outputs the mean of a collection of numbers looks like this:
class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, sum_count, input):
    (sum, count) = sum_count
    return sum + input, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    (sum, count) = sum_count
    return sum / count if count else float('NaN')

pc = ...
average = pc | beam.CombineGlobally(AverageFn())
For your use case, I would suggest the following:
values = [
    {'data': ['a', 'b', 'c'], 'url': 'websiteA.com'},
    {'data': ['a', 'b', 'c'], 'url': 'websiteA.com'},
    {'data': ['a'], 'url': 'websiteB.com'}
]

# This counts the number of elements that are the same.
def combine(counts):
  # A Counter is a dictionary from keys to the number of times it has
  # seen that particular key.
  c = collections.Counter()
  for d in counts:
    c.update(d)
  return dict(c)

with beam.Pipeline(options=pipeline_options) as p:
  pc = (p
        # You should replace this step with reading data from your
        # source and transforming it to the proper format for below.
        | 'create' >> beam.Create(values)

        # This step transforms each dictionary into a (key, value) tuple.
        # For this example it returns:
        # [('websiteA.com', ['a', 'b', 'c']),
        #  ('websiteA.com', ['a', 'b', 'c']),
        #  ('websiteB.com', ['a'])]
        | 'url as key' >> beam.Map(lambda x: (x['url'], x['data']))

        # This is the magic that combines all elements with the same
        # URL and outputs a count based on the keys in 'data'.
        # It returns the elements:
        # [('websiteA.com', {'a': 2, 'b': 2, 'c': 2}),
        #  ('websiteB.com', {'a': 1})]
        | 'combine' >> beam.CombinePerKey(combine))

  # Do something with pc
  new_pc = pc | ...
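To connect this back to the streaming pipeline in the question, the same combine function should drop straight into the windowed Pub/Sub flow. A rough sketch, reusing pipeline_args and PUBSUB_TOPIC_NAME from the question, and assuming each parsed message carries 'url' and 'data' fields (the question's pipeline keys on 'client_id' and 'events', so substitute whichever fields you actually want to count):

import collections
import json

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.transforms import window

def combine(counts):
    c = collections.Counter()
    for d in counts:
        c.update(d)
    return dict(c)

with beam.Pipeline(argv=pipeline_args) as p:
    windowed_counts = (p
        | 'ReadPubsubLoads' >> ReadFromPubSub(topic=PUBSUB_TOPIC_NAME).with_output_types(bytes)
        | 'JSONParse' >> beam.Map(json.loads)
        | 'KeyOnUrl' >> beam.Map(lambda x: (x['url'], x['data']))
        | '1MinWindow' >> beam.WindowInto(window.FixedWindows(60))
        # Combines per key within each 1-minute window, emitting
        # (url, {value: count}) pairs for downstream publishing.
        | 'CountPerUrl' >> beam.CombinePerKey(combine))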