Reducing PCollection after GroupByKey
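I'm trying to generate simple customer summaries from transaction data. For example, for a given target transaction type, how many transactions occurred and what was the total amount?
Sample raw input: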
custid desc amount
111 coffee 3.50
111 grocery 23.00
333 coffee 4.00
222 gas station 32.00
222 gas station 55.50
333 coffee 3.00
Sample desired output:
custid nbr_coffee amt_coffee nbr_gas_station amt_gas_station
111 1 3.50 0 0.00
222 0 0 2 87.50
333 2 7.00 0 0
My target runner will be Dataflow (but I'm testing with the DirectRunner for now).
Here is the code I have so far:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def categorize_coffee(transaction):
    if transaction['trx_desc'] == 'coffee':
        transaction['coffee'] = True
    else:
        transaction['coffee'] = False
    return transaction

def categorize_gas_station(transaction):
    if transaction['trx_desc'] == 'gas station':
        transaction['gas_station'] = True
    else:
        transaction['gas_station'] = False
    return transaction

def summarize_coffee(grouping):
    key, values = grouping
    values = list(values)
    nbr = 0
    amt = 0
    for d in values:
        if d['coffee'] == True:
            nbr += 1
            amt += d['amount']
    ret_val = {}
    ret_val['cust'] = d['cust']
    ret_val['nbr_coffee'] = nbr
    ret_val['amt_coffee'] = amt
    return ret_val

def summarize_gas_station(grouping):
    key, values = grouping
    values = list(values)
    nbr = 0
    amt = 0
    for d in values:
        if d['gas_station'] == True:
            nbr += 1
            amt += d['amount']
    ret_val = {}
    ret_val['cust'] = d['cust']
    ret_val['nbr_gas_station'] = nbr
    ret_val['amt_gas_station'] = amt
    return ret_val

def create_dict(row):
    # Each line of test.csv is "custid,desc,amount"
    vars = row.split(',')
    return {'cust': vars[0], 'trx_desc': str(vars[1]), 'amount': float(vars[2])}

# Assumed here; the original snippet defines pipeline_options elsewhere.
pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as p:
    categorized_trx = (
        p | 'get data' >> beam.io.ReadFromText('./test.csv')
          | beam.Map(create_dict)
          | beam.Map(categorize_coffee)
          | beam.Map(categorize_gas_station)
          | beam.Map(lambda trx: (trx['cust'], trx))
          | beam.GroupByKey()
    )
    coffee_trx = (categorized_trx | beam.Map(summarize_coffee))
    gas_station_trx = (categorized_trx | beam.Map(summarize_gas_station))
    result = (coffee_trx, gas_station_trx) | beam.Flatten()
The actual result right now is:
{'amt_coffee': 7.0, 'cust': u'333', 'nbr_coffee': 2}
{'amt_coffee': 0, 'cust': u'222', 'nbr_coffee': 0}
{'amt_coffee': 3.5, 'cust': u'111', 'nbr_coffee': 1}
{'nbr_gas_station': 0, 'cust': u'333', 'amt_gas_station': 0}
{'nbr_gas_station': 2, 'cust': u'222', 'amt_gas_station': 87.5}
{'nbr_gas_station': 0, 'cust': u'111', 'amt_gas_station': 0}
The two collections are not merged or joined the way I expected. I'm new to Beam and not sure I understand how to approach this correctly, so any insight would be appreciated.
Beam provides a Combine transform that lets you combine the elements of a PCollection. For your use case, it looks like you can use Combine.perKey() (beam.CombinePerKey in the Python SDK) to combine the keyed elements of a PCollection per key. As the combine function, you can either pass a plain function or implement a CombineFn.
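A minimal sketch of the CombineFn route, assuming elements are the dicts produced by the question's create_dict (keys cust, trx_desc, amount). The class name CustomerSummaryFn and the CATEGORIES mapping are hypothetical. The class subclasses apache_beam.CombineFn when Beam is importable and falls back to a plain object otherwise, only so the accumulator logic can be tried without Beam installed.

```python
try:
    # In a real pipeline the base class is apache_beam.CombineFn.
    from apache_beam import CombineFn as _CombineFnBase
except ImportError:
    # Fallback so the accumulator logic can be exercised without Beam.
    _CombineFnBase = object

# Hypothetical mapping: transaction description -> output column suffix.
CATEGORIES = {'coffee': 'coffee', 'gas station': 'gas_station'}


class CustomerSummaryFn(_CombineFnBase):
    """Per-key combiner: counts and sums amounts for each tracked category."""

    def create_accumulator(self):
        # One [count, total] pair per tracked category.
        return {col: [0, 0.0] for col in CATEGORIES.values()}

    def add_input(self, acc, trx):
        col = CATEGORIES.get(trx['trx_desc'])
        if col is not None:  # untracked descriptions such as 'grocery' are ignored
            acc[col][0] += 1
            acc[col][1] += trx['amount']
        return acc

    def merge_accumulators(self, accumulators):
        # Partial results (e.g. from different workers) are summed together.
        merged = self.create_accumulator()
        for acc in accumulators:
            for col, (nbr, amt) in acc.items():
                merged[col][0] += nbr
                merged[col][1] += amt
        return merged

    def extract_output(self, acc):
        # Flatten into the column names used in the desired output.
        out = {}
        for col, (nbr, amt) in acc.items():
            out['nbr_' + col] = nbr
            out['amt_' + col] = amt
        return out
```

In the question's pipeline this would replace the two categorize_* steps, the GroupByKey and the Flatten: key each dict with beam.Map(lambda trx: (trx['cust'], trx)) and then apply beam.CombinePerKey(CustomerSummaryFn()), which emits one (cust, summary_dict) pair per customer. Because the accumulator is a small fixed-size dict, the runner can combine partial results before the shuffle, which a GroupByKey followed by a Map cannot do.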
This should work:
...
def summarize_coffee(grouping):
    ...
    return (d['cust'], ret_val)

def summarize_gas_station(grouping):
    ...
    return (d['cust'], ret_val)

...

def processJoin(row):
    (customer, data) = row
    # CoGroupByKey gives, per tag, an iterable of that input's values for this key.
    merged = {'cust': customer}
    for summary in data['coffee_trx']:
        merged.update(summary)
    for summary in data['gas_station_trx']:
        merged.update(summary)
    return merged

# The dict keys are string tags; processJoin looks each input up by that name.
result = ({'coffee_trx': coffee_trx, 'gas_station_trx': gas_station_trx}
          | 'Group' >> beam.CoGroupByKey()
          | 'Reshape' >> beam.Map(processJoin)
          )
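For a quick end-to-end check on the DirectRunner, here is a self-contained sketch of the same CoGroupByKey join that feeds the question's sample rows in with beam.Create instead of reading ./test.csv. The names SAMPLE_ROWS, summarize, merge_summaries and run are illustrative, not part of the original code, and the per-type summaries are computed with a list comprehension rather than the question's categorize_* flags.

```python
# Input rows from the question, already parsed the way create_dict would parse them.
SAMPLE_ROWS = [
    {'cust': '111', 'trx_desc': 'coffee', 'amount': 3.50},
    {'cust': '111', 'trx_desc': 'grocery', 'amount': 23.00},
    {'cust': '333', 'trx_desc': 'coffee', 'amount': 4.00},
    {'cust': '222', 'trx_desc': 'gas station', 'amount': 32.00},
    {'cust': '222', 'trx_desc': 'gas station', 'amount': 55.50},
    {'cust': '333', 'trx_desc': 'coffee', 'amount': 3.00},
]


def summarize(desc, prefix):
    """Build a Map fn that counts and sums one transaction type per customer."""
    def fn(kv):
        cust, trxs = kv
        amounts = [t['amount'] for t in trxs if t['trx_desc'] == desc]
        return cust, {'nbr_' + prefix: len(amounts), 'amt_' + prefix: sum(amounts)}
    return fn


def merge_summaries(element):
    """Turn one CoGroupByKey result into a single row per customer."""
    cust, grouped = element
    # Defaults cover a customer that is missing from one of the inputs.
    row = {'cust': cust, 'nbr_coffee': 0, 'amt_coffee': 0.0,
           'nbr_gas_station': 0, 'amt_gas_station': 0.0}
    # Each tag maps to an iterable of the values that input produced for this key.
    for tag in ('coffee_trx', 'gas_station_trx'):
        for summary in grouped[tag]:
            row.update(summary)
    return row


def run(rows=SAMPLE_ROWS):
    # Imported here so the helpers above can be used without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        grouped = (p
                   | 'Create' >> beam.Create(rows)
                   | 'KeyByCust' >> beam.Map(lambda t: (t['cust'], t))
                   | 'GroupByCust' >> beam.GroupByKey())
        coffee = grouped | 'Coffee' >> beam.Map(summarize('coffee', 'coffee'))
        gas = grouped | 'Gas' >> beam.Map(summarize('gas station', 'gas_station'))
        ({'coffee_trx': coffee, 'gas_station_trx': gas}
         | 'Join' >> beam.CoGroupByKey()
         | 'Merge' >> beam.Map(merge_summaries)
         | 'Print' >> beam.Map(print))
```

Calling run() should print one dict per customer carrying all four counters; element order is not guaranteed.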