Spring 5 WebFlux 中的缓存

Caching in Spring 5 WebFlux

在 Spring 5 中,有什么方法可以缓存 WebClient 返回的 Flux 吗?我尝试了下面的代码,但没有缓存任何东西。

/**
 * Gateway application from the question: exposes {@code POST /test} and forwards the
 * request body to a remote service through {@code WebClient}.
 *
 * <p>This snippet demonstrates the caching attempt that does NOT work. As the discussion
 * around it explains, {@code @Cacheable} does not support {@code Flux}, and a brand-new
 * {@code Flux} is assembled on every call, so nothing is ever reused.
 */
@RestController
@SpringBootApplication
@EnableCaching
public class GatewayApplication {

    /**
     * Handles {@code POST /test} by delegating to {@link #getData(String)}.
     *
     * @param body raw request body, forwarded unchanged to the remote service
     * @return the remote response as a stream of strings
     */
    @PostMapping(value = "/test", produces = "application/json")
    public Flux<String> handleRequest(@RequestBody String body) {
        // The original called getHspadQuery(body), which is not defined in this class;
        // getData is the only helper here and has the matching signature.
        return getData(body);
    }

    /**
     * Posts {@code body} to the remote URL and returns the response stream.
     *
     * <p>NOTE(review): the {@code @Cacheable} annotation is kept because it is the subject
     * of the question, but it is not expected to take effect: the method is private and is
     * invoked directly on {@code this}, which presumably bypasses Spring's caching proxy.
     * Confirm against the Spring cache abstraction documentation.
     *
     * @param body payload to send
     * @return a new {@code Flux} for every invocation
     */
    @Cacheable("testCache")
    private Flux<String> getData(String body) {
        return WebClient.create().post()
                .uri("http://myurl")
                .body(BodyInserters.fromObject(body))
                .retrieve()
                .bodyToFlux(String.class)
                // cache() is applied to a Flux instance that is rebuilt on each call,
                // so later requests never see a previously cached instance.
                .cache();
    }
}

当我发出第三个请求时,它一直无法完成。之后的请求虽然能得到响应,但服务器会抛出以下异常:

2018-04-09 12:36:23.920 ERROR 11488 --- [ctor-http-nio-4] r.ipc.netty.channel.ChannelOperations    : [HttpServer] Error processing connection. Requesting close the channel
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:292) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:251) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:205) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:180) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:646) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:523) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:897) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
...

而且它从不缓存任何东西。

如有任何帮助,我们将不胜感激。

谢谢。

目前,@Cacheable 还不适用于 Flux(以及一般的 Reactor 类型)。此外,在您的示例中,每次调用该方法都会创建一个新的 Flux 实例,因此它自然不会缓存任何内容。

为了能够缓存结果,您需要将 Flux 转换为列表(List)实例,或者始终重复使用同一个 Flux 实例。

最后我用 Mono 解决了这个问题。我想用 Flux 应该也是可行的,例如借助 reduce 操作符。

/**
 * Spring Boot application that forwards {@code POST /hspad/grahql} requests to a remote
 * service and caches each response body in the in-memory {@code "hspad"} cache, using the
 * request body as the cache key.
 */
@RestController
@SpringBootApplication
public class Application {

    @Autowired
    CacheManager manager;

    private WebClient client;

    /** Builds the shared {@code WebClient} once this bean has been constructed. */
    @PostConstruct
    public void setup() {
        client = WebClient.builder()
                .baseUrl("http://myurl")
                .exchangeStrategies(ExchangeStrategies.withDefaults())
                .build();
    }

    /**
     * Registers a cache manager holding a single concurrent-map cache named {@code "hspad"}.
     *
     * @return the configured cache manager
     */
    @Bean
    public CacheManager cacheManager() {
        SimpleCacheManager cacheManager = new SimpleCacheManager();
        cacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("hspad")));
        return cacheManager;
    }

    /**
     * HTTP entry point; delegates to {@link #getHspadQuery(String)}.
     *
     * @param body raw request body, also used as the cache key
     * @return the cached or freshly fetched response wrapped in a 200 OK entity
     */
    // NOTE(review): "grahql" looks like a typo for "graphql"; the path is left untouched
    // because changing it would change the public endpoint.
    @PostMapping(value = "/hspad/grahql", produces = "application/json")
    public Mono<ResponseEntity<String>> hspadService(@RequestBody String body) {
        return getHspadQuery(body);
    }

    /**
     * Returns the cached response for {@code body} when one exists; otherwise posts
     * {@code body} to the remote service and stores the response before returning it.
     *
     * @param body request payload and cache key
     * @return a {@code Mono} emitting the response entity
     */
    private Mono<ResponseEntity<String>> getHspadQuery(String body) {
        Cache cache = cacheManager().getCache("hspad");
        Cache.ValueWrapper cached = cache.get(body);
        if (cached != null) {
            return Mono.just(ResponseEntity.ok(cached.get().toString()));
        }

        return client.post()
                .body(BodyInserters.fromObject(body))
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    // Careful: blocking operation! The Spring cache API is not reactive
                    // (no reactive-ready cache manager found yet).
                    // In the original, this statement was fused onto the end of the comment
                    // line, so it never ran and the cache stayed empty.
                    cache.putIfAbsent(body, response);
                    return ResponseEntity.ok(response);
                });
    }

    public static void main(String[] args) {
        // Boot this class; the original referenced GatewayApplication, which is not this class.
        SpringApplication.run(Application.class, args);
    }
}

Reactor 提供了一个 cache add-on(缓存扩展),可以与 Spring 的 CacheManager 配合使用。但是,正如已接受答案下的评论所指出的,目前 Spring 缓存 API(get 和 put 操作)仍然是阻塞式的。只有等这个问题得到解决之后,我们才能让程序做到完全响应式(fully reactive)。

下面是 Java 示例代码片段。完整的示例项目可以在 GitHub 上找到。

/**
 * {@link CatService} implementation that answers searches from a Spring cache when an
 * entry is present, and otherwise fetches the data with {@code WebClient} and writes the
 * result back to the cache.
 */
@Service
public class CatServiceImpl implements CatService {
    private static final String CACHE_NAME = "sr";
    private static final String KEY = "k";

    @Autowired
    private WebClient client;

    @Autowired
    private CacheManager cacheManager;

    /**
     * Looks the result up under the fixed key {@code KEY}; on a cache miss, resumes with
     * the remote call and stores what it emitted.
     *
     * @return the cached or freshly retrieved {@code CatDto} stream
     */
    @Override
    public Flux<CatDto> search() {
        Flux<CatDto> remote = client.get().retrieve().bodyToFlux(CatDto.class);

        return CacheFlux.lookup(this::readSignals, KEY)
                .onCacheMissResume(remote)
                .andWriteWith(this::writeSignals);
    }

    /**
     * Cache reader: turns the cached list (if any) into the list of signals that
     * {@code CacheFlux} replays. Yields an empty {@code Mono} when nothing is cached.
     */
    @SuppressWarnings("unchecked")
    private Mono<List<Signal<CatDto>>> readSignals(String cacheKey) {
        // The cache only exposes a raw List; the element type is asserted by this cast.
        List<CatDto> cached = (List<CatDto>) cacheManager.getCache(CACHE_NAME).get(cacheKey, List.class);
        return Mono.justOrEmpty(cached)
                .flatMap(items -> Flux.fromIterable(items).materialize().collectList());
    }

    /**
     * Cache writer: converts the recorded signals back into plain values and stores the
     * resulting list under {@code cacheKey}.
     */
    private Mono<Void> writeSignals(String cacheKey, List<Signal<CatDto>> signals) {
        return Flux.fromIterable(signals)
                .dematerialize()
                .collectList()
                .doOnNext(items -> cacheManager.getCache(CACHE_NAME).put(cacheKey, items))
                .then();
    }

}

在 Spring 框架的 @Cacheable 注解支持响应式缓存之前,我采用了一种替代方案:借助 Reactor cache add-on,实现了基于注解和 AOP 的 Spring Reactor 缓存。

https://github.com/pkgonan/reactor-cache

/**
 * Method-level marker annotation, retained at runtime, for methods whose {@code Mono}
 * result should be cached.
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MonoCacheable {

    /**
     * Annotation value; defaults to the empty string.
     * NOTE(review): presumably the name of the cache to use; confirm against the aspect
     * that consumes this annotation.
     */
    String value() default "";

}

/**
 * Base class for cache adapters: holds the Spring {@code Cache} and the value type that
 * subclasses read and write.
 *
 * @param <T> type of the cached values
 */
abstract class AbstractSpringCache<T> {

    /** Underlying Spring cache used by subclasses. */
    protected Cache cache;

    /** Class token for the cached value type. */
    protected Class<T> type;

    /**
     * Stores the given cache and value type for use by subclasses.
     *
     * @param cache the cache to read from and write to
     * @param type  class token for the value type
     */
    protected AbstractSpringCache(Cache cache, Class<T> type) {
        this.type = type;
        this.cache = cache;
    }
}

/**
 * {@link MonoCache} backed by a Spring {@code Cache}. Reads and writes go through
 * {@code CacheMono} and are subscribed on the elastic scheduler, because the Spring cache
 * calls are made synchronously inside the callable/runnable.
 *
 * @param <T> type of the cached values
 */
public class SpringMonoCache<T> extends AbstractSpringCache<T> implements MonoCache<T> {

    public SpringMonoCache(Cache cache, Class<T> type) {
        super(cache, type);
    }

    /**
     * Returns the cached value for {@code key}, or resumes with {@code retriever} on a
     * miss and writes its signal back to the cache.
     *
     * @param retriever source used when the cache has no entry
     * @param key       cache key
     * @return a {@code Mono} emitting the cached or retrieved value
     */
    @Override
    public Mono<T> find(Mono<T> retriever, String key) {
        return CacheMono.lookup(this::read, key)
                .onCacheMissResume(retriever)
                .andWriteWith(this::write);
    }

    /** Mono cache reader: wraps a cache hit in a next-signal; empty when the cache returns null. */
    private Mono<Signal<? extends T>> read(String cacheKey) {
        return Mono
                .fromCallable(() -> cache.get(cacheKey, type))
                .subscribeOn(Schedulers.elastic())
                .flatMap(hit -> Mono.justOrEmpty(Signal.next(hit)));
    }

    /** Mono cache writer: stores the signal's value under the key, skipping signals without one. */
    private Mono<Void> write(String cacheKey, Signal<? extends T> signal) {
        Runnable store = () -> {
            T value = signal.get();
            if (value != null) {
                cache.put(cacheKey, value);
            }
        };
        return Mono
                .fromRunnable(store)
                .subscribeOn(Schedulers.elastic())
                .then();
    }
}

/**
 * Aspect that intercepts public methods returning {@code Mono} and annotated with
 * {@code MonoCacheable}, and routes them through {@code reactorCacheAspectSupport}.
 * Parts of this class are elided ("...") in this excerpt.
 */
@Aspect
@Component
class ReactorAnnotationCacheAspect {

    ...

    /**
     * Around advice applied where both pointcuts below match.
     *
     * @param joinPoint the intercepted invocation
     * @return the result of the cache-aware execution, or of the plain invocation
     * @throws Throwable whatever the intercepted method or the cache support throws
     */
    @Around("annotationOfAnyMonoCacheable() && " +
            "executionOfAnyPublicMonoMethod()")
    final Object around(final ProceedingJoinPoint joinPoint) throws Throwable {
        ...

        try {
            // Cache-aware path; aspectJInvoker, method and args come from the elided code above.
            return reactorCacheAspectSupport.execute(aspectJInvoker, method, args);
        } catch(...) {}...

        // Reached only when the try block did not return.
        // NOTE(review): presumably a fallback to the uncached call after a caught failure;
        // the catch clause is elided, so confirm.
        return joinPoint.proceed(args);
    }

    /** Matches methods annotated with {@code reactor.cache.spring.annotation.MonoCacheable}. */
    @Pointcut(value = "@annotation(reactor.cache.spring.annotation.MonoCacheable)")
    private void annotationOfAnyMonoCacheable() {}

    /** Matches the execution of any public method whose return type is {@code reactor.core.publisher.Mono}. */
    @Pointcut(value = "execution(public reactor.core.publisher.Mono *(..))")
    private void executionOfAnyPublicMonoMethod() {}
}


/**
 * Support class that performs the cache-aware execution on behalf of the aspect.
 * Parts of this class are elided ("...") in this excerpt.
 */
class ReactorCacheAspectSupport {

    private final CacheManager cacheManager;
    ...

    /**
     * Entry point called by the aspect: invokes the target through {@code invoker} and
     * passes the result to the private overload.
     * The {@code cache}, {@code key} and {@code returnType} values come from the elided code.
     *
     * @param invoker callback that runs the intercepted method
     * @param method  the intercepted method
     * @param args    the invocation arguments
     * @return whatever the private overload returns
     */
    Object execute(final CacheOperationInvoker invoker, final Method method, final Object[] args) {
        ...
        // invoker.invoke() runs here, before the cache is consulted.
        // NOTE(review): for a Mono this presumably only assembles the publisher; confirm.
        return execute(cache, invoker.invoke(), key, returnType);
    }

    /**
     * Obtains a {@code ReactorCache} for the given cache and type, and asks it to find the
     * value for {@code key}, passing {@code proceed} (the intercepted method's result)
     * along with the key.
     */
    private Object execute(final Cache cache, final Object proceed, final String key, final Class<?> type) {
        ...
        final ReactorCache cacheResolver = getCacheResolver(cache, type);
        return cacheResolver.find(proceed, key);
    }

    ...
}