NestJS:当Kafka消费者失败时如何重试?
NestJS: How to retry when Kafka consumer fails?
可能会出现这样的情况,即使从 Kafka 收到了消息,但由于某些原因(数据库已关闭,webhook 已离线或...)仍然无法处理消息。所以我期望通过抛出 Error
或 RpcException
我可以自动指示消息需要重试。然而它并没有发生,Kafka 认为消息已处理,即使它失败了。
@MessagePattern('hello.world')
readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
console.log(originalMessage );
throw new RpcException('need to retry');
}
首先您需要为 RpcException
创建一个异常过滤器:
import {
ArgumentsHost,
Catch,
Logger,
RpcExceptionFilter,
} from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';
@Catch(RpcException)
export class KafkaExceptionFilter implements RpcExceptionFilter<RpcException> {
private readonly logger = new Logger(KafkaExceptionFilter.name);
catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
this.logger.warn(exception);
throw exception.getError();
}
}
然后在您的 kafka 控制器方法上使用 @UseFilters(new KafkaExceptionFilter())
。 请注意,全局添加时它不起作用。
可能会出现这样的情况,即使从 Kafka 收到了消息,但由于某些原因(数据库已关闭,webhook 已离线或...)仍然无法处理消息。所以我期望通过抛出 Error
或 RpcException
我可以自动指示消息需要重试。然而它并没有发生,Kafka 认为消息已处理,即使它失败了。
@MessagePattern('hello.world')
readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
console.log(originalMessage );
throw new RpcException('need to retry');
}
首先您需要为 RpcException
创建一个异常过滤器:
import {
ArgumentsHost,
Catch,
Logger,
RpcExceptionFilter,
} from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';
@Catch(RpcException)
export class KafkaExceptionFilter implements RpcExceptionFilter<RpcException> {
private readonly logger = new Logger(KafkaExceptionFilter.name);
catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
this.logger.warn(exception);
throw exception.getError();
}
}
然后在您的 kafka 控制器方法上使用 @UseFilters(new KafkaExceptionFilter())
。 请注意,全局添加时它不起作用。