ES Rest High Level Client throws SocketTimeoutException after being idle for some time

RestHighLevelClient is used in a spring-boot application to connect to ES 6.4 (hosted on AWS). When the application has been idle for some time and a request arrives, RestHighLevelClient throws a SocketTimeoutException:

[Request processing failed; nested exception is org.springframework.data.elasticsearch.ElasticsearchException: Error while bulk for request: org.elasticsearch.action.bulk.BulkRequest@21511b6c] w
java.net.SocketTimeoutException: 5,000 milliseconds timeout on connection http-outgoing-38 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.4.jar!/:4.1.4]
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.4.jar!/:4.1.4]
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

The RestHighLevelClient is created as follows:

    @Bean
    RestHighLevelClient client() {

        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elasticsearchHostAndPort)
                .build();

        return RestClients.create(clientConfiguration).rest();
    }

Using spring-data-elasticsearch version 3.2.0.M2.

Any hints/workarounds?

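In RestClientBuilder.createHttpClient(), the default socket timeout and connection timeout are set to 30 seconds and 1 second, respectively (RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS and RestClientBuilder.DEFAULT_CONNECT_TIMEOUT_MILLIS).

You can override these defaults by implementing RestClientBuilder.RequestConfigCallback and calling setRequestConfigCallback(...) on the RestClientBuilder that is used to build your RestHighLevelClient.

We did something like this: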

@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
    return builder.setSocketTimeout(socketTimeout); // try to prevent SocketTimeoutException
}
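
To make the difference between the two timeouts concrete, here is a minimal sketch using plain java.net sockets rather than the Elasticsearch client. The class name SocketTimeoutDemo and the timeout values are made up for illustration. The connect timeout bounds how long establishing the TCP connection may take, while the socket timeout bounds how long a read may wait for data on an already established connection. The second case is what produces a SocketTimeoutException like the one in the question.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Illustrative only: plain java.net sockets, not the Elasticsearch client. */
final class SocketTimeoutDemo {

    /**
     * Connects to a local server socket that never sends any data and reports
     * whether the blocking read failed with a SocketTimeoutException.
     */
    static boolean readTimesOut(int connectTimeoutMs, int socketTimeoutMs) {
        try (ServerSocket silentServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket()) {
            // Connect timeout: upper bound on establishing the TCP connection.
            client.connect(
                    new InetSocketAddress(silentServer.getInetAddress(), silentServer.getLocalPort()),
                    connectTimeoutMs);
            // Socket (read) timeout: upper bound on waiting for data once connected.
            client.setSoTimeout(socketTimeoutMs);
            try (InputStream in = client.getInputStream()) {
                in.read(); // blocks, because the server side never writes anything
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(1000, 200));
    }
}
```

Raising the socket timeout only gives a slow response more time to arrive. If the idle connection has been dropped somewhere along the way, the read still waits for the full timeout before failing.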

It should also be configurable via properties:

spring.elasticsearch.jest.connection-timeout=3000 # Connection timeout in milliseconds.
spring.elasticsearch.jest.multi-threaded=true # Enable connection requests from multiple execution threads.

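I also tried setting the connection/socket timeout to 0, as suggested here and elsewhere. In the end it did not help.

There is also a solution/workaround recommended by spring-data-elasticsearch: https://jira.spring.io/browse/DATAES-789. It simply performs an internal retry when this kind of exception occurs. It does not really fix the problem, but your callers will not receive an error. Instead, the first request after an idle period will take an extra 5 seconds (or whatever timeout you have configured).

If you are using spring-data-elasticsearch version 4+ (available from Spring Boot 2.3.0), you can apply this workaround.

I can confirm that the following solution works (with the limitation I mentioned above):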


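import java.net.SocketTimeoutException;

import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions;

@Configuration
public class ElasticSearchRestClientConfig extends AbstractElasticsearchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(ElasticSearchRestClientConfig.class);

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Override
    public RestHighLevelClient elasticsearchClient() {
        return restHighLevelClient;
    }

    @Bean
    @Override
    public ElasticsearchCustomConversions elasticsearchCustomConversions() {
        return new ElasticsearchCustomConversions();
    }

    @Override
    public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) {
        return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter) {
            @Override
            public <T> T execute(ClientCallback<T> callback) {
                // The retry limit reuses DEFAULT_MAX_CONN_PER_ROUTE, as in the original workaround.
                int maxRetries = RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE;
                int retryCount = 0;
                while (true) {
                    try {
                        // Return directly so that a legitimate null result is not retried forever.
                        return super.execute(callback);
                    } catch (DataAccessResourceFailureException e) {
                        // Retry only when the underlying cause is a socket timeout and retries remain.
                        boolean socketTimeout = e.getCause() != null
                                && e.getCause().getCause() instanceof SocketTimeoutException;
                        if (!socketTimeout || retryCount >= maxRetries) {
                            throw e;
                        }
                        retryCount++;
                        log.warn("Elasticsearch client - performing retry {} after caught DataAccessResourceFailureException: {}", retryCount, e.getMessage());
                    }
                }
            }
        };
    }
}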
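
The retry idea in this workaround does not depend on Spring. As a rough, framework-free sketch of the same pattern (the TimeoutRetry class and its method names are hypothetical and not part of Spring Data or the Elasticsearch client), a call is repeated only when a SocketTimeoutException appears somewhere in the exception's cause chain, and only up to a fixed number of retries:

```java
import java.net.SocketTimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Spring Data or the Elasticsearch client. */
final class TimeoutRetry {

    /**
     * Runs the call and retries it up to maxRetries times when the failure
     * has a SocketTimeoutException somewhere in its cause chain.
     */
    static <T> T callWithRetry(Supplier<T> call, int maxRetries) {
        int retries = 0;
        while (true) {
            try {
                return call.get(); // a null result is returned as-is, never retried
            } catch (RuntimeException e) {
                if (!causedBySocketTimeout(e) || retries >= maxRetries) {
                    throw e; // unrelated failure, or retries exhausted
                }
                retries++;
            }
        }
    }

    /** Walks the cause chain looking for a SocketTimeoutException. */
    static boolean causedBySocketTimeout(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SocketTimeoutException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (calls[0]++ == 0) {
                // Simulates the first request after an idle period hitting a dead connection.
                throw new RuntimeException(new SocketTimeoutException("simulated timeout"));
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " calls"); // prints "ok after 2 calls"
    }
}
```

Walking the whole cause chain, rather than checking a fixed nesting depth, keeps the check working if a wrapper exception is added or removed between versions. As with the workaround above, the first attempt still has to wait for the full timeout before the retry happens.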
 

This is how I solved the problem:

@Configuration
@EnableElasticsearchRepositories(basePackages = "com.hamdos.repositories.es")
@ComponentScan(basePackages = { "com.hamdos.services.es" })
@Order(2)
public class Config {

    @Bean
    RestHighLevelClient client() {

        RestClientBuilder builder = RestClient.builder(
                new HttpHost("localhost", 9289))
                .setRequestConfigCallback(
                        new RestClientBuilder.RequestConfigCallback() {
                            @Override
                            public RequestConfig.Builder customizeRequestConfig(
                                    RequestConfig.Builder requestConfigBuilder) {
                                return requestConfigBuilder
                                        .setConnectTimeout(5000)
                                        .setSocketTimeout(60000);
                            }
                        });

        return new RestHighLevelClient(builder);
    }

    @Bean
    public ElasticsearchOperations elasticsearchTemplate() {
        return new ElasticsearchRestTemplate(client());
    }
}

System information:

  • spring-data-elasticsearch: 4.0.0.RELEASE
  • spring-boot: 2.3.0.RELEASE
  • elasticsearch: 7.7.0

@Bean
override fun elasticsearchClient(): RestHighLevelClient {
    val clientConfiguration = ClientConfiguration
        .builder()
        .connectedTo("localhost:9200")
        .withSocketTimeout(30000) // socket timeout in milliseconds
        .build()
    return RestClients.create(clientConfiguration).rest()
}

Just use the withSocketTimeout(30000) method as shown above.