How to connect a consumer to a Kafka topic with MassTransit when using a state machine

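I have two services:

Service 1 hosts the state machine and produces messages to a topic. Service 2 should consume those messages. How do I set up Service 2 correctly so that it consumes them?

When I write the code like this, it doesn't work: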

services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");
        });

        rider.AddConsumer<OrderConsumer>()
            .Endpoint(e =>
            {
                e.Name = "queue_name";
                e.Temporary = false;
                e.ConcurrentMessageLimit = 8;
            });
    });
});

And when I do it like this, it throws System.ArgumentException: 'The consumer type was not found: OrderConsumer':

services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");

            kafka.TopicEndpoint<Null, OrderMessage>("queue_name", "group_id", cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});

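You need to combine the two examples you posted: register the consumer on the rider, then bind it to the topic endpoint: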

services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        // Register the consumer with the rider so ConfigureConsumer can resolve it below.
        rider.AddConsumer<OrderConsumer>();

        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");

            kafka.TopicEndpoint<Null, OrderMessage>("queue_name", "group_id", cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});
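
The exception in the second attempt comes from calling ConfigureConsumer<OrderConsumer> for a consumer that was never registered with AddConsumer on the rider. For completeness, here is a minimal sketch of what the two types used above might look like. The question does not show them, so the OrderId property and the body of Consume are assumptions for illustration only; IConsumer<T> and ConsumeContext<T> are the standard MassTransit interfaces.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract: the real OrderMessage is not shown in the question,
// so the OrderId property is an assumption.
public record OrderMessage
{
    public Guid OrderId { get; init; }
}

// The consumer that is registered with rider.AddConsumer<OrderConsumer>()
// and bound to the topic with cfg.ConfigureConsumer<OrderConsumer>(ctx).
public class OrderConsumer : IConsumer<OrderMessage>
{
    public Task Consume(ConsumeContext<OrderMessage> context)
    {
        // Placeholder handling: replace with the real processing logic.
        Console.WriteLine($"Received order {context.Message.OrderId}");
        return Task.CompletedTask;
    }
}
```

The message type here has to match the value type given to TopicEndpoint<Null, OrderMessage>, and Service 1 has to produce the same contract to the same topic name.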