使用 SPARK 从 zip 到 seq
From zip to seq with SPARK
我每天都会收到一个压缩包“2018-06-26.zip”,大小约为。压缩后 250 Mb,包含 165-170.000 个 XML 小文件(Kb)。我将 zip-archive 加载到 HDFS(避免小文件问题),并使用 SPARK 从 zip 中提取它们(zip 不可拆分),制作成对的 RDD,文件名作为键,内容作为值并保存通过成对的 RDD 将它们作为序列文件。一切 运行 都非常顺利,带有一个仅包含 3 个 XML 文件的小型 zip 存档用于测试目的,但是当我向它提供大存档时,我得到
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:2367)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
...
...
我运行正在使用 Cloudera Quickstart VM:CDH 5.13.3(HDFS:2.60,JDK:1.7.0.67,SPARK:1.6.0,Scala 2.10)
我还没有 运行 它在一个成熟的集群上,因为我想在部署它之前确保我的代码是正确的...
垃圾收集器保持 运行ing OOM,但超出了开销限制。我知道增加驱动程序和 Java 堆 Space 的内存量,但我怀疑我的方法占用了太多内存....监控内存使用情况,不会显示任何内容虽然内存泄漏....
代码如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")
非常感谢任何帮助或想法。
我的最终解决方案:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip").collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq, 75).saveAsSequenceFile("/user/cloudera/2018_06_26")
原始 zip 文件 325 Mb,包含 170.000 XML 个文件
产生 75 个分区,每个分区大约。 35 兆字节。总共 ~ 2.5 Gb
运行- 在我的 Windows PC 上本地时间:1.2 分钟 :-)
我每天都会收到一个压缩包“2018-06-26.zip”,大小约为。压缩后 250 Mb,包含 165-170.000 个 XML 小文件(Kb)。我将 zip-archive 加载到 HDFS(避免小文件问题),并使用 SPARK 从 zip 中提取它们(zip 不可拆分),制作成对的 RDD,文件名作为键,内容作为值并保存通过成对的 RDD 将它们作为序列文件。一切 运行 都非常顺利,带有一个仅包含 3 个 XML 文件的小型 zip 存档用于测试目的,但是当我向它提供大存档时,我得到
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:2367)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
...
...
我运行正在使用 Cloudera Quickstart VM:CDH 5.13.3(HDFS:2.60,JDK:1.7.0.67,SPARK:1.6.0,Scala 2.10)
我还没有 运行 它在一个成熟的集群上,因为我想在部署它之前确保我的代码是正确的...
垃圾收集器保持 运行ing OOM,但超出了开销限制。我知道增加驱动程序和 Java 堆 Space 的内存量,但我怀疑我的方法占用了太多内存....监控内存使用情况,不会显示任何内容虽然内存泄漏....
代码如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")
非常感谢任何帮助或想法。
我的最终解决方案:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip").collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq, 75).saveAsSequenceFile("/user/cloudera/2018_06_26")
原始 zip 文件 325 Mb,包含 170.000 XML 个文件 产生 75 个分区,每个分区大约。 35 兆字节。总共 ~ 2.5 Gb 运行- 在我的 Windows PC 上本地时间:1.2 分钟 :-)