无法在 express 中正确管理 redis 连接

Unable to manage redis connections properly in express

我正在使用 redis 包,我遇到了很多连接问题,连接突然出现 ECONNREFUSED。

我怀疑是因为我做错了连接管理。

这个项目的问题是我的应用程序将参数发送到 api(ip 和端口),api 必须根据这些值创建连接,获取一些数据,和 return 它。 我有数百台服务器,所以我不知道如何管理所有这些连接。

到目前为止,我是在单个连接中管理它的。这就是我认为它失败的原因。

目前看起来是这样的..

let redisClient;

const killRedis = () => {
    redisClient.quit()
    console.log("redis client shut down")
}


const createRedisClient = async (port, url) => {
    redisClient = require('redis').createClient(port, url, {
        no_ready_check: true,
        db: 1
    })
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        killRedis();
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
    });

    return redisClient;
}
module.exports = { createRedisClient, }

有点用,但最终会因连接不时被拒绝而失败。

我在我的路线中像下面这样使用它。

const scanAll = async (port, url) => {
    const redisClient = await createRedisClient(port, url)
    if (!redisClient) return 500
    const scan = promisify(redisClient.scan).bind(redisClient);
    const found = [];
    let cursor = '0';
    do {
        const reply = await scan(cursor, 'MATCH', '*');
        cursor = reply[0];
        found.push(...reply[1]);
    } while (cursor !== '0');
    return found;
};

/* Return all the users id */
router.post('/user/list', async function (req, res, next) {
    const data = await scanAll(req.body.port, req.body.ip);
    console.log("data ", data)
    if (data === 500) {
        res.status(500).json({
            error: "Error, server connection refused"
        })
    }
    else if (data.length === 0) {
        res.status(204).json(data)
    } else {
        res.status(200).json(data);
    }

})

我怎样才能进行正确的连接管理?

编辑:我的新尝试,但我认为在同时进行 2 个请愿时我的连接溢出

let connections = []


findConnection = (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)
    console.log("pre")
    console.log(connection)
    if (connection && connection.connection) {
        console.log("opcion1: ", connection.ip)
        console.log("connection already exists")
        return connection[0].connection
    } else {
        console.log("opcion2")
        console.log(connections)
        connections.push({
            ip: ip,
            port: port,
            connection: require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
        })
        return connections.filter(i => i.ip == ip && i.port == port)[0].connection
    }
}
const createRedisClient = async (port, url) => {
    let redisClient = findConnection(url, port)
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        redisClient.quit()
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
    });

    return redisClient;
}
module.exports = { createRedisClient, }

我注意到我收到以下错误

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit

编辑:最后一个实施问题

My current implementation is the following

    let connections = []

const killRedis = (redisClient, ip, port) => {
    redisClient.quit()
    connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}

const subscribe = (redisClient, url, port) => {
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        killRedis(redisClient, url, port)
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
        return redisClient;
    });
}

findConnection = (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)

    if (connection && connection.length > 0) {
        subscribe(connection[0].connection)
        return connection[0].connection
    } else {
        connections.push({
            ip: ip,
            port: port,
            connection: require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
        })
        subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
        return connections.filter(i => i.ip == ip && i.port == port)[0].connection
    }
}
const createRedisClient = async (port, url) => {
    let redisClient = findConnection(url, port)
    return redisClient
}
module.exports = { createRedisClient }

差不多可以用了,问题是我不知道如何处理

的错误

而且我不确定如何处理错误事件侦听器。如果它失败了,我应该 return 一个未定义的,但似乎它没有这样做,

你可以使用这个 npm 包

npm i connect-redis

您可以构建现有连接的缓存池并重用这些连接,而不是为每个新请求创建一个新的 Redis 连接。这样,您就不会超过每个 Redis 连接的事件侦听器限制。 如果池中尚不存在连接,您只需要创建一个连接即可。

PS - 为简单起见,我当前的缓存池实现为每对主机和端口创建一个连接并存储它们。如果需要,您可以在 Cache Pool 中实施 LRU 缓存以驱逐未使用的 Redis 连接。

这样您应该能够解决您的连接管理问题,因为您将创建一次连接并重新使用它们。

缓存-pool.js

const redis = require('redis');
const Q = require('q');

class CachePool {
    constructor() {
        this.cachedClients = {};
        this.pendingCachedClients = {};
    }

    getConnection(host, port) {
        const deferred = Q.defer();
        const connectionId = CachePool.getConnectionId(host, port);
        if (this.cachedClients[connectionId]) {
            deferred.resolve(this.cachedClients[connectionId]);
        } else {
            this.cachedClients[connectionId] = redis.createClient({
                host,
                port,
            });
            this.cachedClients[connectionId].on('connect', (connection) => {
                deferred.resolve(this.cachedClients[connectionId]);
            });
            this.cachedClients[connectionId].on('error', (err) => {
                deferred.reject(err);
            });
        }
        return deferred.promise;
    }

    static getConnectionId(host, port) {
        return `${host}:${port}`;
    }
}

module.exports = new CachePool();

cache.js

const { promisify } = require('util');

const CachePool = require('./cache-pool');

class Cache {
    constructor(host, port) {
        this.host = host;
        this.port = port;
    }

    connect() {
        return CachePool.getConnection(this.host, this.port);
    }

    async scanAll() {
        const redisConnection = await this.connect();
        const scan = promisify(redisConnection.scan).bind(redisConnection);
        const found = [];
        let cursor = '0';
        do {
            const reply = await scan(cursor, 'MATCH', '*');
            cursor = reply[0];
            found.push(...reply[1]);
        } while (cursor !== '0');
        return found;
    }
}

module.exports = Cache;

测试-sever.js

const express = require('express');

const Cache = require('./cache');

const app = express();

app.use(express.json({ type: '*/json' }));

const port = 3000;

/* Return all the users id */
app.post('/user/list', async function (req, res, next) {
    const redisClient = new Cache(req.body.ip, req.body.port);
    const data = await redisClient.scanAll();
    if (data === 500) {
        res.status(500).json({
            error: "Error, server connection refused"
        })
    }
    else if (data.length === 0) {
        res.status(204).json(data)
    } else {
        res.status(200).json(data);
    }

})

app.listen(port, () => {
  console.log(`Example app listening at http://localhost:${port}`)
});

单连接方案或者缓存池方案都可以。您在使用这两种解决方案时遇到了一些问题。

  • 单连接解决方​​案:ECONNREFUSED

我猜是redis maxclient到达时触发的错误,因为你的单连接解决方​​案在处理每个请求时没有关闭客户端。因此,当未达到 maxclient 限制但以 ECONNREFUSED 结束时,它会起作用。

你可以尝试解决,在request前加上killRedis即可return.

const scanAll = async (port, url) => {
    const redisClient = await createRedisClient(port, url)
    if (!redisClient) return 500
    const scan = promisify(redisClient.scan).bind(redisClient);
    const found = [];
    let cursor = '0';
    do {
        const reply = await scan(cursor, 'MATCH', '*');
        cursor = reply[0];
        found.push(...reply[1]);
    } while (cursor !== '0');
    killRedis(); // add code here.
    return found;
};
  • 缓存池解决方案:MaxListenersExceededWarning

参考答案

对于每个请求,你在createRedisClient函数中运行redisClient.on()两次,你做了两次订阅,然后添加了两个新的监听器。默认限制为 10。由于缺少取消订阅,它将以 MaxListenersExceededWarning 结束。解决方案是将所有 redisClient.on 代码从 createRedisClient 函数移动到 findConnection 函数,仅在创建连接时订阅两次,与用户请求无关。

  • 添加评论码
findConnection = (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)

    if (connection && connection.length > 0) {
        // subscribe(connection[0].connection) // should be deleted
        return connection[0].connection
    } else {
        connections.push({
            ip: ip,
            port: port,
            connection: require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
        })
        subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
        return connections.filter(i => i.ip == ip && i.port == port)[0].connection
    }
}

我的最终实现按预期工作

let connections = []

const killRedis = (redisClient, ip, port) => {
    redisClient.quit()
    connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}

const subscribe = (redisClient, url, port) => new Promise((resolve, reject) => {
    redisClient.on('error', function (err) {
        killRedis(redisClient, url, port)
        reject(err)
    });
    redisClient.on('connect', function () {
        resolve(redisClient)
    });
})

findConnection = async (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)

    if (connection && connection.length > 0) {
        return connection[0].connection
    } else {
        try {
            let newConnection = require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
            const client = await subscribe(newConnection, ip, port)
            connections.push({
                ip: ip,
                port: port,
                connection: newConnection
            })
            return client
        } catch (error) {
            return undefined
        }
    }
}
const createRedisClient = async (port, url) => {
    let redisClient = await findConnection(url, port)
    return redisClient
}
module.exports = { createRedisClient }