combineByKey fails
I am copying and pasting the exact code from the O'Reilly Learning Spark textbook, but I get the error: org.apache.spark.SparkException: Job aborted due to stage failure
I am trying to understand what this code does, but I can't make sense of it because it won't run:
nums = sc.parallelize([1, 2, 3, 4])
sumCount = nums.combineByKey((lambda x: (x,1)),
                             (lambda x, y: (x[0] + y, x[1] + 1)),
                             (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()
Below is the full error. Any insights?
Job aborted due to stage failure: Task 3 in stage 26.0 failed 1 times, most recent failure: Lost task 3.0 in stage 26.0 (TID 73, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 480, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 470, in process
out_iter = func(split_index, iterator)
File "/databricks/spark/python/pyspark/rdd.py", line 2543, in pipeline_func
return func(split, prev_func(split, iterator))
File "/databricks/spark/python/pyspark/rdd.py", line 353, in func
return f(iterator)
File "/databricks/spark/python/pyspark/rdd.py", line 1905, in combineLocally
merger.mergeValues(iterator)
File "/databricks/spark/python/pyspark/shuffle.py", line 238, in mergeValues
for k, v in iterator:
TypeError: 'int' object is not iterable
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:514)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:650)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:633)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:468)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
OK, OK.
Assuming the nums above is no good because it is not an RDD of (K, V) tuples (which it isn't), then assume the code looks like this instead:
data = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )
sumCount = data.combineByKey(lambda value: (value, 1),
                             lambda x, value: (x[0] + value, x[1] + 1),
                             lambda x, y: (x[0] + y[0], x[1] + y[1]))
averageByKey = sumCount.map(lambda (label, (value_sum, count)): (label, value_sum / count))
print averageByKey.collectAsMap()
Under Spark with Python 2 (pyspark), the code above runs fine.
Under Spark with Python 3 (pyspark), the code above produces an error at:
averageByKey = sumCount.map(lambda (label, (value_sum, count)): (label, value_sum / count))
https://www.python.org/dev/peps/pep-3113/ explains why this feature, "tuple parameter unpacking", was removed in Python 3. It caught me out a little.
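For reference, the PEP suggests doing the unpacking inside the function body rather than in the parameter list. A minimal hand-rewrite of that single line, sketched here on the assumption that sumCount is the same RDD built above, would be:

# Python 3: unpack the (key, (sum, count)) pair inside a named helper,
# since tuple parameters in a lambda signature were removed by PEP 3113
def to_average(pair):
    label, (value_sum, count) = pair  # explicit unpacking in the body
    return (label, value_sum / count)

averageByKey = sumCount.map(to_average)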
If you would rather not rewrite it by hand, the simplest fix is to paste the code above into https://www.pythonconverter.com/ and run the code converter. Here is the result:
data = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )
sumCount = data.combineByKey(lambda value: (value, 1),
                             lambda x, value: (x[0] + value, x[1] + 1),
                             lambda x, y: (x[0] + y[0], x[1] + y[1]))
averageByKey = sumCount.map(lambda label_value_sum_count: (label_value_sum_count[0], label_value_sum_count[1][0] / label_value_sum_count[1][1]))
print(averageByKey.collectAsMap())
which correctly returns:
{0: 3.0, 1: 10.0}
averageByKey now has a different declaration. You will need to study the link above and get comfortable with a Python 2 to 3 converter; it saves some time once you get used to it. The respected SO member pault also ran into some issues with this, so there you have it: it is not quite so simple.
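As a side note, and purely as an alternative sketch (not what the converter produces), mapValues avoids unpacking the key altogether and reads the same under Python 2 and 3:

# mapValues only receives the (sum, count) value, so no key unpacking is needed
averageByKey = sumCount.mapValues(lambda v: v[0] / v[1])
print(averageByKey.collectAsMap())  # {0: 3.0, 1: 10.0}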