MassTransit:多个消费者,分开 queues/endpoints,消息未送达
MassTransit: multiple consumers, separate queues/endpoints, messages not delivered
我设置了 2 个服务来接收相同的消息,例如ConsumerA : IConsumer<IMessageA>
和 ConsumerB : IConsumer<IMessageA>
。每个服务都设置一个唯一的端点,例如queue_a
和 queue_b
并注册其消费者。我在 RabbitMQ 中看到 IMessageA 的交换,它是扇出类型并绑定到 queue_a
和 queue_b
。到目前为止,一切都很好。
- 我运行同时提供服务和发布消息,但只有服务 A 获取它。
- 我停止服务 A 并在 RabbitMQ 中手动向 B 发布一条消息(服务 A 是一个 Web 服务,它发布
IMessageA
以响应使用 IRequestClient<IMessageA, IMessageAResponse>
的 POST,我为什么我需要手动 post),现在服务 B 收到消息并按预期使用它。
明确地说,在服务 A 停止的情况下,RabbitMQ 将消息路由到 queue_a
和 queue_b
。如果服务 A 是 运行ning,则消息 仅 转到 queue_b
,尽管存在交换绑定表明 queue_b
绑定到 IMessageA
交换,绝对 应该 得到它们。或者至少在我能够通过管理 Web UI 检查 RabbitMQ 时,没有证据表明曾经有消息传递到 queue_b
(即 queue_b_error
或 queue_b_skipped
,后者甚至不存在)。
我已将 IReceiveObserver
添加到服务 A 和 B,但没有任何内容触发 ReceiveFault
或 ConsumeFault
。
服务A中的消费者基本上在做:
var result = await MethodThatReturnsIMessageAResponse(messageA);
context.Respond(result);
为什么服务 A 会干扰向服务 B 传递消息?我从哪里开始寻找?
问题是我 "publishing" 使用以下消息:
c.Resolve<IBus>().CreateRequestClient<IToDoMessage, IToDoMessageResponse>(new Uri(QueueAddress + QueueName),TimeSpan.FromSeconds(5));
这需要特定的点(例如队列名称)。相反,我需要使用 CreatePublishRequestClient
:
c.Resolve<IBus>().CreatePublishRequestClient<IToDoMessage, IToDoMessageResponse>(TimeSpan.FromSeconds(5));
它使用总线发布,并通过交换器,而不是特定的队列。 GitHub sample project 显示前者无济于事...
我设置了 2 个服务来接收相同的消息,例如ConsumerA : IConsumer<IMessageA>
和 ConsumerB : IConsumer<IMessageA>
。每个服务都设置一个唯一的端点,例如queue_a
和 queue_b
并注册其消费者。我在 RabbitMQ 中看到 IMessageA 的交换,它是扇出类型并绑定到 queue_a
和 queue_b
。到目前为止,一切都很好。
- 我运行同时提供服务和发布消息,但只有服务 A 获取它。
- 我停止服务 A 并在 RabbitMQ 中手动向 B 发布一条消息(服务 A 是一个 Web 服务,它发布
IMessageA
以响应使用IRequestClient<IMessageA, IMessageAResponse>
的 POST,我为什么我需要手动 post),现在服务 B 收到消息并按预期使用它。
明确地说,在服务 A 停止的情况下,RabbitMQ 将消息路由到 queue_a
和 queue_b
。如果服务 A 是 运行ning,则消息 仅 转到 queue_b
,尽管存在交换绑定表明 queue_b
绑定到 IMessageA
交换,绝对 应该 得到它们。或者至少在我能够通过管理 Web UI 检查 RabbitMQ 时,没有证据表明曾经有消息传递到 queue_b
(即 queue_b_error
或 queue_b_skipped
,后者甚至不存在)。
我已将 IReceiveObserver
添加到服务 A 和 B,但没有任何内容触发 ReceiveFault
或 ConsumeFault
。
服务A中的消费者基本上在做:
var result = await MethodThatReturnsIMessageAResponse(messageA);
context.Respond(result);
为什么服务 A 会干扰向服务 B 传递消息?我从哪里开始寻找?
问题是我 "publishing" 使用以下消息:
c.Resolve<IBus>().CreateRequestClient<IToDoMessage, IToDoMessageResponse>(new Uri(QueueAddress + QueueName),TimeSpan.FromSeconds(5));
这需要特定的点(例如队列名称)。相反,我需要使用 CreatePublishRequestClient
:
c.Resolve<IBus>().CreatePublishRequestClient<IToDoMessage, IToDoMessageResponse>(TimeSpan.FromSeconds(5));
它使用总线发布,并通过交换器,而不是特定的队列。 GitHub sample project 显示前者无济于事...