如何解压缩 Flux<DataBuffer>(以及如何编写)?
How to decompress a Flux<DataBuffer> (and how to write one)?
我需要在没有中间存储的情况下读取和写入压缩 (GZIP) 流。目前,我正在使用 Spring RestTemplate
进行写作,并使用 Apache HTTP 客户端进行读取(请参阅我的回答 以了解为什么 RestTemplate
不能' t 用于读取大流)。实现相当简单,我在响应 InputStream
上打了一个 GZIPInputStream
并继续。
现在,我想改用 Spring 5 WebClient (just because I'm not a fan of status quo). However, WebClient
is reactive in nature and deals with Flux<Stuff>
; I believe it's possible to get a Flux<DataBuffer>
, where DataBuffer 是对 ByteBuffer
的抽象。问题是,我如何即时解压缩它而不必将完整流存储在内存中(OutOfMemoryError
,我在看你),或写入本地磁盘?值得一提的是,WebClient
在底层使用了 Netty。
- 另见 Reactor Netty issue-251。
- 也与 Spring 集成有关 issue-2300。
我承认我对(解)压缩知之甚少,但是,我做了我的研究,但是 material 在线可用的 none 似乎特别有用。
compression on java nio direct buffers
Writing GZIP file with nio
Reading a GZIP file from a FileChannel (Java NIO)
(de)compressing files using NIO
Iterable gzip deflate/inflate in Java
public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
private final HttpHeaders httpHeaders;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse &&
!HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
HttpHeaders headers = ((HttpResponse) msg).headers();
httpHeaders.forEach(e -> {
log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
headers.set(e.getKey(), e.getValue());
});
}
ctx.fireChannelRead(msg);
}
}
然后我创建一个 ClientHttpConnector
与 WebClient
一起使用,并在 afterNettyContextInit
中添加处理程序:
ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
ctx.addHandlerLast(new Slf4JLoggingHandler());
if (forceDecompression) {
io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
true,
CONTENT_ENCODING, GZIP,
CONTENT_TYPE, APPLICATION_JSON
);
HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
ctx.addHandlerFirst(headersModifier);
}
ctx.addHandlerLast(new HttpContentDecompressor());
当然,对于未通过 GZIP 压缩的响应,这会失败,因此我仅将 WebClient
的这个实例用于特定用例,我确定响应已压缩。
写起来很简单:Spring有一个ResourceEncoder
,所以InputStream
可以简单地转换成InputStreamResource
,瞧!
在这里注意到这一点,因为它让我有点困惑 - API 从 5.1 开始发生了一些变化。
我的设置与 ChannelInboundHandler
的已接受答案类似:
public class GzipJsonHeadersHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse
&& !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
HttpHeaders headers = ((HttpResponse) msg).headers();
headers.clear();
headers.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
headers.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
}
ctx.fireChannelRead(msg);
}
}
(为了简单起见,我需要的 header 值只是 hard-coded,否则是相同的。)
然而要注册它是不同的:
WebClient.builder()
.clientConnector(
new ReactorClientHttpConnector(
HttpClient.from(
TcpClient.create()
.doOnConnected(c -> {
c.addHandlerFirst(new HttpContentDecompressor());
c.addHandlerFirst(new HttpResponseHeadersHandler());
})
).compress(true)
)
)
.build();
似乎 Netty 现在维护一个与系统列表分开(和之后)的用户处理程序列表,并且 addHandlerFirst()
只将您的处理程序放在用户列表的前面。因此,它需要显式调用 HttpContentDecompressor
以确保它在您的处理程序插入正确的 headers 之后明确执行。
我需要在没有中间存储的情况下读取和写入压缩 (GZIP) 流。目前,我正在使用 Spring RestTemplate
进行写作,并使用 Apache HTTP 客户端进行读取(请参阅我的回答 RestTemplate
不能' t 用于读取大流)。实现相当简单,我在响应 InputStream
上打了一个 GZIPInputStream
并继续。
现在,我想改用 Spring 5 WebClient (just because I'm not a fan of status quo). However, WebClient
is reactive in nature and deals with Flux<Stuff>
; I believe it's possible to get a Flux<DataBuffer>
, where DataBuffer 是对 ByteBuffer
的抽象。问题是,我如何即时解压缩它而不必将完整流存储在内存中(OutOfMemoryError
,我在看你),或写入本地磁盘?值得一提的是,WebClient
在底层使用了 Netty。
- 另见 Reactor Netty issue-251。
- 也与 Spring 集成有关 issue-2300。
我承认我对(解)压缩知之甚少,但是,我做了我的研究,但是 material 在线可用的 none 似乎特别有用。
compression on java nio direct buffers
Writing GZIP file with nio
Reading a GZIP file from a FileChannel (Java NIO)
(de)compressing files using NIO
Iterable gzip deflate/inflate in Java
public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
private final HttpHeaders httpHeaders;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse &&
!HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
HttpHeaders headers = ((HttpResponse) msg).headers();
httpHeaders.forEach(e -> {
log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
headers.set(e.getKey(), e.getValue());
});
}
ctx.fireChannelRead(msg);
}
}
然后我创建一个 ClientHttpConnector
与 WebClient
一起使用,并在 afterNettyContextInit
中添加处理程序:
ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
ctx.addHandlerLast(new Slf4JLoggingHandler());
if (forceDecompression) {
io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
true,
CONTENT_ENCODING, GZIP,
CONTENT_TYPE, APPLICATION_JSON
);
HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
ctx.addHandlerFirst(headersModifier);
}
ctx.addHandlerLast(new HttpContentDecompressor());
当然,对于未通过 GZIP 压缩的响应,这会失败,因此我仅将 WebClient
的这个实例用于特定用例,我确定响应已压缩。
写起来很简单:Spring有一个ResourceEncoder
,所以InputStream
可以简单地转换成InputStreamResource
,瞧!
在这里注意到这一点,因为它让我有点困惑 - API 从 5.1 开始发生了一些变化。
我的设置与 ChannelInboundHandler
的已接受答案类似:
public class GzipJsonHeadersHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse
&& !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
HttpHeaders headers = ((HttpResponse) msg).headers();
headers.clear();
headers.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
headers.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
}
ctx.fireChannelRead(msg);
}
}
(为了简单起见,我需要的 header 值只是 hard-coded,否则是相同的。)
然而要注册它是不同的:
WebClient.builder()
.clientConnector(
new ReactorClientHttpConnector(
HttpClient.from(
TcpClient.create()
.doOnConnected(c -> {
c.addHandlerFirst(new HttpContentDecompressor());
c.addHandlerFirst(new HttpResponseHeadersHandler());
})
).compress(true)
)
)
.build();
似乎 Netty 现在维护一个与系统列表分开(和之后)的用户处理程序列表,并且 addHandlerFirst()
只将您的处理程序放在用户列表的前面。因此,它需要显式调用 HttpContentDecompressor
以确保它在您的处理程序插入正确的 headers 之后明确执行。