Why does this PySpark join fail?
I'm misunderstanding something about PySpark's behavior in the following example. I have several DataFrames, so I join them:
print"users_data"
print users_data.show()
print"calc"
print calc.show()
print"users_cat_data"
print users_cat_data.show()
data1 = calc.join(users_data, ['category_pk','item_pk'], 'leftouter')
print "DATA1"
print data1.show()
data2 = data1.join(users_cat_data, ['category_pk'], 'leftouter')
print "DATA2"
print data2.show()
data3 = data2.join(category_data, ['category_pk'], 'leftouter')
print "DATA3"
print data3.show()
data4 = data3.join(clicks_data, ['category_pk','item_pk'], 'leftouter')
print "DATA4"
print data4.show()
data4.write.parquet(output + '/test.parquet', mode="overwrite")
I expected a leftouter join to return the left DataFrame together with the matching rows from the right DataFrame (if any); see the small sketch after these tables for the behavior I expect. Some sample output:
users_data
+--------------+----------+-------------------------+
| category_pk| item_pk| unique_users|
+--------------+----------+-------------------------+
| 321| 460| 1|
| 730| 740| 2|
| 140| 720| 10|
users_cat_data
+--------------+-----------------------+
| category_pk| unique_users_per_cat|
+--------------+-----------------------+
| 111| 258|
| 100| 260|
| 750| 9|
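For reference, here is a minimal, hypothetical sketch of the leftouter behavior I expect (toy data and a local SparkSession, not my real tables):
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

calc = spark.createDataFrame(
    [(321, 460, 5.0), (999, 111, 7.0)],
    ['category_pk', 'item_pk', 'score'])
users_data = spark.createDataFrame(
    [(321, 460, 1)],
    ['category_pk', 'item_pk', 'unique_users'])

# Every row of calc is kept; keys with no match in users_data
# get NULL in the unique_users column.
calc.join(users_data, ['category_pk', 'item_pk'], 'leftouter').show()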
However, I observe different behavior. I use show() to print the first 5 rows of every DataFrame involved in the join operations, and all of them contain data. Yet I get the following error:
None
DATA1
Traceback (most recent call last):
File "mytest.py", line 884, in <module>
args.field1, args.field2, args.field3)
File "mytest.py", line 802, in calc
print data1.show()
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 336, in show
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o802.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions.apply(RDD.scala:794)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions.apply(RDD.scala:793)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
I do not understand why I get the Task not serializable error on the print data1.show() line. The DataFrames used to create data1 are not empty. Moreover, show() is used successfully two lines above that line of code.
Sometimes the job fails on the last line, data4.write.parquet(output + '/test.parquet', mode="overwrite"), and when I remove that line it runs fine. But now it fails even earlier, at data1.show().
How can I solve this problem? Any help is really appreciated.
I think the cause of the top-level org.apache.spark.SparkException: Exception thrown in awaitResult is that BroadcastExchangeExec, the physical operator that was requested to broadcast a relation (a.k.a. a table), simply timed out after the default 5-minute wait. That is the low-level background on what the exception means.
Now you may be asking yourself why that happens.
Set spark.sql.broadcastTimeout to -1 to disable the timeout completely (which will make the thread wait indefinitely for the broadcast to finish), or increase it to, say, 10 minutes.
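As a quick sketch of how you could do that from PySpark (assuming you already have a SparkSession called spark; the property can also be passed via spark-submit --conf):
# Raise the broadcast timeout from the default 5 minutes to 10 minutes.
spark.conf.set("spark.sql.broadcastTimeout", 600)

# Or disable the timeout entirely (the thread then waits indefinitely):
# spark.conf.set("spark.sql.broadcastTimeout", -1)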
You could also disable broadcasting a table altogether by setting spark.sql.autoBroadcastJoinThreshold to -1.
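Again just a sketch, using the same assumed spark object:
# Turn off automatic broadcast joins so Spark falls back to a
# sort-merge join instead of broadcasting the smaller side.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
The same setting can be supplied at submission time instead, e.g. spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1.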
That, however, would only work around a more serious problem in your environment.
My guess is that your YARN cluster (judging from /mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001) is tight on resources and the network may be slow as well.
All in all, my guess is that some of the tables in your query are below the default 10MB threshold, which leads the Spark SQL optimizer to choose broadcasting them (rather than distributing the datasets to the executors by other means).
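One way to confirm that is to inspect the physical plan before calling show() (a sketch, using data1 from your code):
# A BroadcastHashJoin / BroadcastExchange node in the printed plan means the
# optimizer decided to broadcast one side of the join (<= 10MB by default).
data1.explain()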
I think something more serious is going on in the cluster, and you are facing temporary issues until... the admins fix the YARN cluster. Is the cluster under heavier load when you submit your PySpark application?
I do not understand why I get Task Serialization error
I think the answer lies in how PySpark works under the covers, i.e. two processes (Python and the JVM) communicating over a socket.