PySpark: Using Object in RDD
I am currently learning Python and want to apply it on/with Spark.
I have this very simple (and useless) script:
import sys
from pyspark import SparkContext

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)

    def getValue(self):
        return self.v

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))

    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())
When executing it with

spark-submit CustomClass.py

the following error is thrown (output shortened):
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed

        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:166)...
To me, the line

PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed

seems to be the important part. It means that the class instances can't be serialized, right?
Do you know how to solve this?

Thanks and regards
There are a few problems:

- If you put MyClass in a separate file, it can be pickled. This is a common problem with many Python uses of pickle: a class defined in the __main__ module cannot be looked up on the worker side. It is easily solved by moving MyClass into its own module and using from myclass import MyClass. Normally dill can fix such issues (as in import dill as pickle), but it didn't work for me here. (See the minimal pickle sketch after this list.)
- Once that is solved, your reduce doesn't work, because calling addValue returns None (there is no return), not an instance of MyClass. You need to change addValue to return self.
- Finally, the lambda needs to call getValue, so it should be a.addValue(b.getValue()).
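To make the first point concrete, here is a minimal sketch of the same pickle failure outside Spark. The del is my own way of simulating an executor whose __main__ does not define the class; it is an illustration, not part of the original setup:

import pickle

class MyClass:
    def __init__(self, value):
        self.v = str(value)

obj = MyClass(1)

# pickle serializes only the reference "__main__.MyClass", not the class
# body itself, so the class must be importable wherever (de)serialization
# happens. Deleting the name simulates a worker where the lookup fails.
del MyClass

pickle.dumps(obj)  # raises PicklingError: attribute lookup failed
                   # (exact wording varies across Python versions)

Moving MyClass into its own module makes pickle record the reference as myclass.MyClass, which the workers can import.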
Putting it all together:
myclass.py
class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)
        return self

    def getValue(self):
        return self.v
main.py
import sys
from pyspark import SparkContext
from myclass import MyClass

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))

    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))
    print(reduzed.collect())
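One caveat about the output: collect() will print opaque reprs like <myclass.MyClass object at 0x...>. If you want to see the accumulated strings, map to getValue() before collecting. A small sketch; the commented result is what I would expect for this input data, and the key order is not guaranteed:

readable = reduzed.map(lambda kv: (kv[0], kv[1].getValue()))
print(readable.collect())
# e.g. [(1, '11'), (2, '222'), (3, '3333'), (4, '444'), (5, '55'), (7, '7')]

Also note that on a real cluster (as opposed to local mode), myclass.py has to be shipped to the executors, e.g. with spark-submit --py-files myclass.py main.py, or via sc.addPyFile("myclass.py") before the transformations run.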