Spark Map Task 内存消耗巨大

Huge memory consumption in Map Task in Spark

我有很多包含大约 60.000.000 行的文件。我所有文件的格式都是 {timestamp}#{producer}#{messageId}#{data_bytes}\n

我一个一个地浏览我的文件,还想为每个输入文件构建一个输出文件。 因为有些台词依赖于以前的台词,所以我将它们按制作人分组。每当一行依赖于一个或多个先前的行时,它们的生产者总是相同的。 在对所有行进行分组后,我将它们交给我的 Java 解析器。 然后解析器会将所有已解析的数据对象包含在内存中,然后将其输出为 JSON。

为了可视化我认为我的工作是如何处理的,我拼凑了以下“流程图”。请注意,我没有想象 groupByKey-Shuffeling-Process。

我的问题:

我的问题:

  1. 如何确保 Spark 会为我的地图任务中的每个文件拆分创建一个结果? (如果我的任务成功,也许他们会,但我现在永远不会看到输出。)

  2. 我认为我的 map 转换 val lineMap = lines.map ...(见下面的 Scala 代码)产生了一个分区的 rdd。因此,我希望在调用我的第二个地图任务之前以某种方式拆分 rdd 的值。

    此外,我认为在这个 rdd lineMap 上调用 saveAsTextFile 会在我的每个地图任务完成后产生一个 运行s 的输出任务。如果我的假设是正确的,为什么我的执行者仍然 运行 内存不足? Spark 是否正在执行多个(太大)大文件拆分并同时处理它们,从而导致解析器填满内存?

  3. 重新分区 lineMap rdd 以便为我的解析器获取更多(更小)输入是个好主意吗?

  4. 有没有我不知道的额外减速器步骤?喜欢在写入文件或类似文件之前汇总结果?


Scala 代码(我省略了不相关的代码部分):

def main(args: Array[String]) {
    val inputFilePath = args(0)
    val outputFilePath = args(1)

    val inputFiles = fs.listStatus(new Path(inputFilePath))
    inputFiles.foreach( filename => {
        processData(filename.getPath, ...)
    }) 
}


def processData(filePath: Path, ...) {
    val lines  = sc.textFile(filePath.toString())
    val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey()

    val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
    //each output should be saved separately
    parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)     
}


def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
    val importer = new LogFileImporter(...)
    importer.parseData(values.toIterator.asJava, ...)

    //importer from now contains all parsed data objects in memory that could be parsed 
    //from the given values.  

    val jsonMapper = getJsonMapper(...)
    val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)

    (key, jsonStringData)
}

我通过删除 groupByKey 调用并实施新的 FileInputFormat 以及 RecordReader 来解决此问题,以消除行依赖于其他行的限制。现在,我实现了它,以便每个拆分都包含前一个拆分的 50.000 字节开销。这将确保可以正确解析依赖于先前行的所有行。

我现在继续查看之前拆分的最后 50.000 字节,但只复制实际影响当前拆分解析的行。因此,我最大限度地减少了开销,并且仍然获得了高度并行化的任务。

以下链接将我拖向了正确的方向。因为 FileInputFormat/RecordReader 的主题乍一看很复杂(至少对我来说是这样),所以最好通读这些文章并了解这是否适合您的问题:

ae.be文章中的相关代码部分,以防网站出现故障。作者(@Gurdt)使用它来检测聊天消息是否包含转义行 return(通过以“\”结尾的行)并将转义行附加在一起,直到找到未转义的 \n .这将允许他检索跨越两行或更多行的消息。用 Scala 编写的代码:

用法

val conf = new Configuration(sparkContext.hadoopConfiguration)
val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat],
classOf[LongWritable], classOf[Text], conf)

文件输入格式

class MyFileInputFormat extends FileInputFormat[LongWritable, Text] {
    override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
    RecordReader[LongWritable, Text] = new MyRecordReader()
}

记录器

class MyRecordReader() extends RecordReader[LongWritable, Text] {
    var start, end, position = 0L
    var reader: LineReader = null
    var key = new LongWritable
    var value = new Text

    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
        // split position in data (start one byte earlier to detect if
        // the split starts in the middle of a previous record)
        val split = inputSplit.asInstanceOf[FileSplit]
        start = 0.max(split.getStart - 1)
        end = start + split.getLength

        // open a stream to the data, pointing to the start of the split
        val stream = split.getPath.getFileSystem(context.getConfiguration)
        .open(split.getPath)
        stream.seek(start)
        reader = new LineReader(stream, context.getConfiguration)

        // if the split starts at a newline, we want to start yet another byte
        // earlier to check if the newline was escaped or not
        val firstByte = stream.readByte().toInt
        if(firstByte == '\n')
            start = 0.max(start - 1)
        stream.seek(start)

        if(start != 0)
            skipRemainderFromPreviousSplit(reader)
    }

    def skipRemainderFromPreviousSplit(reader: LineReader): Unit = {
        var readAnotherLine = true
        while(readAnotherLine) {
            // read next line
            val buffer = new Text()
            start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
            pos = start

            // detect if delimiter was escaped
            readAnotherLine = buffer.getLength >= 1 && // something was read
            buffer.charAt(buffer.getLength - 1) == '\' && // newline was escaped
            pos <= end // seek head hasn't passed the split
        }
    }

    override def nextKeyValue(): Boolean = {
        key.set(pos)

        // read newlines until an unescaped newline is read
        var lastNewlineWasEscaped = false
        while (pos < end || lastNewlineWasEscaped) {
            // read next line
            val buffer = new Text
            pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)

            // append newly read data to previous data if necessary
            value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer

            // detect if delimiter was escaped
            lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\'

            // let Spark know that a key-value pair is ready!
            if(!lastNewlineWasEscaped)
                return true
        }

        // end of split reached?
        return false
    }
}

注意:您可能还需要在 RecordReader 中实现 getCurrentKey、getCurrentValue、close 和 getProgress。