无法在 express 中正确管理 redis 连接
Unable to manage redis connections properly in express
我正在使用 redis 包,我遇到了很多连接问题,连接突然出现 ECONNREFUSED。
我怀疑是因为我做错了连接管理。
这个项目的问题是我的应用程序将参数发送到 api(ip 和端口),api 必须根据这些值创建连接,获取一些数据,和 return 它。
我有数百台服务器,所以我不知道如何管理所有这些连接。
到目前为止,我是在单个连接中管理它的。这就是我认为它失败的原因。
目前看起来是这样的..
let redisClient;
const killRedis = () => {
redisClient.quit()
console.log("redis client shut down")
}
const createRedisClient = async (port, url) => {
redisClient = require('redis').createClient(port, url, {
no_ready_check: true,
db: 1
})
redisClient.on('error', function (err) {
console.log('Error ' + err);
killRedis();
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
});
return redisClient;
}
module.exports = { createRedisClient, }
有点用,但最终会因连接不时被拒绝而失败。
我在我的路线中像下面这样使用它。
const scanAll = async (port, url) => {
const redisClient = await createRedisClient(port, url)
if (!redisClient) return 500
const scan = promisify(redisClient.scan).bind(redisClient);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
return found;
};
/* Return all the users id */
router.post('/user/list', async function (req, res, next) {
const data = await scanAll(req.body.port, req.body.ip);
console.log("data ", data)
if (data === 500) {
res.status(500).json({
error: "Error, server connection refused"
})
}
else if (data.length === 0) {
res.status(204).json(data)
} else {
res.status(200).json(data);
}
})
我怎样才能进行正确的连接管理?
编辑:我的新尝试,但我认为在同时进行 2 个请愿时我的连接溢出
let connections = []
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
console.log("pre")
console.log(connection)
if (connection && connection.connection) {
console.log("opcion1: ", connection.ip)
console.log("connection already exists")
return connection[0].connection
} else {
console.log("opcion2")
console.log(connections)
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
const createRedisClient = async (port, url) => {
let redisClient = findConnection(url, port)
redisClient.on('error', function (err) {
console.log('Error ' + err);
redisClient.quit()
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
});
return redisClient;
}
module.exports = { createRedisClient, }
我注意到我收到以下错误
MaxListenersExceededWarning: Possible EventEmitter memory leak
detected. 11 error listeners added. Use emitter.setMaxListeners() to
increase limit
编辑:最后一个实施问题
My current implementation is the following
let connections = []
const killRedis = (redisClient, ip, port) => {
redisClient.quit()
connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}
const subscribe = (redisClient, url, port) => {
redisClient.on('error', function (err) {
console.log('Error ' + err);
killRedis(redisClient, url, port)
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
return redisClient;
});
}
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
subscribe(connection[0].connection)
return connection[0].connection
} else {
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
const createRedisClient = async (port, url) => {
let redisClient = findConnection(url, port)
return redisClient
}
module.exports = { createRedisClient }
差不多可以用了,问题是我不知道如何处理
的错误
而且我不确定如何处理错误事件侦听器。如果它失败了,我应该 return 一个未定义的,但似乎它没有这样做,
你可以使用这个 npm 包
npm i connect-redis
您可以构建现有连接的缓存池并重用这些连接,而不是为每个新请求创建一个新的 Redis 连接。这样,您就不会超过每个 Redis 连接的事件侦听器限制。 如果池中尚不存在连接,您只需要创建一个连接即可。
PS - 为简单起见,我当前的缓存池实现为每对主机和端口创建一个连接并存储它们。如果需要,您可以在 Cache Pool
中实施 LRU 缓存以驱逐未使用的 Redis 连接。
这样您应该能够解决您的连接管理问题,因为您将创建一次连接并重新使用它们。
缓存-pool.js
const redis = require('redis');
const Q = require('q');
class CachePool {
constructor() {
this.cachedClients = {};
this.pendingCachedClients = {};
}
getConnection(host, port) {
const deferred = Q.defer();
const connectionId = CachePool.getConnectionId(host, port);
if (this.cachedClients[connectionId]) {
deferred.resolve(this.cachedClients[connectionId]);
} else {
this.cachedClients[connectionId] = redis.createClient({
host,
port,
});
this.cachedClients[connectionId].on('connect', (connection) => {
deferred.resolve(this.cachedClients[connectionId]);
});
this.cachedClients[connectionId].on('error', (err) => {
deferred.reject(err);
});
}
return deferred.promise;
}
static getConnectionId(host, port) {
return `${host}:${port}`;
}
}
module.exports = new CachePool();
cache.js
const { promisify } = require('util');
const CachePool = require('./cache-pool');
class Cache {
constructor(host, port) {
this.host = host;
this.port = port;
}
connect() {
return CachePool.getConnection(this.host, this.port);
}
async scanAll() {
const redisConnection = await this.connect();
const scan = promisify(redisConnection.scan).bind(redisConnection);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
return found;
}
}
module.exports = Cache;
测试-sever.js
const express = require('express');
const Cache = require('./cache');
const app = express();
app.use(express.json({ type: '*/json' }));
const port = 3000;
/* Return all the users id */
app.post('/user/list', async function (req, res, next) {
const redisClient = new Cache(req.body.ip, req.body.port);
const data = await redisClient.scanAll();
if (data === 500) {
res.status(500).json({
error: "Error, server connection refused"
})
}
else if (data.length === 0) {
res.status(204).json(data)
} else {
res.status(200).json(data);
}
})
app.listen(port, () => {
console.log(`Example app listening at http://localhost:${port}`)
});
单连接方案或者缓存池方案都可以。您在使用这两种解决方案时遇到了一些问题。
- 单连接解决方案:ECONNREFUSED
我猜是redis maxclient
到达时触发的错误,因为你的单连接解决方案在处理每个请求时没有关闭客户端。因此,当未达到 maxclient
限制但以 ECONNREFUSED 结束时,它会起作用。
你可以尝试解决,在request前加上killRedis
即可return.
const scanAll = async (port, url) => {
const redisClient = await createRedisClient(port, url)
if (!redisClient) return 500
const scan = promisify(redisClient.scan).bind(redisClient);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
killRedis(); // add code here.
return found;
};
- 缓存池解决方案:MaxListenersExceededWarning
参考答案
对于每个请求,你在createRedisClient
函数中运行redisClient.on()
两次,你做了两次订阅,然后添加了两个新的监听器。默认限制为 10。由于缺少取消订阅,它将以 MaxListenersExceededWarning 结束。解决方案是将所有 redisClient.on
代码从 createRedisClient
函数移动到 findConnection
函数,仅在创建连接时订阅两次,与用户请求无关。
- 添加评论码
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
// subscribe(connection[0].connection) // should be deleted
return connection[0].connection
} else {
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
我的最终实现按预期工作
let connections = []
const killRedis = (redisClient, ip, port) => {
redisClient.quit()
connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}
const subscribe = (redisClient, url, port) => new Promise((resolve, reject) => {
redisClient.on('error', function (err) {
killRedis(redisClient, url, port)
reject(err)
});
redisClient.on('connect', function () {
resolve(redisClient)
});
})
findConnection = async (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
return connection[0].connection
} else {
try {
let newConnection = require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
const client = await subscribe(newConnection, ip, port)
connections.push({
ip: ip,
port: port,
connection: newConnection
})
return client
} catch (error) {
return undefined
}
}
}
const createRedisClient = async (port, url) => {
let redisClient = await findConnection(url, port)
return redisClient
}
module.exports = { createRedisClient }
我正在使用 redis 包,我遇到了很多连接问题,连接突然出现 ECONNREFUSED。
我怀疑是因为我做错了连接管理。
这个项目的问题是我的应用程序将参数发送到 api(ip 和端口),api 必须根据这些值创建连接,获取一些数据,和 return 它。 我有数百台服务器,所以我不知道如何管理所有这些连接。
到目前为止,我是在单个连接中管理它的。这就是我认为它失败的原因。
目前看起来是这样的..
let redisClient;
const killRedis = () => {
redisClient.quit()
console.log("redis client shut down")
}
const createRedisClient = async (port, url) => {
redisClient = require('redis').createClient(port, url, {
no_ready_check: true,
db: 1
})
redisClient.on('error', function (err) {
console.log('Error ' + err);
killRedis();
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
});
return redisClient;
}
module.exports = { createRedisClient, }
有点用,但最终会因连接不时被拒绝而失败。
我在我的路线中像下面这样使用它。
const scanAll = async (port, url) => {
const redisClient = await createRedisClient(port, url)
if (!redisClient) return 500
const scan = promisify(redisClient.scan).bind(redisClient);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
return found;
};
/* Return all the users id */
router.post('/user/list', async function (req, res, next) {
const data = await scanAll(req.body.port, req.body.ip);
console.log("data ", data)
if (data === 500) {
res.status(500).json({
error: "Error, server connection refused"
})
}
else if (data.length === 0) {
res.status(204).json(data)
} else {
res.status(200).json(data);
}
})
我怎样才能进行正确的连接管理?
编辑:我的新尝试,但我认为在同时进行 2 个请愿时我的连接溢出
let connections = []
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
console.log("pre")
console.log(connection)
if (connection && connection.connection) {
console.log("opcion1: ", connection.ip)
console.log("connection already exists")
return connection[0].connection
} else {
console.log("opcion2")
console.log(connections)
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
const createRedisClient = async (port, url) => {
let redisClient = findConnection(url, port)
redisClient.on('error', function (err) {
console.log('Error ' + err);
redisClient.quit()
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
});
return redisClient;
}
module.exports = { createRedisClient, }
我注意到我收到以下错误
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit
编辑:最后一个实施问题
My current implementation is the following
let connections = []
const killRedis = (redisClient, ip, port) => {
redisClient.quit()
connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}
const subscribe = (redisClient, url, port) => {
redisClient.on('error', function (err) {
console.log('Error ' + err);
killRedis(redisClient, url, port)
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
return redisClient;
});
}
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
subscribe(connection[0].connection)
return connection[0].connection
} else {
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
const createRedisClient = async (port, url) => {
let redisClient = findConnection(url, port)
return redisClient
}
module.exports = { createRedisClient }
差不多可以用了,问题是我不知道如何处理
的错误而且我不确定如何处理错误事件侦听器。如果它失败了,我应该 return 一个未定义的,但似乎它没有这样做,
你可以使用这个 npm 包
npm i connect-redis
您可以构建现有连接的缓存池并重用这些连接,而不是为每个新请求创建一个新的 Redis 连接。这样,您就不会超过每个 Redis 连接的事件侦听器限制。 如果池中尚不存在连接,您只需要创建一个连接即可。
PS - 为简单起见,我当前的缓存池实现为每对主机和端口创建一个连接并存储它们。如果需要,您可以在 Cache Pool
中实施 LRU 缓存以驱逐未使用的 Redis 连接。
这样您应该能够解决您的连接管理问题,因为您将创建一次连接并重新使用它们。
缓存-pool.js
const redis = require('redis');
const Q = require('q');
class CachePool {
constructor() {
this.cachedClients = {};
this.pendingCachedClients = {};
}
getConnection(host, port) {
const deferred = Q.defer();
const connectionId = CachePool.getConnectionId(host, port);
if (this.cachedClients[connectionId]) {
deferred.resolve(this.cachedClients[connectionId]);
} else {
this.cachedClients[connectionId] = redis.createClient({
host,
port,
});
this.cachedClients[connectionId].on('connect', (connection) => {
deferred.resolve(this.cachedClients[connectionId]);
});
this.cachedClients[connectionId].on('error', (err) => {
deferred.reject(err);
});
}
return deferred.promise;
}
static getConnectionId(host, port) {
return `${host}:${port}`;
}
}
module.exports = new CachePool();
cache.js
const { promisify } = require('util');
const CachePool = require('./cache-pool');
class Cache {
constructor(host, port) {
this.host = host;
this.port = port;
}
connect() {
return CachePool.getConnection(this.host, this.port);
}
async scanAll() {
const redisConnection = await this.connect();
const scan = promisify(redisConnection.scan).bind(redisConnection);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
return found;
}
}
module.exports = Cache;
测试-sever.js
const express = require('express');
const Cache = require('./cache');
const app = express();
app.use(express.json({ type: '*/json' }));
const port = 3000;
/* Return all the users id */
app.post('/user/list', async function (req, res, next) {
const redisClient = new Cache(req.body.ip, req.body.port);
const data = await redisClient.scanAll();
if (data === 500) {
res.status(500).json({
error: "Error, server connection refused"
})
}
else if (data.length === 0) {
res.status(204).json(data)
} else {
res.status(200).json(data);
}
})
app.listen(port, () => {
console.log(`Example app listening at http://localhost:${port}`)
});
单连接方案或者缓存池方案都可以。您在使用这两种解决方案时遇到了一些问题。
- 单连接解决方案:ECONNREFUSED
我猜是redis maxclient
到达时触发的错误,因为你的单连接解决方案在处理每个请求时没有关闭客户端。因此,当未达到 maxclient
限制但以 ECONNREFUSED 结束时,它会起作用。
你可以尝试解决,在request前加上killRedis
即可return.
const scanAll = async (port, url) => {
const redisClient = await createRedisClient(port, url)
if (!redisClient) return 500
const scan = promisify(redisClient.scan).bind(redisClient);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
killRedis(); // add code here.
return found;
};
- 缓存池解决方案:MaxListenersExceededWarning
参考答案
对于每个请求,你在createRedisClient
函数中运行redisClient.on()
两次,你做了两次订阅,然后添加了两个新的监听器。默认限制为 10。由于缺少取消订阅,它将以 MaxListenersExceededWarning 结束。解决方案是将所有 redisClient.on
代码从 createRedisClient
函数移动到 findConnection
函数,仅在创建连接时订阅两次,与用户请求无关。
- 添加评论码
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
// subscribe(connection[0].connection) // should be deleted
return connection[0].connection
} else {
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
我的最终实现按预期工作
let connections = []
const killRedis = (redisClient, ip, port) => {
redisClient.quit()
connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}
const subscribe = (redisClient, url, port) => new Promise((resolve, reject) => {
redisClient.on('error', function (err) {
killRedis(redisClient, url, port)
reject(err)
});
redisClient.on('connect', function () {
resolve(redisClient)
});
})
findConnection = async (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
return connection[0].connection
} else {
try {
let newConnection = require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
const client = await subscribe(newConnection, ip, port)
connections.push({
ip: ip,
port: port,
connection: newConnection
})
return client
} catch (error) {
return undefined
}
}
}
const createRedisClient = async (port, url) => {
let redisClient = await findConnection(url, port)
return redisClient
}
module.exports = { createRedisClient }