Spark RuntimeError: uninitialized classmethod object
I wrote a very simple piece of Spark code in Python:
import collections
Person = collections.namedtuple('Person', ['name', 'age', 'gender'])
a = sc.parallelize([['Barack Obama', 54, 'M'], ['Joe Biden', 74, 'M']])
a = a.map(lambda row: Person(*row))
print a.collect()
def func(row):
    tmp = row._replace(name='Jack Rabbit')
    return tmp
print a.map(func).collect()
I get the following output and error:
[Person(name='Barack Obama', age=29, gender='M'), Person(name='Joe Biden', age=57, gender='M')]
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 49 in stage 11.0 failed 4 times, most recent failure: Lost task 49.3 in stage 11.0 (TID 618, 172.19.75.121): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/etc/spark-1.4.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/etc/spark-1.4.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/etc/spark-1.4.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-19-f0b4885784cb>", line 2, in func
File "<string>", line 32, in _replace
RuntimeError: uninitialized classmethod object
at org.apache.spark.api.python.PythonRDD$$anon.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
However, if I run the following code instead, I don't get any error:
for row in a.collect():
    func(row)
What gives?
Edit: SPARK-10542 introduced support for serializing namedtuples.
Original answer
Why doesn't it work? Because the namedtuple call creates a class, and in Spark classes are not serialized as part of the closure. That means you have to create a separate module* and make sure it is available on the workers:
txt = "\n".join(["import collections",
                 "Person = collections.namedtuple('Person', ['name', 'age', 'gender'])"])
with open("persons.py", "w") as fw:
    fw.write(txt)
sc.addPyFile("persons.py")  # Ship module to the worker nodes
Next you can simply import it, rebuild the rows with the importable class, and everything should work as expected:
import persons

a = sc.parallelize([['Barack Obama', 54, 'M'], ['Joe Biden', 74, 'M']]).map(lambda row: persons.Person(*row))
a.map(func).collect()
On a side note, the leading underscore is there for a reason.
* It could also be done on the fly, either like this: a.map(lambda row: collections.namedtuple('Person', ['name', 'age', 'gender'])(*row)), or by defining Person inside mapPartitions (see the sketch below), but it is neither elegant nor efficient.
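For completeness, a minimal sketch of the mapPartitions variant mentioned above (not part of the original answer; the data mirrors the question, and whether the collected namedtuple instances round-trip cleanly back to the driver still depends on your PySpark version's namedtuple pickling):
import collections

def replace_names(rows):
    # The class is created on the worker, once per partition, so it is fully
    # initialized there and never has to travel inside the closure.
    Person = collections.namedtuple('Person', ['name', 'age', 'gender'])
    for row in rows:
        yield Person(*row)._replace(name='Jack Rabbit')

raw = sc.parallelize([['Barack Obama', 54, 'M'], ['Joe Biden', 74, 'M']])
print raw.mapPartitions(replace_names).collect()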