PySpark - 如何合并忽略大小写的键
PySpark - how to merge key with case ignored
在字数统计的例子中,比如我在映射后有(python,1)
和(Python,2)
。如何通过执行以下操作将两者合并为一个 (python,3)
:
def combine(a, b):
if a[0].lower() == b[0].lower():
return (a[0], a[1] + b[1])
那我打电话给
(sc.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
.reduce(lambda a, b :combine(a, b)))
reduce
在这里不太适合(它需要一个额外的 map
到可以处理预期聚合的缓冲区)并且在地图内部简单地转换为小写更有意义:
from operator import add
rdd = sc.parallelize([
"Python", "python", "Haskell", "Clojure", "Scala", "scala"])
rdd.map(lambda word: (word.lower(), 1)).reduceByKey(add).collectAsMap()
## {'clojure': 1, 'haskell': 1, 'python': 2, 'scala': 2}
在字数统计的例子中,比如我在映射后有(python,1)
和(Python,2)
。如何通过执行以下操作将两者合并为一个 (python,3)
:
def combine(a, b):
if a[0].lower() == b[0].lower():
return (a[0], a[1] + b[1])
那我打电话给
(sc.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
.reduce(lambda a, b :combine(a, b)))
reduce
在这里不太适合(它需要一个额外的 map
到可以处理预期聚合的缓冲区)并且在地图内部简单地转换为小写更有意义:
from operator import add
rdd = sc.parallelize([
"Python", "python", "Haskell", "Clojure", "Scala", "scala"])
rdd.map(lambda word: (word.lower(), 1)).reduceByKey(add).collectAsMap()
## {'clojure': 1, 'haskell': 1, 'python': 2, 'scala': 2}