如何让 Reducer 根据键类型发出
How to get the Reducer to emit according to key type
作为 问题的后续,我有一个 Mapper,它正在处理大量数据并发出 ID 号作为值为 1 的键。每个键都有两个部分,由一个分隔竖线分隔符,例如:
映射器发出:
a|abc 1
b|efg 1
a|cba 1
a|abc 1
b|dhh 1
b|dhh 1
我想要做的是让 Reducer 解析键,并且对于每个 'a' 类型的键,即 'a|abc',我希望 Reducer 只发出重复项,但是对于每一种其他类型(例如类型 'b',即 'b|abc'),我希望 Reducer 发出所有内容,即使值仅为 1.
所以上面的数据会产生:
a|abc 2
b|efg 1
b|dhh 2
在这种情况下,不会发出 'a|cba 1',因为它是 'a' 类型的键并且没有重复项。下面是我尝试过的代码,它几乎按预期工作,除了我得到 92 个额外的发射,其中键的类型为 'a' 并且计数为 1。注意:92 是根据我的 Reduce 任务的数量MapReduce 日志。
由于我只想要键类型 'a' 的重复项,我该如何修复 Reducer,以免我得到额外的 92 个值为 1 的键类型 'a' 发射?
import sys
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)
(last_key, tot_cnt) = (None, 0)
for line in inData:
(key, val) = line.strip().split("\t")
if last_key != key:
k = key.split('|')
v_id = k[0]
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
(last_key, tot_cnt) = (key, int(val))
else:
(last_key, tot_cnt) = (key, tot_cnt + int(val))
if last_key:
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
以下是您的代码中的错误:
在全局级别声明v_id
,以便它随处可见。
更改此行:
(last_key, tot_cnt) = (None, 0)
收件人:
(last_key, tot_cnt, v_id) = (None, 0, None)
后续拆分应该在 last_key
而不是当前 key
。当当前键是 "b|dhh" 并且最后一个键是 "a|abc" 时,你应该得到 v_id
for "a|abc".
更改此代码:
if last_key != key:
k = key.split('|')
v_id = k[0]
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
收件人:
if last_key != key:
if last_key != None:
k = last_key.split('|')
v_id = k[0]
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
所以,修改后的 reducer 代码如下所示:
import sys
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)
(last_key, tot_cnt, v_id) = (None, 0, None)
for line in inData:
(key, val) = line.strip().split("\t")
if last_key != key:
if last_key != None:
k = last_key.split('|')
v_id = k[0]
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
(last_key, tot_cnt) = (key, int(val))
else:
(last_key, tot_cnt) = (key, tot_cnt + int(val))
if last_key:
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
当我运行这个时,我得到了输出:
a|abc 2
b|dhh 2
b|efg 1
注:我不是Python专家。我觉得,你可以优化这段代码。因此,请检查脚本中是否存在任何极端情况和冗余检查。
作为
映射器发出:
a|abc 1
b|efg 1
a|cba 1
a|abc 1
b|dhh 1
b|dhh 1
我想要做的是让 Reducer 解析键,并且对于每个 'a' 类型的键,即 'a|abc',我希望 Reducer 只发出重复项,但是对于每一种其他类型(例如类型 'b',即 'b|abc'),我希望 Reducer 发出所有内容,即使值仅为 1.
所以上面的数据会产生:
a|abc 2
b|efg 1
b|dhh 2
在这种情况下,不会发出 'a|cba 1',因为它是 'a' 类型的键并且没有重复项。下面是我尝试过的代码,它几乎按预期工作,除了我得到 92 个额外的发射,其中键的类型为 'a' 并且计数为 1。注意:92 是根据我的 Reduce 任务的数量MapReduce 日志。
由于我只想要键类型 'a' 的重复项,我该如何修复 Reducer,以免我得到额外的 92 个值为 1 的键类型 'a' 发射?
import sys
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)
(last_key, tot_cnt) = (None, 0)
for line in inData:
(key, val) = line.strip().split("\t")
if last_key != key:
k = key.split('|')
v_id = k[0]
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
(last_key, tot_cnt) = (key, int(val))
else:
(last_key, tot_cnt) = (key, tot_cnt + int(val))
if last_key:
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
以下是您的代码中的错误:
在全局级别声明
v_id
,以便它随处可见。更改此行:
(last_key, tot_cnt) = (None, 0)
收件人:
(last_key, tot_cnt, v_id) = (None, 0, None)
后续拆分应该在
last_key
而不是当前key
。当当前键是 "b|dhh" 并且最后一个键是 "a|abc" 时,你应该得到v_id
for "a|abc".更改此代码:
if last_key != key: k = key.split('|') v_id = k[0] if v_id == 'a': if tot_cnt > 1: sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt)) else: sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
收件人:
if last_key != key: if last_key != None: k = last_key.split('|') v_id = k[0] if v_id == 'a': if tot_cnt > 1: sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt)) else: sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
所以,修改后的 reducer 代码如下所示:
import sys
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)
(last_key, tot_cnt, v_id) = (None, 0, None)
for line in inData:
(key, val) = line.strip().split("\t")
if last_key != key:
if last_key != None:
k = last_key.split('|')
v_id = k[0]
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
(last_key, tot_cnt) = (key, int(val))
else:
(last_key, tot_cnt) = (key, tot_cnt + int(val))
if last_key:
if v_id == 'a':
if tot_cnt > 1:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
else:
sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
当我运行这个时,我得到了输出:
a|abc 2
b|dhh 2
b|efg 1
注:我不是Python专家。我觉得,你可以优化这段代码。因此,请检查脚本中是否存在任何极端情况和冗余检查。