如何解压缩 Flux<DataBuffer>(以及如何编写)?

How to decompress a Flux<DataBuffer> (and how to write one)?

我需要在没有中间存储的情况下读取和写入压缩 (GZIP) 流。目前,我正在使用 Spring RestTemplate 进行写作,并使用 Apache HTTP 客户端进行读取(请参阅我的回答 以了解为什么 RestTemplate 不能' t 用于读取大流)。实现相当简单,我在响应 InputStream 上打了一个 GZIPInputStream 并继续。

现在,我想改用 Spring 5 WebClient (just because I'm not a fan of status quo). However, WebClient is reactive in nature and deals with Flux<Stuff>; I believe it's possible to get a Flux<DataBuffer>, where DataBuffer 是对 ByteBuffer 的抽象。问题是,我如何即时解压缩它而不必将完整流存储在内存中(OutOfMemoryError,我在看你),或写入本地磁盘?值得一提的是,WebClient 在底层使用了 Netty。

我承认我对(解)压缩知之甚少,但是,我做了我的研究,但是 material 在线可用的 none 似乎特别有用。

compression on java nio direct buffers

Writing GZIP file with nio

Reading a GZIP file from a FileChannel (Java NIO)

(de)compressing files using NIO

Iterable gzip deflate/inflate in Java

public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
    private final HttpHeaders httpHeaders;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse &&
                !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();

            httpHeaders.forEach(e -> {
                log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
                headers.set(e.getKey(), e.getValue());
            });
        }
        ctx.fireChannelRead(msg);
    }
}

然后我创建一个 ClientHttpConnectorWebClient 一起使用,并在 afterNettyContextInit 中添加处理程序:

ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
ctx.addHandlerLast(new Slf4JLoggingHandler());
if (forceDecompression) {
    io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
            true,
            CONTENT_ENCODING, GZIP,
            CONTENT_TYPE, APPLICATION_JSON
    );
    HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
    ctx.addHandlerFirst(headersModifier);
}
ctx.addHandlerLast(new HttpContentDecompressor());

当然,对于未通过 GZIP 压缩的响应,这会失败,因此我仅将 WebClient 的这个实例用于特定用例,我确定响应已压缩。

写起来很简单:Spring有一个ResourceEncoder,所以InputStream可以简单地转换成InputStreamResource,瞧!

在这里注意到这一点,因为它让我有点困惑 - API 从 5.1 开始发生了一些变化。

我的设置与 ChannelInboundHandler 的已接受答案类似:

public class GzipJsonHeadersHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse
                && !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();
            headers.clear();
            headers.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
            headers.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
        }
        ctx.fireChannelRead(msg);
    }
}

(为了简单起见,我需要的 header 值只是 hard-coded,否则是相同的。)

然而要注册它是不同的:

WebClient.builder()
    .clientConnector(
            new ReactorClientHttpConnector(
                    HttpClient.from(
                            TcpClient.create()
                                    .doOnConnected(c -> {
                                        c.addHandlerFirst(new HttpContentDecompressor());
                                        c.addHandlerFirst(new HttpResponseHeadersHandler());
                                    })
                    ).compress(true)
            )
    )
    .build();

似乎 Netty 现在维护一个与系统列表分开(和之后)的用户处理程序列表,并且 addHandlerFirst() 只将您的处理程序放在用户列表的前面。因此,它需要显式调用 HttpContentDecompressor 以确保它在您的处理程序插入正确的 headers 之后明确执行。