如何提前终止 MrJob reducer?
How to prematurely terminate MrJob reducer?
我想使用 MapReduce 为满足某些条件的稀有实体过滤庞大的数据集。一旦 reducer 违反标准,我可以通过终止 reducer 来大大加快速度,因为它们将计算我不感兴趣的实体。
举个例子,假设我有一个包含数十亿篇文章的语料库,我想 return 仅包含少于 100 个单词的文章。绝大多数文章的字数都超过 100,000,因此我可以通过在达到停止标准 (word_count >100) 时终止 reducer 来跳过大部分工作。
这不会终止 reducer,但会阻止它接收任何新作业。它通过将特征计数维护为 class 字典来工作:
from mrjob.job import MRJob
class Mr_Count_Words(MRJob):
feature_counts = {}
def mapper(self, _, line):
...
然后,你可以在某个地方计算特征并检查字典以查看你是否已经收敛:
try:
self.feature_counts[feature_name] += 1
except KeyError:
self.feature_counts[feature_name] = 1
if self.feature_counts[feature_name] > feature_thresh:
return None
else:
yield ('feature_name', 1)
我想使用 MapReduce 为满足某些条件的稀有实体过滤庞大的数据集。一旦 reducer 违反标准,我可以通过终止 reducer 来大大加快速度,因为它们将计算我不感兴趣的实体。
举个例子,假设我有一个包含数十亿篇文章的语料库,我想 return 仅包含少于 100 个单词的文章。绝大多数文章的字数都超过 100,000,因此我可以通过在达到停止标准 (word_count >100) 时终止 reducer 来跳过大部分工作。
这不会终止 reducer,但会阻止它接收任何新作业。它通过将特征计数维护为 class 字典来工作:
from mrjob.job import MRJob
class Mr_Count_Words(MRJob):
feature_counts = {}
def mapper(self, _, line):
...
然后,你可以在某个地方计算特征并检查字典以查看你是否已经收敛:
try:
self.feature_counts[feature_name] += 1
except KeyError:
self.feature_counts[feature_name] = 1
if self.feature_counts[feature_name] > feature_thresh:
return None
else:
yield ('feature_name', 1)