MassTransit:无法接收从 Saga 到 Consumer 的命令 [已修复]
MassTransit: Can not receive command from Saga to Consumer [Fixed]
我有一个工作流从控制器(属于 csproj1
)发布一个事件,它将被 MySaga
(属于 .csproj2
)使用,然后是 Saga已发布的消息,它将发送一个新命令,该命令将从 CommandConsumer
(属于 csproj2
)中使用。
问题:
从 MySaga
.
发送时无法在 CommandConsumer
中接收 命令
我正在使用 Azure Service Bus 和 Consumer Saga
这是我的 startup.cs
,我认为配置不当:
services.AddMassTransit(x =>
{
x.AddConsumersFromNamespaceContaining<CommandConsumer>();
x.AddSaga<MySaga>().InMemoryRepository();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(Configuration["ServiceBus:ConnectionString"]);
cfg.ConfigureEndpoints(context);
});
});
这里是MySaga.cs
//...
public readonly Uri targetEndpoint = new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");
public async Task Consume(ConsumeContext<AceptedEvent> context)
{
var sendEndpoint = await context.GetSendEndpoint(targetEndpoint);
await sendEndpoint.Send<MyCommand>(new
{
CorrelationId,
context.Message.Item1,
context.Message.Item2
});
}
//...
这里是CommandConsumer.cs
public class CommandConsumer: IConsumer<MyCommand>
{
public ILogger<MyCommand> _logger { get; }
public IService _myService { get; }
public MyCommandConsumer(ILogger<MyCommand> logger, Iservice myService)
{
_logger = logger;
_myService = myService;
}
public async Task Consume(ConsumeContext<MyCommand> context)
{
_logger?.LogInformation($"{context.Message.CorrelationId}");
try
{
await _myService.DoSomething();
await context.Publish<AcceptedEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "NewState"
});
}
catch (Exception ex)
{
await context.Publish<RefusedEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "Failed",
Error = ex.Message
});
throw;
}
}
}
如果您有任何推荐想法,感谢您的分享和挑战!
解决方案:
正如@ChrisPatterson 所提到的,我使用了 Publish
方法,因为我的命令只有一个消费者并且它按预期工作。
不仅如此,我还修复了 ReceiveEndpoint
的配置:
var repository = new InMemorySagaRepository<MySaga>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(Configuration["ServiceBus:ConnectionString"]);
cfg.ReceiveEndpoint("queue", cfgEndpoint =>
{
cfgEndpoint.Saga<MySaga>(repository);
});
cfg.ReceiveEndpoint(KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand)),
cfgEndpoint =>
{
cfgEndpoint.ConfigureConsumer<CommandConsumer>(context);
});
});
生产者正在发送到这个接收端点:
public readonly Uri targetEndpoint =
new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");
根据您的使用方式,最终将成为“我的命令”。
而 ConfigureEndpoints
将使用消费者 class 端点名称,这相当于:
KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()
最终成为“命令”——所以发送端点没有发送到消费者的正确地址。
如果那是该命令的唯一使用者,您可以改用 Publish
,这样您就不需要知道使用者的地址。否则,您需要知道正确的目标地址。
我有一个工作流从控制器(属于 csproj1
)发布一个事件,它将被 MySaga
(属于 .csproj2
)使用,然后是 Saga已发布的消息,它将发送一个新命令,该命令将从 CommandConsumer
(属于 csproj2
)中使用。
问题:
从 MySaga
.
CommandConsumer
中接收 命令
我正在使用 Azure Service Bus 和 Consumer Saga
这是我的 startup.cs
,我认为配置不当:
services.AddMassTransit(x =>
{
x.AddConsumersFromNamespaceContaining<CommandConsumer>();
x.AddSaga<MySaga>().InMemoryRepository();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(Configuration["ServiceBus:ConnectionString"]);
cfg.ConfigureEndpoints(context);
});
});
这里是MySaga.cs
//...
public readonly Uri targetEndpoint = new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");
public async Task Consume(ConsumeContext<AceptedEvent> context)
{
var sendEndpoint = await context.GetSendEndpoint(targetEndpoint);
await sendEndpoint.Send<MyCommand>(new
{
CorrelationId,
context.Message.Item1,
context.Message.Item2
});
}
//...
这里是CommandConsumer.cs
public class CommandConsumer: IConsumer<MyCommand>
{
public ILogger<MyCommand> _logger { get; }
public IService _myService { get; }
public MyCommandConsumer(ILogger<MyCommand> logger, Iservice myService)
{
_logger = logger;
_myService = myService;
}
public async Task Consume(ConsumeContext<MyCommand> context)
{
_logger?.LogInformation($"{context.Message.CorrelationId}");
try
{
await _myService.DoSomething();
await context.Publish<AcceptedEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "NewState"
});
}
catch (Exception ex)
{
await context.Publish<RefusedEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "Failed",
Error = ex.Message
});
throw;
}
}
}
如果您有任何推荐想法,感谢您的分享和挑战!
解决方案:
正如@ChrisPatterson 所提到的,我使用了 Publish
方法,因为我的命令只有一个消费者并且它按预期工作。
不仅如此,我还修复了 ReceiveEndpoint
的配置:
var repository = new InMemorySagaRepository<MySaga>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(Configuration["ServiceBus:ConnectionString"]);
cfg.ReceiveEndpoint("queue", cfgEndpoint =>
{
cfgEndpoint.Saga<MySaga>(repository);
});
cfg.ReceiveEndpoint(KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand)),
cfgEndpoint =>
{
cfgEndpoint.ConfigureConsumer<CommandConsumer>(context);
});
});
生产者正在发送到这个接收端点:
public readonly Uri targetEndpoint =
new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");
根据您的使用方式,最终将成为“我的命令”。
而 ConfigureEndpoints
将使用消费者 class 端点名称,这相当于:
KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()
最终成为“命令”——所以发送端点没有发送到消费者的正确地址。
如果那是该命令的唯一使用者,您可以改用 Publish
,这样您就不需要知道使用者的地址。否则,您需要知道正确的目标地址。