内存分配失败 - JavaScript 堆内存不足

Memory allocation failed - JavaScript heap out of memory

我的 Node.js 代码遇到内存泄漏问题。我正在尝试流式读取一个包含 10 万行的 CSV(link 中的示例文件)文件,并处理文件中的每个条目。一段时间后,该进程因内存分配错误而中断。

"FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory"

sample csv:

我的代码示例

const fs = require('fs');
const config = require('../config/config');
const csv = require('csv-parser');
const tls = require('../services/tls');

processCSV('crawler', 'sample-csv.csv');

流式处理包含 100k 个条目的 csv 文件

async function processCSV (jobName, fileName) {
  return new Promise((resolve, reject) => {
    let filePath = config.api.basePath + fileName;
    fs.createReadStream(filePath)
        .on('error', () => {
          // handle error
          console.log('error processing csv');
          reject();

        })
        .pipe(csv())
        .on('data', (row) => {
          createJob(jobName, row);
        })
        .on('end', () => {
          // handle end of CSV
          console.log('Finished processing csv');
          resolve(filePath);
        })
  });
}

验证 csv 文件中的每个 url

async function createJob (name, data) {
  let {hostname, port, ip} = data;
  let protocol = 'https';
  if (port === 80) {
    protocol = 'http';
  }
  let url = protocol + '://' + hostname;
  try {
    await tls.getHostData(url); // call an external api to get details of hostname
    return url;
  } catch (error) {
    return error;
  }
}

我不知道哪个函数导致了内存泄漏。

在我看来,您正在为 CSV 文件中的每一行调用 createJob(),并且您可能导致这些作业中的每一项同时在进程中和内存中。这会耗尽系统资源,尤其是当文件中有很多行时。

解决这个问题的一个想法是调整代码,以便只有 N createJob() 操作同时是 "in-flight"。这是一种方法,当您同时处理的请求达到最大数量时暂停流,然后在有更多空间时恢复它:

async function processCSV (jobName, fileName) {
  return new Promise((resolve, reject) => {
    let filePath = config.api.basePath + fileName;
    let numConcurrent = 0;
    let paused = false;
    const maxConcurrent = 10;
    let stream = fs.createReadStream(filePath)
        .on('error', (err) => {
          // handle error
          console.log('error processing csv');
          reject(err);

        })
        .pipe(csv())
        .on('data', (row) => {

          function checkResume() {
              --numConcurrent;
              if (paused && numConcurrent < maxConcurrent) {
                  // restart the stream, there's room for more
                  paused = false;
                  stream.resume();
              }
          }
          ++numConcurrent;
          createJob(jobName, row).then(checkResume, checkResume);
          if (numConcurrent >= maxConcurrent) {
              // pause the stream because we have max number of operations going
              stream.pause();
              paused = true;
          }
        })
        .on('end', () => {
          // handle end of CSV
          console.log('Finished processing csv');
          resolve(filePath);
        })
  });
}


async function createJob (name, data) {
  let {hostname, port, ip} = data;
  let protocol = 'https';
  if (port === 80) {
    protocol = 'http';
  }
  let url = protocol + '://' + hostname;
  try {
    await tls.getHostData(url); // call an external api to get details of hostname
    return url;
  } catch (error) {
    // make sure returned promise is rejected
    throw error;
  }
}

注意:如果在处理给定行时出现错误,此实现(如您在问题中展示的那个)将继续运行。该行为可以根据需要进行更改。