How to call Elastic Search for current queue load?

When querying ES extensively, I get

Failed to execute [org.elasticsearch.action.search.SearchRequest@59e634e2] lastShard [true]
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction@75bd024b
        at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
        at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:551)
        at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteQuery(SearchServiceTransportAction.java:228)
        at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.sendExecuteFirstPhase(TransportSearchQueryThenFetchAction.java:83)

quite regularly.

My current plan is to pause query requests until the queue load drops below some threshold x. You can query the client's stats with

client.admin().cluster().threadPool().stats().iterator();

but since my client is not a data node (I suspect that is the reason), I get queue=0 back, while the server nodes throw the error above.

I know why this error is thrown, and I know how to raise the setting, but that would only postpone the error and create other problems...

How do I ask the cluster nodes what their queue load is?

PS: I am using the Java API.

This is what I have tried so far, without the desired result; a blank line separates one attempt from the next, unless stated otherwise:

//Nodes stats
final NodesStatsResponse nodesStatsResponse = client.admin().cluster().prepareNodesStats().execute().actionGet();
final NodeStats nodeStats = nodesStatsResponse.getNodes()[0];
final String nodeId = nodeStats.getNode().getId(); // need this later on

// same as before, but with explicit NodesStatsRequest (with id)
final NodesStatsResponse response = client.admin().cluster().nodesStats(new NodesStatsRequest(nodeId)).actionGet();
final NodeStats[] nodeStats2 = response.getNodes();
for (NodeStats nodeStats3 : nodeStats2) {
    Stats stats = nodeStats3.getThreadPool().iterator().next();
}

// Cluster?
final ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequestBuilder(client.admin().cluster()).request();
final ClusterStatsResponse clusterStatsResponse = client.admin().cluster().clusterStats(clusterStatsRequest).actionGet();
final ClusterStatsNodes clusterStatsNodes = clusterStatsResponse.getNodesStats();

// Nodes info?
final NodesInfoResponse infoResponse = client.admin().cluster().nodesInfo(new NodesInfoRequest(nodeId)).actionGet();// here
final NodeInfo[] nodeInfos = infoResponse.getNodes();
for (final NodeInfo nodeInfo : nodeInfos) {
    final ThreadPoolInfo info = nodeInfo.getThreadPool();
    final Iterator<Info> infoIterator = info.iterator();
    while (infoIterator.hasNext()) {
        final Info realInfo = infoIterator.next();
        SizeValue sizeValue = realInfo.getQueueSize();
        // sizeValue can be null here (it does happen; I expected a NullPointerException, but the thread just disappeared)
        if (sizeValue == null) 
            continue;
        // this is the configured queue size, not the current load (found 1000 as expected, but also one pool of 200 on one node?)
        final long queueSize = sizeValue.getSingles(); 
    }
}

The problem is that some processes need to be served immediately (e.g. user requests), while others can wait when the database is too busy (background processes). Ideally I would assign a certain share of the queue to the processes handling immediate requests, and another share to the background processes (but I do not see that option).
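Elasticsearch does not offer such a per-caller queue split out of the box, but a similar partition can be approximated on the client side, for example with a semaphore that caps how many background searches may be in flight at once. A minimal sketch, assuming a 300-permit budget for background work (the figure and the `SearchBudget` class are my own, not anything from the Elasticsearch API):

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Client-side partitioning sketch: background searches may occupy at most
// BACKGROUND_PERMITS "slots", leaving the rest of the server-side queue
// (capacity 1000 by default) for user-facing requests.
public class SearchBudget {
    private static final int BACKGROUND_PERMITS = 300; // assumed split
    private final Semaphore backgroundSlots = new Semaphore(BACKGROUND_PERMITS);

    /** User-facing searches are never throttled here. */
    public <T> T runUserSearch(Supplier<T> search) {
        return search.get();
    }

    /** Background searches block until a slot is free. */
    public <T> T runBackgroundSearch(Supplier<T> search) {
        backgroundSlots.acquireUninterruptibly();
        try {
            return search.get();
        } finally {
            backgroundSlots.release();
        }
    }

    public int availableBackgroundSlots() {
        return backgroundSlots.availablePermits();
    }
}
```

This does not inspect the server-side queue at all; it only bounds how much of it background work can occupy in the worst case.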

Update: It turns out, which I had not expected, that the queue counts individual shard-level searches: a single bulk request already overloads the queue once the total number of searches it fans out to exceeds 1000 (with x shards or x indices, that is a bulk of only 1000/x queries). So unless you can reduce it to a single query, batching is not an option. Consequently, when you target 700 searches at once (given the statement above), you need to know whether there are already more than 300 items in the queue, because then it will start throwing.

To summarize:

Assume each call already carries the maximum load per bulk request, so I cannot merge requests. How, then, can I start pausing requests *before* Elasticsearch starts throwing the exception above, so that I can pause one part of the application but not the other? If I know the queue is, say, half full, the background processes must sleep for a while. How do I find out the (approximate) queue load?
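The "half full" rule above can be written as a small policy object that compares the observed queue size against the configured capacity. A sketch, where the 0.5 threshold comes from the question itself and `QueuePolicy` is a made-up helper, not an Elasticsearch class:

```java
// Back-pressure policy sketch: background work pauses once the observed
// queue crosses a fraction of the configured capacity (1000 by default).
public class QueuePolicy {
    private final int queueCapacity;     // the configured maximum queue size
    private final double pauseFraction;  // e.g. 0.5 = pause at half full

    public QueuePolicy(int queueCapacity, double pauseFraction) {
        this.queueCapacity = queueCapacity;
        this.pauseFraction = pauseFraction;
    }

    /** True if background processes should sleep, given the current queue size. */
    public boolean backgroundShouldPause(int observedQueueSize) {
        return observedQueueSize >= queueCapacity * pauseFraction;
    }

    /** True if a request fanning out to `shardSearches` shard queries still fits. */
    public boolean hasRoomFor(int observedQueueSize, int shardSearches) {
        return observedQueueSize + shardSearches <= queueCapacity;
    }
}
```

With a capacity of 1000 and a request fanning out to 700 shard searches, `hasRoomFor` is exactly the "no more than 300 items already queued" check from the update above.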

Hoping this is not the answer, source https://www.elastic.co/guide/en/elasticsearch/guide/current/_monitoring_individual_nodes.html#_threadpool_section:

Bulk Rejections

If you are going to encounter queue rejections, it will most likely be caused by bulk indexing requests. It is easy to send many bulk requests to Elasticsearch by using concurrent import processes. More is better, right?

In reality, each cluster has a certain limit at which it can not keep up with ingestion. Once this threshold is crossed, the queue will quickly fill up, and new bulks will be rejected.

This is a good thing. Queue rejections are a useful form of back pressure. They let you know that your cluster is at maximum capacity, which is much better than sticking data into an in-memory queue. Increasing the queue size doesn’t increase performance; it just hides the problem. If your cluster can process only 10,000 docs per second, it doesn’t matter whether the queue is 100 or 10,000,000—your cluster can still process only 10,000 docs per second.

The queue simply hides the performance problem and carries a real risk of data-loss. Anything sitting in a queue is by definition not processed yet. If the node goes down, all those requests are lost forever. Furthermore, the queue eats up a lot of memory, which is not ideal.

It is much better to handle queuing in your application by gracefully handling the back pressure from a full queue. When you receive bulk rejections, you should take these steps:

1. Pause the import thread for 3–5 seconds.
2. Extract the rejected actions from the bulk response, since it is probable that many of the actions were successful. The bulk response will tell you which succeeded and which were rejected.
3. Send a new bulk request with just the rejected actions.
4. Repeat from step 1 if rejections are encountered again.

Using this procedure, your code naturally adapts to the load of your cluster and naturally backs off.

Rejections are not errors: they just mean you should try again later.

It is especially this part, "When you receive bulk rejections, you should take these steps", that I do not like. We should be able to handle the oncoming problem up front, rather than react to rejections after the fact.
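For reference, the quoted procedure amounts to a retry loop like the following. This is only a sketch: `BulkSender` is a hypothetical stand-in for whatever sends a bulk request and reports the rejected actions, not the real Elasticsearch bulk API.

```java
import java.util.List;

// Hypothetical sender: submits actions, returns the subset that was rejected.
interface BulkSender {
    List<String> send(List<String> actions);
}

public class BulkRetry {
    /** Resends rejected actions with a pause between rounds; returns rounds used. */
    public static int sendWithBackoff(BulkSender sender, List<String> actions,
                                      long pauseMillis, int maxRounds) {
        int rounds = 0;
        List<String> remaining = actions;
        while (!remaining.isEmpty() && rounds < maxRounds) {
            rounds++;
            remaining = sender.send(remaining); // steps 2-3: resend only the rejects
            if (!remaining.isEmpty()) {
                try {
                    Thread.sleep(pauseMillis);  // step 1: pause (3-5 s in practice)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return rounds;
    }
}
```

The loop only ever reacts to rejections that have already happened, which is exactly the objection above.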

The way you are looking at queue usage is wrong, because you are looking at the wrong statistics.

Take a look at this code:

    final NodesStatsResponse response = client.admin().cluster().prepareNodesStats().setThreadPool(true).execute().actionGet();
    final NodeStats[] nodeStats2 = response.getNodes();

    for (NodeStats nodeStats3 : nodeStats2) {
        ThreadPoolStats stats = nodeStats3.getThreadPool();

        if (stats != null)
            for (ThreadPoolStats.Stats threadPoolStat : stats) {
                System.out.println("node `" + nodeStats3.getNode().getName() + "`" + " has pool `" + threadPoolStat.getName() + "` with current queue size " + threadPoolStat.getQueue());
            }
    }

First, you need setThreadPool(true) to get the thread pool statistics back at all; otherwise they will be null.

Second, you need ThreadPoolStats, not ThreadPoolInfo, which only holds the thread pool settings.

So, this was basically your second attempt, just incomplete. The 1000 you saw is the setting itself (the maximum queue size), not the actual load.
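Putting the two together: the load is the current queue (from ThreadPoolStats) divided by the configured capacity (from ThreadPoolInfo). Fetching those two numbers needs a live cluster, so here is only the pure computation as a sketch; `QueueLoad` is a made-up helper, and the inputs would come from the two APIs discussed above:

```java
// Sketch: combine the *current* queue size (from ThreadPoolStats) with the
// *configured* capacity (from ThreadPoolInfo) into a utilization ratio.
public class QueueLoad {
    /** Fraction of the queue in use; 0.0 = idle, 1.0 = full (about to reject). */
    public static double utilization(long currentQueue, long queueCapacity) {
        if (queueCapacity <= 0) {
            return 0.0; // unbounded queue: never "full"
        }
        return Math.min(1.0, (double) currentQueue / queueCapacity);
    }
}
```

With that ratio per node and per pool (the `search` pool is the one rejecting here), the background processes can sleep once utilization crosses whatever threshold you pick.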