在 Spring 响应式应用程序上复制 request/response 正文?

Copy of the request/response body on a Spring reactive app?

我正在研究访问 HTTP 请求和响应主体的最佳方式,以便在 Spring 反应式应用程序中进行跟踪。

对于以前的版本,我们利用 Servlet 过滤器和 Servlet 请求包装器来使用传入请求的输入流并保留它的副本以异步处理跟踪(我们将它们发送到 Elasticsearch)。

但是对于 Spring 响应式应用程序(使用 webflux),我想知道在解码之前访问请求的最合适方式是什么。有什么想法吗?

事实证明,这可以使用提供的 装饰器 实现:分别为 ServerWebExchangeDecoratorServerHttpRequestDecoratorServerHttpResponseDecorator

这是一个示例请求装饰器,它累积了 DataBuffer 请求的默认订阅者读取的内容:

@Slf4j
public class CachingServerHttpRequestDecorator extends ServerHttpRequestDecorator {

    @Getter
    private final OffsetDateTime timestamp = OffsetDateTime.now();
    private final StringBuilder cachedBody = new StringBuilder();

    CachingServerHttpRequestDecorator(ServerHttpRequest delegate) {
        super(delegate);
    }

    @Override
    public Flux<DataBuffer> getBody() {
        return super.getBody().doOnNext(this::cache);
    }

    @SneakyThrows
    private void cache(DataBuffer buffer) {
        cachedBody.append(UTF_8.decode(buffer.asByteBuffer())
         .toString());
    }

    public String getCachedBody() {
        return cachedBody.toString();
    }

请确保,当您装饰由 WebFilter 传递的 ServerWebExchange 时,您也将 getRequest() 覆盖为 return 请求装饰器:

public final class PartnerServerWebExchangeDecorator extends ServerWebExchangeDecorator {

    private final ServerHttpRequestDecorator requestDecorator;
    private final ServerHttpResponseDecorator responseDecorator;

    public PartnerServerWebExchangeDecorator(ServerWebExchange delegate) {
        super(delegate);
        this.requestDecorator = new PartnerServerHttpRequestDecorator(delegate.getRequest());
        this.responseDecorator = new PartnerServerHttpResponseDecorator(delegate.getResponse());
    }

    @Override
    public ServerHttpRequest getRequest() {
        return requestDecorator;
    }

    @Override
    public ServerHttpResponse getResponse() {
        return responseDecorator;
    }

}

关于过滤器:

@Component
public class TracingFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain.filter(new PartnerServerWebExchangeDecorator(exchange));
    }
}

可以这样使用(注意静态导入的函数):

@Bean
public HttpHandler myRoute(MyHandler handler) {
    final RouterFunction<ServerResponse> routerFunction =
        route(POST("/myResource"), handler::persistNotification);
    return webHandler(toWebHandler(routerFunction))
        .filter(new TracingFilter())
        .build();
}