将大量对象传递给 RabbitMQ 交换

Pass large array of objects to RabbitMQ exchange

我从外部来源接收到大量对象(大约超过 10 000 个对象)。然后我将它传递给交换,以便通知其他微服务有关要处理的新条目。

this._rmqClient.publishToExchange({
    exchange: 'my-exchange',
    exchangeOptions: {
        type: 'fanout',
        durable: true,
    },
    data: myData, // [object1, object2, object3, ...]
    pattern: 'myPattern',
})

问题是将这么大的消息推送到交换器是不好的做法,我想解决这个问题。我已阅读相关文章和 Whosebug 帖子以查找有关流数据的代码示例或信息,但没有成功。

我发现的唯一方法是将我的大数组分成块并发布每个块以使用 for ... loop 进行交换。这是好的做法吗?如何确定每个块(对象数)的长度?或者还有其他方法吗?

这实际上取决于 Object 大小。这件事您必须自己弄清楚。获取 10k 个对象并计算出它们的平均大小(将它们作为 json 放入文件中并取 fileSize/10'000 就是这样。也许请求主体大小为 50-100kb 是一件好事?但这仍然是取决于你..

从数字 50 开始并进行测试。检查花费的时间、带宽和一切有意义的东西。在 1-5000 和 test test test 之间更改块大小。在某些时候,您会感觉到什么数字比较好! .

下面是一些遍历元素的示例代码:


// send function for show caseing the idea.
function send(data) {
    return this._rmqClient.publishToExchange({
        exchange: 'my-exchange',
        exchangeOptions: {
            type: 'fanout',
            durable: true,
        },
        data: data,
        pattern: 'myPattern',
    })
}

// this sends chunks one by one.. 
async function sendLargeDataPacket(data, chunkSize) {

    // Pure functions do prevent headache
    const mutated = [...data]

    // send full packages aslong as possible!.
    while (mutated.length >= chunkSize) {

        // send a packet of chunkSize length
        await send(mutated.splice(0, chunkSize))

    }

    // send the remaining elements if there are any!
    if(mutated.length > 0) {
        await send(mutated)
    }

}

你会这样称呼它:

// that's your 10k+ items array!.
var myData = [/**...**/]

// let's start with 50, but try out all numbers!.
const chunkSize = 50
sendLargeDataPacket(chunkSize).then(() => console.log('done')).catch(console.error)

此方法一个接一个地发送数据包,并且可能需要一些时间,因为它不是并行完成的。我不知道您的要求,但如果您需要,我可以帮助您编写并行方法..