Spark:当键是不可散列的 numpy 数组时如何 "reduceByKey"?
Spark: How to "reduceByKey" when the keys are numpy arrays which are not hashable?
我有一个(键,值)元素的 RDD。键是 NumPy 数组。 NumPy 数组不可哈希,这会在我尝试执行 reduceByKey
操作时导致问题。
有没有办法为我的手动哈希函数提供 Spark 上下文?或者有没有其他方法可以解决这个问题(除了实际散列数组 "offline" 并将散列密钥传递给 Spark 之外)?
这是一个例子:
import numpy as np
from pyspark import SparkContext
sc = SparkContext()
data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]])
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y)
rd.collect()
错误是:
An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
...
TypeError: unhashable type: 'numpy.ndarray'
最简单的解决方案是将其转换为可散列的对象。例如:
from operator import add
reduced = sc.parallelize(data).map(
lambda x: (tuple(x), x.sum())
).reduceByKey(add)
并在需要时将其转换回来。
Is there a way to supply the Spark context with my manual hash function
不是一个直截了当的。整个机制取决于事实对象实现了 __hash__
方法,并且 C 扩展不能被猴子修补。您可以尝试使用调度来覆盖 pyspark.rdd.portable_hash
,但我怀疑即使考虑转换成本,它是否值得。
我有一个(键,值)元素的 RDD。键是 NumPy 数组。 NumPy 数组不可哈希,这会在我尝试执行 reduceByKey
操作时导致问题。
有没有办法为我的手动哈希函数提供 Spark 上下文?或者有没有其他方法可以解决这个问题(除了实际散列数组 "offline" 并将散列密钥传递给 Spark 之外)?
这是一个例子:
import numpy as np
from pyspark import SparkContext
sc = SparkContext()
data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]])
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y)
rd.collect()
错误是:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
...
TypeError: unhashable type: 'numpy.ndarray'
最简单的解决方案是将其转换为可散列的对象。例如:
from operator import add
reduced = sc.parallelize(data).map(
lambda x: (tuple(x), x.sum())
).reduceByKey(add)
并在需要时将其转换回来。
Is there a way to supply the Spark context with my manual hash function
不是一个直截了当的。整个机制取决于事实对象实现了 __hash__
方法,并且 C 扩展不能被猴子修补。您可以尝试使用调度来覆盖 pyspark.rdd.portable_hash
,但我怀疑即使考虑转换成本,它是否值得。