Webflux 订户
Webflux subscriber
我目前面临一个关于在 switchIfEmpty 函数内保存 redis 的问题。可能与我在响应式编程方面还很陌生,不幸的是我很难找到合适的例子有关。
不过我已经能够解决它,但我很确定有更好的方法来解决它。
这是我现在写的:
public Mono<ResponseEntity<BaseResponse<Content>>> getContent(final String contentId, final String contentType){
return redisRepository.findByKeyAndId(REDIS_KEY_CONTENT, contentId.toString()).cast(Content.class)
.map(contentDTO -> ResponseEntity.status(HttpStatus.OK.value())
.body(new BaseResponse<>(HttpStatus.OK.value(), HttpStatus.OK.getReasonPhrase(), contentDTO)))
//here I have to defer, otherwise It would never wait for the findByKeyAndId
.switchIfEmpty(Mono.defer(() -> {
Mono<ResponseEntity<BaseResponse<Content>>> responseMono = contentService.getContentByIdAndType(contentId, contentType);
//so far I understood I need to consume every stream I have, in order to actually carry out the task otherwise will be there waiting for a consumer.
//once I get what I need from the cmsService I need to put it in cache and till now this is the only way I've been able to do it
responseMono.filter(response -> response.getStatusCodeValue() == HttpStatus.OK.value())
.flatMap(contentResponse -> redisRepository.save(REDIS_KEY_CONTENT, contentId.toString(), contentResponse.getBody().getData()))
.subscribe();
//then I return the Object I firstly retrived thru the cmsService
return responseMono;
}
));
}
关于更好的方法的任何线索或建议?
预先感谢您的帮助!
从最佳实践的角度来看,有些事情并不是很好,而且这可能与您认为的不一样:
- 您正在订阅自己,这通常是出现问题的明显迹象。除了特殊情况,订阅通常应该留给框架。这也是您需要
Mono.defer()
的原因 - 通常,在框架在正确的时间订阅您的发布者之前,什么都不会发生,而您自己管理该发布者的订阅生命周期。
- 框架仍会在那里订阅您的内部发布者,只是您返回的
Mono
不会对其结果做任何事情。因此,您可能会调用 contentService.getContentByIdAndType()
两次,而不仅仅是一次 - 一次是在您订阅时,一次是在框架订阅时。
- 像这样订阅内部发布者会创建一个“即发即弃”类型的模型,这意味着当您的反应式方法 returns 您不知道 Redis 是否真的保存了它,这可能会导致如果您稍后开始依赖该结果,就会出现问题。
- 与上面无关,但是
contentId
已经是一个字符串,你不需要在上面调用toString()
:-)
相反,您可以考虑在 switchIfEmpty()
块中使用 delayUntil
- 如果响应代码正常,这将允许您将值保存到 redis,延迟直到发生这种情况,并在完成后保持原始值。代码可能看起来像这样(如果没有完整的例子很难说这是否完全正确,但它应该给你一个想法):
return redisRepository
.findByKeyAndId(REDIS_KEY_CONTENT, contentId).cast(Content.class)
.map(contentDTO -> ResponseEntity.status(HttpStatus.OK.value()).body(new BaseResponse<>(HttpStatus.OK.value(), HttpStatus.OK.getReasonPhrase(), contentDTO)))
.switchIfEmpty(
contentService.getContentByIdAndType(contentId, contentType)
.delayUntil(response -> response.getStatusCodeValue() == HttpStatus.OK.value() ?
redisRepository.save(REDIS_KEY_CONTENT, contentId, contentResponse.getBody().getData()) :
Mono.empty())
);
我目前面临一个关于在 switchIfEmpty 函数内保存 redis 的问题。可能与我在响应式编程方面还很陌生,不幸的是我很难找到合适的例子有关。
不过我已经能够解决它,但我很确定有更好的方法来解决它。 这是我现在写的:
public Mono<ResponseEntity<BaseResponse<Content>>> getContent(final String contentId, final String contentType){
return redisRepository.findByKeyAndId(REDIS_KEY_CONTENT, contentId.toString()).cast(Content.class)
.map(contentDTO -> ResponseEntity.status(HttpStatus.OK.value())
.body(new BaseResponse<>(HttpStatus.OK.value(), HttpStatus.OK.getReasonPhrase(), contentDTO)))
//here I have to defer, otherwise It would never wait for the findByKeyAndId
.switchIfEmpty(Mono.defer(() -> {
Mono<ResponseEntity<BaseResponse<Content>>> responseMono = contentService.getContentByIdAndType(contentId, contentType);
//so far I understood I need to consume every stream I have, in order to actually carry out the task otherwise will be there waiting for a consumer.
//once I get what I need from the cmsService I need to put it in cache and till now this is the only way I've been able to do it
responseMono.filter(response -> response.getStatusCodeValue() == HttpStatus.OK.value())
.flatMap(contentResponse -> redisRepository.save(REDIS_KEY_CONTENT, contentId.toString(), contentResponse.getBody().getData()))
.subscribe();
//then I return the Object I firstly retrived thru the cmsService
return responseMono;
}
));
}
关于更好的方法的任何线索或建议? 预先感谢您的帮助!
从最佳实践的角度来看,有些事情并不是很好,而且这可能与您认为的不一样:
- 您正在订阅自己,这通常是出现问题的明显迹象。除了特殊情况,订阅通常应该留给框架。这也是您需要
Mono.defer()
的原因 - 通常,在框架在正确的时间订阅您的发布者之前,什么都不会发生,而您自己管理该发布者的订阅生命周期。 - 框架仍会在那里订阅您的内部发布者,只是您返回的
Mono
不会对其结果做任何事情。因此,您可能会调用contentService.getContentByIdAndType()
两次,而不仅仅是一次 - 一次是在您订阅时,一次是在框架订阅时。 - 像这样订阅内部发布者会创建一个“即发即弃”类型的模型,这意味着当您的反应式方法 returns 您不知道 Redis 是否真的保存了它,这可能会导致如果您稍后开始依赖该结果,就会出现问题。
- 与上面无关,但是
contentId
已经是一个字符串,你不需要在上面调用toString()
:-)
相反,您可以考虑在 switchIfEmpty()
块中使用 delayUntil
- 如果响应代码正常,这将允许您将值保存到 redis,延迟直到发生这种情况,并在完成后保持原始值。代码可能看起来像这样(如果没有完整的例子很难说这是否完全正确,但它应该给你一个想法):
return redisRepository
.findByKeyAndId(REDIS_KEY_CONTENT, contentId).cast(Content.class)
.map(contentDTO -> ResponseEntity.status(HttpStatus.OK.value()).body(new BaseResponse<>(HttpStatus.OK.value(), HttpStatus.OK.getReasonPhrase(), contentDTO)))
.switchIfEmpty(
contentService.getContentByIdAndType(contentId, contentType)
.delayUntil(response -> response.getStatusCodeValue() == HttpStatus.OK.value() ?
redisRepository.save(REDIS_KEY_CONTENT, contentId, contentResponse.getBody().getData()) :
Mono.empty())
);