Memory allocation failed - JavaScript heap out of memory
My Node.js code has a memory leak. I am trying to stream a CSV file with 100k rows (sample file in link) and process each entry in it. After a while, the process crashes with a memory allocation error:
"FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory"
Sample of my code:
const fs = require('fs');
const config = require('../config/config');
const csv = require('csv-parser');
const tls = require('../services/tls');
processCSV('crawler', 'sample-csv.csv');
// Stream-process a csv file containing 100k entries
async function processCSV (jobName, fileName) {
return new Promise((resolve, reject) => {
let filePath = config.api.basePath + fileName;
fs.createReadStream(filePath)
.on('error', () => {
// handle error
console.log('error processing csv');
reject();
})
.pipe(csv())
.on('data', (row) => {
createJob(jobName, row);
})
.on('end', () => {
// handle end of CSV
console.log('Finished processing csv');
resolve(filePath);
})
});
}
// Validate each url from the csv file
async function createJob (name, data) {
let {hostname, port, ip} = data;
let protocol = 'https';
if (port === 80) {
protocol = 'http';
}
let url = protocol + '://' + hostname;
try {
await tls.getHostData(url); // call an external api to get details of hostname
return url;
} catch (error) {
return error;
}
}
I don't know which function is causing the memory leak.
It looks to me like you are calling createJob() for every row in the CSV file, so you potentially have every one of those jobs in process and in memory at the same time. That exhausts system resources, particularly when the file has a lot of rows.
One idea for dealing with this is to adjust the code so that only N createJob() operations are "in-flight" at the same time. Here is one way to do that: pause the stream when you hit the maximum number of simultaneous requests, then resume it when there is room for more:
async function processCSV (jobName, fileName) {
return new Promise((resolve, reject) => {
let filePath = config.api.basePath + fileName;
let numConcurrent = 0;
let paused = false;
const maxConcurrent = 10;
let stream = fs.createReadStream(filePath)
.on('error', (err) => {
// handle error
console.log('error processing csv');
reject(err);
})
.pipe(csv())
.on('data', (row) => {
function checkResume() {
--numConcurrent;
if (paused && numConcurrent < maxConcurrent) {
// restart the stream, there's room for more
paused = false;
stream.resume();
}
}
++numConcurrent;
createJob(jobName, row).then(checkResume, checkResume);
if (numConcurrent >= maxConcurrent) {
// pause the stream because we have max number of operations going
stream.pause();
paused = true;
}
})
.on('end', () => {
// handle end of CSV
console.log('Finished processing csv');
resolve(filePath);
})
});
}
async function createJob (name, data) {
let {hostname, port, ip} = data;
let protocol = 'https';
if (port === 80) {
protocol = 'http';
}
let url = protocol + '://' + hostname;
try {
await tls.getHostData(url); // call an external api to get details of hostname
return url;
} catch (error) {
// make sure returned promise is rejected
throw error;
}
}
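As an aside, if you are on Node.js 10 or later, where readable streams are async-iterable, a simpler sketch of the same bounded-memory idea is to consume the parser with for await...of, which applies backpressure automatically. processCSVSequential below is a hypothetical name, and it processes one row at a time:

async function processCSVSequential (jobName, fileName) {
  const filePath = config.api.basePath + fileName;
  const source = fs.createReadStream(filePath);
  const parser = source.pipe(csv());
  // .pipe() does not forward read errors downstream, so propagate them manually
  source.on('error', (err) => parser.destroy(err));
  // the next row is only pulled after the current await settles,
  // so at most one createJob() is pending at any moment
  for await (const row of parser) {
    await createJob(jobName, row);
  }
  console.log('Finished processing csv');
  return filePath;
}

This trades throughput for simplicity (a concurrency of one); the pause/resume version above is the way to go when you want N requests in flight at once.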
Note: the pause/resume implementation (like the one you show in your question) keeps going when processing of a given row fails. That behavior can be changed as desired.
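For example, a sketch of stopping at the first failing row instead: replace the createJob(jobName, row).then(checkResume, checkResume); line in the 'data' handler of the pause/resume code above with something like the following (stream, reject, and checkResume are the variables defined there):

// Hypothetical variation: abort on the first failure instead of continuing.
createJob(jobName, row).then(checkResume, (err) => {
  stream.destroy(); // stop emitting further rows
  reject(err);      // fail the overall processCSV() promise
});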