Scheduling an ActiveMQ message with a TTL, in MassTransit
I am using MassTransit to schedule messages for later delivery, using ActiveMQ's scheduler plugin.
    var provider = new ActiveMqScheduleMessageProvider(_sendEndpointProvider);
    var scheduler = new MessageScheduler(provider, _bus.Topology);

    var isPublishAddressFetched = _bus.Topology
        .Publish<TMessage>()
        .TryGetPublishAddress(_bus.Address, out var publishAddress);

    if (!isPublishAddressFetched)
        throw new InvalidOperationException("Publish address could not be fetched from " + _bus.Address);

    using var combinedCancellationTokenSource = GetCombinedCancellationTokenWithTimeout(cancellationToken);

    await scheduler.ScheduleSend(
        publishAddress,
        delay,
        message,
        combinedCancellationTokenSource.Token);
This works fine, but I don't know how to specify a TTL in this case (using MessageScheduler). Any ideas?
To set properties on the SendContext when using the message scheduler, you can create an execute pipe and pass it to the scheduler method.
Also, the message scheduler already has a built-in SchedulePublish
method, so there is no reason to do all the work above – just call it.
    DateTime scheduledTime = DateTime.UtcNow + TimeSpan.FromMinutes(2);

    await scheduler.SchedulePublish(scheduledTime, new Message(),
        Pipe.Execute<SendContext<Message>>(x => x.TimeToLive = TimeSpan.FromSeconds(30)));
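If you would rather keep the ScheduleSend call from the question, the same pipe can probably be passed there too. The following is only a sketch: the class name SchedulerTtlSketch, the method name ScheduleWithTtl, and its parameters are made up for illustration, and it assumes a MassTransit version where Pipe lives in the MassTransit namespace (older versions expose it from GreenPipes).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical helper, not part of MassTransit.
public static class SchedulerTtlSketch
{
    public static Task ScheduleWithTtl<TMessage>(
        IMessageScheduler scheduler,
        Uri destinationAddress,
        TimeSpan delay,
        TimeSpan timeToLive,
        TMessage message,
        CancellationToken cancellationToken)
        where TMessage : class
    {
        // Execute pipe that sets TimeToLive on the send context of the scheduled message.
        var ttlPipe = Pipe.Execute<SendContext<TMessage>>(context => context.TimeToLive = timeToLive);

        // Convert the relative delay into an absolute UTC time for the scheduler.
        DateTime scheduledTime = DateTime.UtcNow + delay;

        return scheduler.ScheduleSend(destinationAddress, scheduledTime, message, ttlPipe, cancellationToken);
    }
}
```

With the variables from the question, the call would look something like `await SchedulerTtlSketch.ScheduleWithTtl(scheduler, publishAddress, delay, TimeSpan.FromSeconds(30), message, combinedCancellationTokenSource.Token);`. It is worth verifying against your broker how the TTL interacts with the scheduled delay.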