使用 window 函数时出现 pyspark 错误(Spark 2.1.0 报告未找到列的问题)?
pyspark error when working with window function (Spark 2.1.0 reports issue with column not found)?
更新:
我创建了以下 JIRA 问题:https://issues.apache.org/jira/browse/SPARK-20086
状态:已修复! (整个周末!速度快得惊人!)
更新 2:
此问题已由 https://github.com/apache/spark/pull/17432 for versions 2.1.1, 2.2.0
. So I got a newer spark version from the nightly builds at http://people.apache.org/~pwendell/spark-nightly/ 修复
如果您在 <=2.1.0
.
上,您可能仍会 运行 陷入这个问题
原文Post:
我在使用 pyspark window 函数时遇到错误。这是一些示例代码:
import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes
from pyspark.sql import window
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
rdd = sc.parallelize([(1, 2.0), (1, 3.0), (1, 1.), (1, -2.), (1, -1.)])
df = sqlc.createDataFrame(rdd, ["x", "AmtPaid"])
df.show()
给出:
+---+-------+
| x|AmtPaid|
+---+-------+
| 1| 2.0|
| 1| 3.0|
| 1| 1.0|
| 1| -2.0|
| 1| -1.0|
+---+-------+
接下来,计算累计和
win_spec_max = (window.Window
.partitionBy(['x'])
.rowsBetween(window.Window.unboundedPreceding, 0)))
df = df.withColumn('AmtPaidCumSum',
sf.sum(sf.col('AmtPaid')).over(win_spec_max))
df.show()
给予,
+---+-------+-------------+
| x|AmtPaid|AmtPaidCumSum|
+---+-------+-------------+
| 1| 2.0| 2.0|
| 1| 3.0| 5.0|
| 1| 1.0| 6.0|
| 1| -2.0| 4.0|
| 1| -1.0| 3.0|
+---+-------+-------------+
接下来,计算累积最大值,
df = df.withColumn('AmtPaidCumSumMax', sf.max(sf.col('AmtPaidCumSum')).over(win_spec_max))
df.show()
给出错误日志
Py4JJavaError: An error occurred while calling o2609.showString.
带追溯:
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-215-3106d06b6e49> in <module>()
----> 1 df.show()
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
但有趣的是,如果我在第二个 window 操作之前引入另一个更改,比如插入一列,那么它不会给出该错误:
df = df.withColumn('MaxBound', sf.lit(6.))
df.show()
+---+-------+-------------+--------+
| x|AmtPaid|AmtPaidCumSum|MaxBound|
+---+-------+-------------+--------+
| 1| 2.0| 2.0| 6.0|
| 1| 3.0| 5.0| 6.0|
| 1| 1.0| 6.0| 6.0|
| 1| -2.0| 4.0| 6.0|
| 1| -1.0| 3.0| 6.0|
+---+-------+-------------+--------+
#then apply the second window operations
df = df.withColumn('AmtPaidCumSumMax', sf.max(sf.col('AmtPaidCumSum')).over(win_spec_max))
df.show()
+---+-------+-------------+--------+----------------+
| x|AmtPaid|AmtPaidCumSum|MaxBound|AmtPaidCumSumMax|
+---+-------+-------------+--------+----------------+
| 1| 2.0| 2.0| 6.0| 2.0|
| 1| 3.0| 5.0| 6.0| 5.0|
| 1| 1.0| 6.0| 6.0| 6.0|
| 1| -2.0| 4.0| 6.0| 6.0|
| 1| -1.0| 3.0| 6.0| 6.0|
+---+-------+-------------+--------+----------------+
我不理解这种行为
好吧,到目前为止一切顺利,但后来我尝试了另一个操作,然后再次出现类似的错误:
def _udf_compare_cumsum_sll(x):
if x['AmtPaidCumSumMax'] >= x['MaxBound']:
output = 0
else:
output = x['AmtPaid']
return output
udf_compare_cumsum_sll = sf.udf(_udf_compare_cumsum_sll, sparktypes.FloatType())
df = df.withColumn('AmtPaidAdjusted', udf_compare_cumsum_sll(sf.struct([df[x] for x in df.columns])))
df.show()
给予,
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-18-3106d06b6e49> in <module>()
----> 1 df.show()
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o91.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 645, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: AmtPaidCumSum#10
我想知道是否有人可以重现此行为...
这里是完整的日志..
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-69-3106d06b6e49> in <module>()
----> 1 df.show()
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o703.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 119.0 failed 1 times, most recent failure: Lost task 0.0 in stage 119.0 (TID 1817, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: AmtPaidCumSum#2076
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:360)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:358)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.bind(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:44)
at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:353)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:203)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:202)
at org.apache.spark.sql.execution.window.AggregateProcessor$.apply(AggregateProcessor.scala:98)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs.org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor(WindowExec.scala:198)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:225)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:222)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon.<init>(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:290)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:289)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#2076 in [sum#2299,max#2300,x#2066L,AmtPaid#2067]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 62 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2113)
at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor83.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: null
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:360)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:358)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.bind(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:44)
at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:353)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:203)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:202)
at org.apache.spark.sql.execution.window.AggregateProcessor$.apply(AggregateProcessor.scala:98)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs.org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor(WindowExec.scala:198)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:225)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:222)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon.<init>(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:290)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:289)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#2076 in [sum#2299,max#2300,x#2066L,AmtPaid#2067]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 62 more
Spark 2.1 和 2.2.0-SNAPSHOT 中的 window 运算符支持似乎存在问题(今天由主人建造)。请参阅 Scala 中的以下内容。
认为您应该在 Spark 的 JIRA 中报告问题。
val inventory = Seq(
(1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1, -1.0)).toDF("x", "AmtPaid")
scala> inventory.printSchema
root
|-- x: integer (nullable = false)
|-- AmtPaid: double (nullable = false)
import org.apache.spark.sql.expressions.Window
val byXwithAllRowsBefore = Window.partitionBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)
import org.apache.spark.sql.functions.sum
val sumOverAmtPaid = inventory.withColumn("AmtPaidCumSum", sum($"AmtPaid") over byXwithAllRowsBefore)
scala> sumOverAmtPaid.show
+---+-------+-------------+
| x|AmtPaid|AmtPaidCumSum|
+---+-------+-------------+
| 1| 2.0| 2.0|
| 1| 3.0| 5.0|
| 1| 1.0| 6.0|
| 1| -2.0| 4.0|
| 1| -1.0| 3.0|
+---+-------+-------------+
scala> sumOverAmtPaid.printSchema
root
|-- x: integer (nullable = false)
|-- AmtPaid: double (nullable = false)
|-- AmtPaidCumSum: double (nullable = true)
到目前为止一切顺利。就像 Python.
累计最大值
由于 java.lang.RuntimeException
.
,以下内容将无法使用
import org.apache.spark.sql.functions.max
val cumulativeMax = sumOverAmtPaid
.withColumn("AmtPaidCumSumMax", max($"AmtPaidCumSum") over byXwithAllRowsBefore)
scala> cumulativeMax.show
17/03/24 22:12:16 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 210)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: AmtPaidCumSum#11
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.bind(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:44)
at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:353)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:201)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:200)
at org.apache.spark.sql.execution.window.AggregateProcessor$.apply(AggregateProcessor.scala:98)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs.org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor(WindowExec.scala:196)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:223)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:220)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:319)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:319)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon.<init>(WindowExec.scala:319)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:289)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:288)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#11 in [sum#234,max#235,x#5,AmtPaid#6]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 62 more
RuntimeException 说:
Couldn't find AmtPaidCumSum#11 in [sum#234,max#235,x#5,AmtPaid#6]
似乎有 sum
列,对吗?让我们用它代替 max
.
中的 $"AmtPaidCumSum"
但是这次 Spark 报告了一个 AnalysisException
,其中包括 AmtPaidCumSum
列 (!)
org.apache.spark.sql.AnalysisException: cannot resolve 'sum
' given input columns: [x, AmtPaid, AmtPaidCumSum];;
scala> val cumulativeMax = sumOverAmtPaid.withColumn("AmtPaidCumSumMax", max($"sum") over byXwithAllRowsBefore)
org.apache.spark.sql.AnalysisException: cannot resolve '`sum`' given input columns: [x, AmtPaid, AmtPaidCumSum];;
'Project [x#5, AmtPaid#6, AmtPaidCumSum#11, max('sum) windowspecdefinition(x#5, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS AmtPaidCumSumMax#237]
+- Project [x#5, AmtPaid#6, AmtPaidCumSum#11]
+- Project [x#5, AmtPaid#6, AmtPaidCumSum#11, AmtPaidCumSum#11]
+- Window [sum(AmtPaid#6) windowspecdefinition(x#5, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS AmtPaidCumSum#11], [x#5]
+- Project [x#5, AmtPaid#6]
+- Project [_1#2 AS x#5, _2#3 AS AmtPaid#6]
+- LocalRelation [_1#2, _2#3]
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:256)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:256)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:267)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:277)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:281)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:281)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:256)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:79)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:79)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:90)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:53)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2832)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1137)
at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:1882)
... 48 elided
我遇到了同样的问题,除了降级 Spark 之外,对我有帮助的是在使用 window 函数应用转换之间将结果保存到配置单元 table 和之前从该配置单元检索数据 table继续使用 window 函数 next
更新: 我创建了以下 JIRA 问题:https://issues.apache.org/jira/browse/SPARK-20086 状态:已修复! (整个周末!速度快得惊人!)
更新 2:
此问题已由 https://github.com/apache/spark/pull/17432 for versions 2.1.1, 2.2.0
. So I got a newer spark version from the nightly builds at http://people.apache.org/~pwendell/spark-nightly/ 修复
如果您在 <=2.1.0
.
原文Post:
我在使用 pyspark window 函数时遇到错误。这是一些示例代码:
import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes
from pyspark.sql import window
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
rdd = sc.parallelize([(1, 2.0), (1, 3.0), (1, 1.), (1, -2.), (1, -1.)])
df = sqlc.createDataFrame(rdd, ["x", "AmtPaid"])
df.show()
给出:
+---+-------+
| x|AmtPaid|
+---+-------+
| 1| 2.0|
| 1| 3.0|
| 1| 1.0|
| 1| -2.0|
| 1| -1.0|
+---+-------+
接下来,计算累计和
win_spec_max = (window.Window
.partitionBy(['x'])
.rowsBetween(window.Window.unboundedPreceding, 0)))
df = df.withColumn('AmtPaidCumSum',
sf.sum(sf.col('AmtPaid')).over(win_spec_max))
df.show()
给予,
+---+-------+-------------+
| x|AmtPaid|AmtPaidCumSum|
+---+-------+-------------+
| 1| 2.0| 2.0|
| 1| 3.0| 5.0|
| 1| 1.0| 6.0|
| 1| -2.0| 4.0|
| 1| -1.0| 3.0|
+---+-------+-------------+
接下来,计算累积最大值,
df = df.withColumn('AmtPaidCumSumMax', sf.max(sf.col('AmtPaidCumSum')).over(win_spec_max))
df.show()
给出错误日志
Py4JJavaError: An error occurred while calling o2609.showString.
带追溯:
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-215-3106d06b6e49> in <module>()
----> 1 df.show()
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
但有趣的是,如果我在第二个 window 操作之前引入另一个更改,比如插入一列,那么它不会给出该错误:
df = df.withColumn('MaxBound', sf.lit(6.))
df.show()
+---+-------+-------------+--------+
| x|AmtPaid|AmtPaidCumSum|MaxBound|
+---+-------+-------------+--------+
| 1| 2.0| 2.0| 6.0|
| 1| 3.0| 5.0| 6.0|
| 1| 1.0| 6.0| 6.0|
| 1| -2.0| 4.0| 6.0|
| 1| -1.0| 3.0| 6.0|
+---+-------+-------------+--------+
#then apply the second window operations
df = df.withColumn('AmtPaidCumSumMax', sf.max(sf.col('AmtPaidCumSum')).over(win_spec_max))
df.show()
+---+-------+-------------+--------+----------------+
| x|AmtPaid|AmtPaidCumSum|MaxBound|AmtPaidCumSumMax|
+---+-------+-------------+--------+----------------+
| 1| 2.0| 2.0| 6.0| 2.0|
| 1| 3.0| 5.0| 6.0| 5.0|
| 1| 1.0| 6.0| 6.0| 6.0|
| 1| -2.0| 4.0| 6.0| 6.0|
| 1| -1.0| 3.0| 6.0| 6.0|
+---+-------+-------------+--------+----------------+
我不理解这种行为
好吧,到目前为止一切顺利,但后来我尝试了另一个操作,然后再次出现类似的错误:
def _udf_compare_cumsum_sll(x):
if x['AmtPaidCumSumMax'] >= x['MaxBound']:
output = 0
else:
output = x['AmtPaid']
return output
udf_compare_cumsum_sll = sf.udf(_udf_compare_cumsum_sll, sparktypes.FloatType())
df = df.withColumn('AmtPaidAdjusted', udf_compare_cumsum_sll(sf.struct([df[x] for x in df.columns])))
df.show()
给予,
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-18-3106d06b6e49> in <module>()
----> 1 df.show()
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o91.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 645, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: AmtPaidCumSum#10
我想知道是否有人可以重现此行为...
这里是完整的日志..
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-69-3106d06b6e49> in <module>()
----> 1 df.show()
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/<>/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Users/<>/.virtualenvs/<>/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o703.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 119.0 failed 1 times, most recent failure: Lost task 0.0 in stage 119.0 (TID 1817, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: AmtPaidCumSum#2076
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:360)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:358)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.bind(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:44)
at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:353)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:203)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:202)
at org.apache.spark.sql.execution.window.AggregateProcessor$.apply(AggregateProcessor.scala:98)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs.org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor(WindowExec.scala:198)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:225)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:222)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon.<init>(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:290)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:289)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#2076 in [sum#2299,max#2300,x#2066L,AmtPaid#2067]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 62 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2113)
at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor83.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: null
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:360)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:358)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.bind(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:44)
at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:353)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:203)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:202)
at org.apache.spark.sql.execution.window.AggregateProcessor$.apply(AggregateProcessor.scala:98)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs.org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor(WindowExec.scala:198)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:225)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:222)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:318)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon.<init>(WindowExec.scala:318)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:290)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:289)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:796)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#2076 in [sum#2299,max#2300,x#2066L,AmtPaid#2067]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 62 more
Spark 2.1 和 2.2.0-SNAPSHOT 中的 window 运算符支持似乎存在问题(今天由主人建造)。请参阅 Scala 中的以下内容。
认为您应该在 Spark 的 JIRA 中报告问题。
val inventory = Seq(
(1, 2.0), (1, 3.0), (1, 1.0), (1, -2.0), (1, -1.0)).toDF("x", "AmtPaid")
scala> inventory.printSchema
root
|-- x: integer (nullable = false)
|-- AmtPaid: double (nullable = false)
import org.apache.spark.sql.expressions.Window
val byXwithAllRowsBefore = Window.partitionBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)
import org.apache.spark.sql.functions.sum
val sumOverAmtPaid = inventory.withColumn("AmtPaidCumSum", sum($"AmtPaid") over byXwithAllRowsBefore)
scala> sumOverAmtPaid.show
+---+-------+-------------+
| x|AmtPaid|AmtPaidCumSum|
+---+-------+-------------+
| 1| 2.0| 2.0|
| 1| 3.0| 5.0|
| 1| 1.0| 6.0|
| 1| -2.0| 4.0|
| 1| -1.0| 3.0|
+---+-------+-------------+
scala> sumOverAmtPaid.printSchema
root
|-- x: integer (nullable = false)
|-- AmtPaid: double (nullable = false)
|-- AmtPaidCumSum: double (nullable = true)
到目前为止一切顺利。就像 Python.
累计最大值
由于 java.lang.RuntimeException
.
import org.apache.spark.sql.functions.max
val cumulativeMax = sumOverAmtPaid
.withColumn("AmtPaidCumSumMax", max($"AmtPaidCumSum") over byXwithAllRowsBefore)
scala> cumulativeMax.show
17/03/24 22:12:16 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 210)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: AmtPaidCumSum#11
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind.apply(GenerateMutableProjection.scala:38)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.bind(GenerateMutableProjection.scala:38)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:44)
at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:353)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:201)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun$org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor.apply(WindowExec.scala:200)
at org.apache.spark.sql.execution.window.AggregateProcessor$.apply(AggregateProcessor.scala:98)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs.org$apache$spark$sql$execution$window$WindowExec$$anonfun$$processor(WindowExec.scala:196)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:223)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$$anonfun.apply(WindowExec.scala:220)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:319)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon$$anonfun.apply(WindowExec.scala:319)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$$anon.<init>(WindowExec.scala:319)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:289)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun.apply(WindowExec.scala:288)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find AmtPaidCumSum#11 in [sum#234,max#235,x#5,AmtPaid#6]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 62 more
RuntimeException 说:
Couldn't find AmtPaidCumSum#11 in [sum#234,max#235,x#5,AmtPaid#6]
似乎有 sum
列,对吗?让我们用它代替 max
.
$"AmtPaidCumSum"
但是这次 Spark 报告了一个 AnalysisException
,其中包括 AmtPaidCumSum
列 (!)
org.apache.spark.sql.AnalysisException: cannot resolve '
sum
' given input columns: [x, AmtPaid, AmtPaidCumSum];;
scala> val cumulativeMax = sumOverAmtPaid.withColumn("AmtPaidCumSumMax", max($"sum") over byXwithAllRowsBefore)
org.apache.spark.sql.AnalysisException: cannot resolve '`sum`' given input columns: [x, AmtPaid, AmtPaidCumSum];;
'Project [x#5, AmtPaid#6, AmtPaidCumSum#11, max('sum) windowspecdefinition(x#5, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS AmtPaidCumSumMax#237]
+- Project [x#5, AmtPaid#6, AmtPaidCumSum#11]
+- Project [x#5, AmtPaid#6, AmtPaidCumSum#11, AmtPaidCumSum#11]
+- Window [sum(AmtPaid#6) windowspecdefinition(x#5, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS AmtPaidCumSum#11], [x#5]
+- Project [x#5, AmtPaid#6]
+- Project [_1#2 AS x#5, _2#3 AS AmtPaid#6]
+- LocalRelation [_1#2, _2#3]
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:256)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:256)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:267)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:277)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:281)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:281)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:256)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:79)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:79)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:90)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:53)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2832)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1137)
at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:1882)
... 48 elided
我遇到了同样的问题,除了降级 Spark 之外,对我有帮助的是在使用 window 函数应用转换之间将结果保存到配置单元 table 和之前从该配置单元检索数据 table继续使用 window 函数 next