Rabbitmq:消费前修改消息体
Rabbitmq: Modify message body before consuming
我是 RabbitMQ 的新手,我想在队列使用消息之前修改它。我有一个应该保持不可触及的交换。客户端接收带有特定路由键的消息。但是有很多,我想在将它们发布到队列之前过滤和更改正文。
Exchange 填充消息如下所示:
{
"_context_domain": "unsuitable",
"_msg_id": "1",
"_context_quota_class": null,
"_context_read_only": false,
"_context_request_id": "1"
}
{
"_context_domain": "suitable",
"_msg_id": "2",
"_context_quota_class": null,
"_context_read_only": false,
"_context_request_id": "2"
}
有没有办法在消费前过滤和修改它们?
例如:
...
channel.queueBind(QUEUE_NAME, "EXCHANGE_NAME", "ROUTNG_KEY");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
Gson mapper = new Gson();
SomeObject object = (SomeObject) mapper.fromJson(message, SomeObject.class);
if (SomeObject.getContext_domain = "suitable"){
//publish somehow SomeObject.getMsg_id into QUEUE_NAME
}
}
有什么办法吗?
AMQP 支持的交换不允许过滤消息 body。您可以使用 "topic" or "header" exchanges,它可以根据路由键或消息 header.
路由消息
但是其中 none 允许您修改消息本身。如果你想这样做,你需要开发自己的 RabbitMQ 插件来实现交换。
如果你想走那条路,那是很容易做到的。然而,交换并不是完全为此目的而设计的:它们只是路由 tables。如果你这样做,RabbitMQ 中的通道进程将消耗资源来做你想要的任何修改(即交换不是进程,它基本上是 table 中的一行)。这也意味着同一连接上的其他客户端可能会被阻塞,直到通道完成修改并 queue 一条消息。
一个更常见的实现你想要的方法是使用一个消费者来处理所有的消息,在那里做任何你需要的filtering/modifications然后queue在一秒钟内得到结果queue .这第二 queue 将由您的普通工作人员消耗。
我是 RabbitMQ 的新手,我想在队列使用消息之前修改它。我有一个应该保持不可触及的交换。客户端接收带有特定路由键的消息。但是有很多,我想在将它们发布到队列之前过滤和更改正文。
Exchange 填充消息如下所示:
{
"_context_domain": "unsuitable",
"_msg_id": "1",
"_context_quota_class": null,
"_context_read_only": false,
"_context_request_id": "1"
}
{
"_context_domain": "suitable",
"_msg_id": "2",
"_context_quota_class": null,
"_context_read_only": false,
"_context_request_id": "2"
}
有没有办法在消费前过滤和修改它们? 例如:
...
channel.queueBind(QUEUE_NAME, "EXCHANGE_NAME", "ROUTNG_KEY");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
Gson mapper = new Gson();
SomeObject object = (SomeObject) mapper.fromJson(message, SomeObject.class);
if (SomeObject.getContext_domain = "suitable"){
//publish somehow SomeObject.getMsg_id into QUEUE_NAME
}
}
有什么办法吗?
AMQP 支持的交换不允许过滤消息 body。您可以使用 "topic" or "header" exchanges,它可以根据路由键或消息 header.
路由消息但是其中 none 允许您修改消息本身。如果你想这样做,你需要开发自己的 RabbitMQ 插件来实现交换。
如果你想走那条路,那是很容易做到的。然而,交换并不是完全为此目的而设计的:它们只是路由 tables。如果你这样做,RabbitMQ 中的通道进程将消耗资源来做你想要的任何修改(即交换不是进程,它基本上是 table 中的一行)。这也意味着同一连接上的其他客户端可能会被阻塞,直到通道完成修改并 queue 一条消息。
一个更常见的实现你想要的方法是使用一个消费者来处理所有的消息,在那里做任何你需要的filtering/modifications然后queue在一秒钟内得到结果queue .这第二 queue 将由您的普通工作人员消耗。