如何在火花流中减少两个键?
How to reduce by two keys in spark streaming?
我有来自 Kafka 消费者的以下类型的数据
(u'0:l1', ({u'partyField': u'0:n5m, u'attr1': u'ok'})
(u'0:l1', ({u'partyField': u'0:n8m, u'attr1': u'ok'})
(u'0:l1', ({u'partyField': u'0:n8m, u'attr1': u'ok'})
我想对此执行 reduceByKey
操作。目前,我得到以下输出
(u'0:l1', {u'partyField': u'0:n5m, u'attr1': u'ok'},
{u'partyField': u'0:n8m, u'attr1': u'ok'},
{u'partyField': u'0:n8m, u'attr1': u'ok'})
但我想要某种组合键,因为我想按另一个参数进行分组,该参数是值的一部分,即 partyField
我正在寻找一个与此类似的分组,即按键和 partyField
分组
(u'0:l1', ({u'partyField': u'0:n5m, u'attr1': u'ok'})
(u'0:l1', {u'partyField': u'0:n8m, u'attr1': u'ok'},
u'0:l1', {u'partyField': u'0:n8m, u'attr1': u'ok'})
如何在 spark 中执行此操作?
根据每条记录的 partyField
形成键并应用 reduceByKey
并从 reduced_rdd
.
中提取值
例如:
>>> in_rdd = sc.parallelize(a)
[('0:l1', {'partyField': '0:n5m', 'attr1': 'ok'}),
('0:l1', {'partyField': '0:n8m', 'attr1': 'ok'}),
('0:l1', {'partyField': '0:n8m', 'attr1': 'ok'})]
>>> key_rdd = in_rdd.map(lambda x : (x[1]['partyField'],x))
>>> reduced_rdd = key_rdd.reduceByKey(lambda acc, curr: acc + curr)
>>> final_rdd = reduced_rdd.map(lambda x: x[1])
>>> final_rdd.collect()
[('0:l1',{'partyField': '0:n8m', 'attr1': 'ok'},
'0:l1',{'partyField': '0:n8m', 'attr1': 'ok'}),
('0:l1',{'partyField': '0:n5m', 'attr1': 'ok'})]
希望对您有所帮助!
我有来自 Kafka 消费者的以下类型的数据
(u'0:l1', ({u'partyField': u'0:n5m, u'attr1': u'ok'})
(u'0:l1', ({u'partyField': u'0:n8m, u'attr1': u'ok'})
(u'0:l1', ({u'partyField': u'0:n8m, u'attr1': u'ok'})
我想对此执行 reduceByKey
操作。目前,我得到以下输出
(u'0:l1', {u'partyField': u'0:n5m, u'attr1': u'ok'},
{u'partyField': u'0:n8m, u'attr1': u'ok'},
{u'partyField': u'0:n8m, u'attr1': u'ok'})
但我想要某种组合键,因为我想按另一个参数进行分组,该参数是值的一部分,即 partyField
我正在寻找一个与此类似的分组,即按键和 partyField
(u'0:l1', ({u'partyField': u'0:n5m, u'attr1': u'ok'})
(u'0:l1', {u'partyField': u'0:n8m, u'attr1': u'ok'},
u'0:l1', {u'partyField': u'0:n8m, u'attr1': u'ok'})
如何在 spark 中执行此操作?
根据每条记录的 partyField
形成键并应用 reduceByKey
并从 reduced_rdd
.
例如:
>>> in_rdd = sc.parallelize(a)
[('0:l1', {'partyField': '0:n5m', 'attr1': 'ok'}),
('0:l1', {'partyField': '0:n8m', 'attr1': 'ok'}),
('0:l1', {'partyField': '0:n8m', 'attr1': 'ok'})]
>>> key_rdd = in_rdd.map(lambda x : (x[1]['partyField'],x))
>>> reduced_rdd = key_rdd.reduceByKey(lambda acc, curr: acc + curr)
>>> final_rdd = reduced_rdd.map(lambda x: x[1])
>>> final_rdd.collect()
[('0:l1',{'partyField': '0:n8m', 'attr1': 'ok'},
'0:l1',{'partyField': '0:n8m', 'attr1': 'ok'}),
('0:l1',{'partyField': '0:n5m', 'attr1': 'ok'})]
希望对您有所帮助!