Configure spring.codec.max-in-memory-size when using ReactiveElasticsearchClient
I am using the ReactiveElasticsearchClient from spring-data-elasticsearch 3.2.3 with spring-boot 2.2.0. After upgrading to spring-boot 2.2.2 I get org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer: 262144.
As instructed, I set spring.codec.max-in-memory-size to fix this, but I still get the same exception.
Here is the whole exception:
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoCollect] :
reactor.core.publisher.Flux.collect(Flux.java:3273)
org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
Error has been observed at the following site(s):
|_ Flux.collect ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
|_ Mono.filter ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:554)
|_ Mono.map ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:555)
|_ Mono.map ⇢ at org.springframework.core.codec.AbstractDataBufferDecoder.decodeToMono(AbstractDataBufferDecoder.java:96)
|_ checkpoint ⇢ Body from POST http://localhost:9200/_bulk?timeout=1m [DefaultClientResponse]
|_ Mono.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:669)
|_ Mono.doOnNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:670)
|_ Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:671)
|_ Mono.flatMapMany ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:591)
|_ Flux.publishNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.bulk(DefaultReactiveElasticsearchClient.java:448)
|_ Flux.flatMap ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:32)
|_ Flux.map ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:33)
|_ Flux.reduce ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:34)
|_ Mono.zip ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:178)
|_ Mono.map ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:179)
Stack trace:
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:94)
at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:59)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:571)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:1050)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Can anyone tell me what I am doing wrong, or is this a bug?
Thanks
Using a plain reactive WebClient
I ran into the same problem (going from 2.1.9 to 2.2.1) and had no luck setting spring.codec.max-in-memory-size. I later found a hint that this is not the way to solve it:
… On the client side, the limit can be changed in WebClient.Builder.
(source, including dead link :-S )
I have not yet found where the WebClient.Builder gets its default 256K limit¹. However, the following allowed me to raise the buffer size limit to 16M:
WebClient.builder()
        .…
        .exchangeStrategies(ExchangeStrategies.builder()
                .codecs(configurer -> configurer
                        .defaultCodecs()
                        .maxInMemorySize(16 * 1024 * 1024))
                .build())
        .build();
So it seems to me (not knowing the intricacies of spring-data-elasticsearch) that if you can somehow get hold of the WebClient returned by the WebClientProvider, you should be able to mutate it to include the ExchangeStrategies above.
Maybe you can provide your own DefaultWebClientProvider along the lines of (absolutely untested!):
class MyDefaultWebClientProvider extends DefaultWebClientProvider {

    @Override
    public WebClient get(InetSocketAddress endpoint) {
        return super.get(endpoint)
                .mutate() // Obtain a WebClient.Builder instance.
                .exchangeStrategies(ExchangeStrategies.builder()
                        .codecs(configurer -> configurer
                                .defaultCodecs()
                                .maxInMemorySize(16 * 1024 * 1024))
                        .build())
                .build();
    }
}
YMMV.
Update #1:
1) Now I found it. And that explains why setting spring.codec.max-in-memory-size has no effect: the property is not consulted here; the limit is hardcoded at 256K in the base class used by all the default codecs, cf. BaseDefaultCodecs.
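As a side note, here is a minimal sketch (with a hypothetical ExampleService, not taken from the original post) of why the property does not reach this client: Spring Boot applies spring.codec.max-in-memory-size only to the codecs of the auto-configured WebClient.Builder bean, so a WebClient assembled from scratch (as the reactive Elasticsearch client does internally) keeps the 256K default.
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

@Service
class ExampleService { // hypothetical, for illustration only

    private final WebClient fromBootBuilder;
    private final WebClient builtFromScratch;

    ExampleService(WebClient.Builder builder) {
        // The injected builder carries Boot's codec customizers, so
        // spring.codec.max-in-memory-size is honoured by this client.
        this.fromBootBuilder = builder.build();

        // A builder created directly bypasses those customizers and keeps
        // the 256K default from BaseDefaultCodecs.
        this.builtFromScratch = WebClient.builder().build();
    }
}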
A couple of days ago I implemented the possibility to customize the WebClient; check the corresponding Jira issue. This will be available in Spring Data Elasticsearch 3.2.4 and is already in the current master branch.
The configuration code then looks like this:
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.config.AbstractReactiveElasticsearchConfiguration;
import org.springframework.web.reactive.function.client.ExchangeStrategies;

@Configuration
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {

    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
                .connectedTo("localhost:9200") //
                .withWebClientConfigurer(webClient -> {
                    ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1)) // -1 removes the in-memory limit entirely
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                })
                .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}
As of Spring Boot 2.3.0, there is now a dedicated configuration property for the Reactive Elasticsearch REST client.
You can set a specific memory limit for the client with the following configuration property:
spring.data.elasticsearch.client.reactive.max-in-memory-size=
The already existing spring.codec.max-in-memory-size property is separate and only affects other WebClient instances in the application.
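For example, to raise the client's limit to 16 MB (an illustrative value; the property accepts a DataSize such as 16MB):
spring.data.elasticsearch.client.reactive.max-in-memory-size=16MB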
Or:
// maxBufferMb: the desired limit in megabytes, e.g. 16
final Consumer<ClientCodecConfigurer> consumer = configurer -> {
    final ClientCodecConfigurer.ClientDefaultCodecs codecs = configurer.defaultCodecs();
    codecs.maxInMemorySize(maxBufferMb * 1024 * 1024);
};
WebClient.builder().codecs(consumer).build();
As with the earlier examples, this only helps if the resulting WebClient is the one the Elasticsearch client actually uses, for example via the withWebClientConfigurer hook shown above.