How can I get a distinct RDD of dicts in PySpark?
I have an RDD of dicts and I would like to get an RDD containing only the distinct elements. However, when I try to call
rdd.distinct()
PySpark gives me the following error:
TypeError: unhashable type: 'dict'
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/02/19 16:55:56 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 1776, in combineLocally
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'dict'
I do have a key in the dict that I could use as the distinct element, but the documentation doesn't give any clues on how to solve this problem.
Edit: the contents consist of strings, arrays of strings, and dicts of numbers
Edit 2: An example dict... I would like dicts with an equal "data_fingerprint" key to be considered equal:
{"id":"4eece341","data_fingerprint":"1707db7bddf011ad884d132bf80baf3c"}
Thanks!
As @zero323 pointed out in his comment, you have to decide how to compare dictionaries, since they are not hashable. One way is to sort the keys (as they are not in any particular order), e.g. lexicographically. Then create a string of the following form:
def dict_to_string(d):
    # sort the keys so that equal dicts always produce the same string,
    # i.e. 'key1|value1|key2|value2|...|keyn|valuen'
    return '|'.join('%s|%s' % (k, d[k]) for k in sorted(d))
If you have nested unhashable objects you have to do this recursively (see the sketch below).
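For example, a sketch of one such recursive canonicalization (my own illustration, not from the original answer), turning nested dicts and lists into hashable tuples:

def to_hashable(obj):
    # hypothetical helper: recursively convert dicts and lists
    # into tuples so the result can be hashed or used as a key
    if isinstance(obj, dict):
        return tuple((k, to_hashable(obj[k])) for k in sorted(obj))
    if isinstance(obj, list):
        return tuple(to_hashable(x) for x in obj)
    return obj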
Now you can transform your RDD into pairs, with the string (or some hash of it) as the key:
pairs = dictRDD.map(lambda d: (dict_to_string(d), d))
To get what you want, you then simply need to reduce by key as follows:
distinctDicts = pairs.reduceByKey(lambda val1, val2: val1).values()
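As an aside (my own suggestion, not part of the original answer), json.dumps with sort_keys=True is a convenient way to build such a canonical string for JSON-compatible dicts, nested ones included:

import json

def dict_to_string(d):
    # sort_keys=True canonicalizes the key order, so equal dicts
    # always serialize to the same string
    return json.dumps(d, sort_keys=True)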
Since your data provides a unique key, you can simply do something like this:
(rdd
    .keyBy(lambda d: d.get("data_fingerprint"))
    .reduceByKey(lambda x, y: x)
    .values())
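A quick usage sketch with made-up sample data (sc and the fingerprint values are assumptions for illustration):

data = [
    {"id": "a", "data_fingerprint": "f1"},
    {"id": "b", "data_fingerprint": "f1"},
    {"id": "c", "data_fingerprint": "f2"},
]
result = (sc.parallelize(data)
    .keyBy(lambda d: d.get("data_fingerprint"))
    .reduceByKey(lambda x, y: x)
    .values()
    .collect())
# one dict survives per distinct fingerprint; which of the two
# "f1" duplicates is kept is not deterministic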
Python dicts have at least two problems that make them poor candidates for hashing (illustrated in the snippet after this list):
- mutability, which makes any hashing tricky
- arbitrary order of keys
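A quick illustration (the frozenset trick is my own aside, a common workaround for flat dicts with hashable values):

d = {"a": 1}
hash(d)                     # TypeError: unhashable type: 'dict'
hash(frozenset(d.items()))  # works: an immutable, order-free view of the items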
A while ago there was a PEP proposing frozendicts (PEP 0416), but it was ultimately rejected.