pyspark: sort in reduceByKey error: in <lambda> TypeError: 'int' object is not callable
I have the following code: for each my_id, I am trying to sort the amount field by the timestamp field:
output_rdd = my_df.rdd.map(lambda r: (r['my_id'], [r['timestamp'],[r['amount']]]))\
.reduceByKey(lambda a, b: sorted(a+b, key=(a+b)[0]))\
.map(lambda r: r[1])
However, I get the following error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 52, ph-hdp-prd-dn02): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/data/0/yarn/nm/usercache/analytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 2371, in pipeline_func
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 317, in func
File "/usr/local/spark-latest/python/pyspark/rdd.py", line 1792, in combineLocally
File "/data/0/yarn/nm/usercache/phanalytics-test/appcache/application_1474532589728_2983/container_e203_1474532589728_2983_01_000014/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
d[k] = comb(d[k], v) if k in d else creator(v)
File "<ipython-input-11-ec09929e01e4>", line 6, in <lambda>
TypeError: 'int' object is not callable
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Any idea what I am missing? Thanks a lot!
key should be a function. Try
... .reduceByKey(lambda a, b: sorted(a+b, key=lambda x: x[0] )) \
Note the following from the Python documentation: the value of the key parameter should be a function that takes a single argument and returns a key to use for sorting purposes. This technique is fast because the key function is called exactly once for each input record.
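For illustration only (the sample values and the flat (timestamp, amount) tuples below are made up, which is why the message reads 'tuple' rather than 'int'), this shows the difference between passing a value and passing a function as key:

pairs = [(3, 30.0), (1, 10.0), (2, 20.0)]

# key=pairs[0] is a tuple, not a function, so sorted() cannot call it
try:
    sorted(pairs, key=pairs[0])
except TypeError as e:
    print(e)  # 'tuple' object is not callable

# key=lambda x: x[0] is a function that returns each element's sort key
print(sorted(pairs, key=lambda x: x[0]))  # [(1, 10.0), (2, 20.0), (3, 30.0)]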
Convert the argument you pass to key into a Python function or a lambda, then try again.
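Putting it together, a minimal end-to-end sketch might look like this. It assumes a SparkSession named spark and the column names from the question, and it simplifies each value to a (timestamp, amount) tuple instead of the nested list used in the original map:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sort-in-reduceByKey").getOrCreate()

# Hypothetical sample data with the columns used in the question
my_df = spark.createDataFrame(
    [("a", 3, 30.0), ("a", 1, 10.0), ("a", 2, 20.0), ("b", 2, 5.0), ("b", 1, 7.0)],
    ["my_id", "timestamp", "amount"],
)

output_rdd = (
    my_df.rdd
    # pair each record as (my_id, [(timestamp, amount)]) so values can be concatenated
    .map(lambda r: (r["my_id"], [(r["timestamp"], r["amount"])]))
    # merge the per-key lists, keeping them sorted by timestamp; key must be callable
    .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: x[0]))
    # keep only the sorted (timestamp, amount) list for each my_id
    .map(lambda r: r[1])
)

print(output_rdd.collect())
# one sorted list per my_id, e.g. [[(1, 10.0), (2, 20.0), (3, 30.0)], [(1, 7.0), (2, 5.0)]]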