How to increase partitions for PySpark dataframe in place?
I'm getting an error message, and based on the related questions I've found here, the answer is most likely to increase the number of partitions. Where I'm stuck is actually implementing that solution.
Three questions:
(A) Do I need to go back to where I instantiated my Spark context and define the number of partitions there? Example code would be much appreciated.
(B) Better yet, can I increase the partitions "in place"? Some lines of code take a long time to execute; I'd like to avoid starting over if possible.
(C) Maybe I should ask this first: is increasing the number of partitions an appropriate fix for the error below?
users = r.select('user_id').distinct().collect()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-56-0c036c22fe44> in <module>()
----> 1 users = r.select('user_id').distinct().collect()
3 frames
/content/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o191.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 1030, localhost, executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: Operation canceled
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:163)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:436)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:257)
at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:276)
at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:228)
at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:196)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readCustomLine(LineReader.java:304)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:172)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
at org.apache.spark.rdd.NewHadoopRDD$$anon.hasNext(NewHadoopRDD.scala:230)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.io.IOException: Operation canceled
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:255)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:156)
... 28 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply(Dataset.scala:3263)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply(Dataset.scala:3260)
at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: Operation canceled
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:163)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:436)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:257)
at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:276)
at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:228)
at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:196)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readCustomLine(LineReader.java:304)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:172)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
at org.apache.spark.rdd.NewHadoopRDD$$anon.hasNext(NewHadoopRDD.scala:230)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.io.IOException: Operation canceled
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:255)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:156)
... 28 more
Here is what I have tried so far, which triggers the same error as above:
r2 = r.repartition(4000)
users = r2.select('user_id').distinct().collect()
4,000 partitions may be too high for your cluster. You then perform a collect(), which is a task completed on the master (driver) node and therefore a bottleneck. A large number of small partitions is not a good idea; the parallelism ends up sub-optimal.
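For reference, a minimal sketch (assuming r is your DataFrame) to check how many partitions you are currently working with before picking a new number:
# Inspect the current partition count of the DataFrame r
print(r.rdd.getNumPartitions())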
Question A:
Yes, and it is recommended. The setting to use is:
spark.sql.shuffle.partitions
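As a minimal sketch (the value 200 below is only an illustrative default, not a tuned recommendation), you can set it when the session is created, or change it on an existing session without restarting:
from pyspark.sql import SparkSession

# Question A: define the shuffle partition count when the session is built
spark = (SparkSession.builder
         .appName("example")
         .config("spark.sql.shuffle.partitions", "200")
         .getOrCreate())

# Question B: change it "in place" on an already-running session;
# it applies to shuffles triggered after this point (e.g. distinct()).
spark.conf.set("spark.sql.shuffle.partitions", "200")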
Question B:
The number of partitions should be equal to, or a small multiple of (at most 2x or 3x), the total number of cores available across your cluster's executors. The Spark documentation recommends a partition size greater than 128 MB.
https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0
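To illustrate that rule of thumb, here is a sketch that derives a partition count from the parallelism Spark reports and applies it without restarting; it assumes your session and the DataFrame r already exist:
# Roughly the total number of cores Spark sees across the executors
total_cores = spark.sparkContext.defaultParallelism

# Rule of thumb above: 1x to 3x the total core count
num_partitions = total_cores * 2

# repartition() returns a new DataFrame; the original r is left unchanged
r2 = r.repartition(num_partitions)
users = r2.select('user_id').distinct().collect()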
Question C:
Given the answer to B, no.