Spark error loading files from S3 wildcard
I'm using the pyspark shell and trying to read data from S3 using Spark's file wildcard support, but I get the following error:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/
Using Python version 2.7.6 (default, Jul 24 2015 16:07:07)
SparkContext available as sc.
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'AWS_ACCESS_KEY_ID')
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'AWS_SECRET_ACCESS_KEY')
>>> sc.textFile("s3n://myBucket/path/files-*", use_unicode=False).count()
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(37645) called with curMem=83944, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 36.8 KB, free 265.0 MB)
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(5524) called with curMem=121589, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.4 KB, free 265.0 MB)
16/01/07 18:03:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on salve1:48235 (size: 5.4 KB, free: 265.1 MB)
16/01/07 18:03:02 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
16/01/07 18:03:02 INFO SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2
16/01/07 18:03:03 WARN RestS3Service: Response '/path' - Unexpected response code 404, expected 200
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 819, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 810, in sum
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 715, in reduce
vals = self.mapPartitions(func).collect()
File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 676, in collect
bytesInJava = self._jrdd.collect().iterator()
File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o65.collect.
: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:197)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1352)
at org.apache.spark.rdd.RDD.collect(RDD.scala:780)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:179)
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
... 44 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:569)
at java.lang.StringBuffer.append(StringBuffer.java:369)
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:160)
at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:205)
The code works when I load a single file (no wildcard). Since I need to read roughly 100k files, I'd like to know the best way to load all of them into an RDD.
Update
It looks to me like the problem is that the S3 "directory" holding all my files contains more than 300k objects under the key prefix I'm using. The files have a date suffix:
s3://myBucket/path/files-2016-01-01-02-00
s3://myBucket/path/files-2016-01-01-02-01
s3://myBucket/path/files-2016-01-01-03-00
s3://myBucket/path/files-2016-01-01-03-01
I tried to use a wildcard to select only some of the files by date, e.g. s3n://myBucket/path/files-2016-01-01-03-*. When I turned on debug logging I saw that Spark lists the whole S3 "directory" (s3://myBucket/path/) rather than only the keys with the prefix I specified (s3://myBucket/path/files-2016-01-01-03-). So even though I'm only trying to read 2 files, all 300k keys get listed, and that is probably what causes the out-of-memory error.
That listing is what throws the out-of-memory error, so first try limiting the pattern to fewer files and see whether that resolves the problem.
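For example (untested, reusing the hour prefix from the update above), a glob that should only match a couple of keys:
rdd = sc.textFile("s3n://myBucket/path/files-2016-01-01-03-0*", use_unicode=False)
print(rdd.count())
Per the update, the native s3n listing may still enumerate everything under path/, so this mainly confirms whether the directory listing itself is what blows the heap.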
Spark has a silly problem when loading lots of small files: it broadcasts some data for every file. This may be fixed in 1.6.0, which was released a few days ago. For now I expect your code is loading each file and unioning the RDDs together under the hood.
The workaround I used was to move all the files into a single directory on S3 and pass that as a glob, e.g. s3n://myBucket/path/input-files/*. That way, as far as Spark is concerned, you're loading just one path, and it doesn't create a broadcast variable for every file inside it.
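A minimal sketch of that load, assuming the files have already been copied under a dedicated input-files/ prefix as described above:
rdd = sc.textFile("s3n://myBucket/path/input-files/*", use_unicode=False)
print(rdd.count())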
I ended up listing the files directly from S3 and then building an RDD from the exact file names, and so far this has worked for me.
import subprocess
# `aws s3 ls` prints "<date> <time> <size> <name>"; the fourth column is the object name
raw_file_list = subprocess.Popen("env AWS_ACCESS_KEY_ID='myId' AWS_SECRET_ACCESS_KEY='myKey' aws s3 ls s3://myBucket/path/files-2016-01-01-02", shell=True, stdout=subprocess.PIPE).stdout.read().strip().split('\n')
s3_file_list = sc.parallelize(raw_file_list).map(lambda line: "s3n://myBucket/path/%s" % line.split()[3]).collect()
rdd = sc.textFile(','.join(s3_file_list), use_unicode=False)
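If shelling out to the AWS CLI isn't convenient on the driver, the same listing can be done in-process. This is an untested sketch using boto3 (not part of the original answer; it assumes boto3 is installed and reuses the same hypothetical credentials), which lists only the keys under the given prefix and passes the exact paths to textFile:
import boto3

s3 = boto3.client('s3', aws_access_key_id='myId', aws_secret_access_key='myKey')
keys = []
# Paginate so prefixes with more than 1000 matching keys are fully listed
for page in s3.get_paginator('list_objects_v2').paginate(Bucket='myBucket', Prefix='path/files-2016-01-01-02'):
    keys.extend(obj['Key'] for obj in page.get('Contents', []))
rdd = sc.textFile(','.join('s3n://myBucket/%s' % key for key in keys), use_unicode=False)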