NestJS:当Kafka消费者失败时如何重试?

NestJS: How to retry when Kafka consumer fails?

可能会出现这样的情况,即使从 Kafka 收到了消息,但由于某些原因(数据库已关闭,webhook 已离线或...)仍然无法处理消息。所以我期望通过抛出 ErrorRpcException 我可以自动指示消息需要重试。然而它并没有发生,Kafka 认为消息已处理,即使它失败了。

  @MessagePattern('hello.world')
  readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    const originalMessage = context.getMessage();
    console.log(originalMessage );
    throw new RpcException('need to retry');
  }

首先您需要为 RpcException 创建一个异常过滤器:

import {
  ArgumentsHost,
  Catch,
  Logger,
  RpcExceptionFilter,
} from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

@Catch(RpcException)
export class KafkaExceptionFilter implements RpcExceptionFilter<RpcException> {
  private readonly logger = new Logger(KafkaExceptionFilter.name);
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    this.logger.warn(exception);
    throw exception.getError();
  }
}

然后在您的 kafka 控制器方法上使用 @UseFilters(new KafkaExceptionFilter())请注意,全局添加时它不起作用