ElasticSearch Scroll API 多线程

ElasticSearch Scroll API with multi threading

首先,我想让大家知道,我知道ElasticSearch ScrollAPI的基本工作逻辑。要使用 Scroll API,首先,我们需要使用一些滚动值调用 search 方法,例如 1m,然后它会 return 一个 _scroll_id ,它将被用于 Scroll 上的下一个连续调用,直到所有文档 returns在循环内。但问题是我只想在多线程基础上使用相同的进程,而不是串行使用。例如:

如果我有 300000 个文档,那么我想 process/get 这样

所以我的问题是,我没有找到任何方法来设置滚动 from 值 API 如何使用线程使滚动过程更快。不以序列化方式处理文档。

我的示例python代码

if index_name is not None and doc_type is not None and body is not None:
   es = init_es()
   page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
   sid = page['_scroll_id']
   scroll_size = page['hits']['total']

   # Start scrolling
   while (scroll_size > 0):

       print("Scrolling...")
       page = es.scroll(scroll_id=sid, scroll='30s')
       # Update the scroll ID
       sid = page['_scroll_id']

       print("scroll id: " + sid)

       # Get the number of results that we returned in the last scroll
       scroll_size = len(page['hits']['hits'])
       print("scroll size: " + str(scroll_size))

       print("scrolled data :" )
       print(page['aggregations'])

你试过sliced scroll吗?根据链接文档:

For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which can be consumed independently.

Each scroll is independent and can be processed in parallel like any scroll request.

我自己没有用过这个(我需要处理的最大结果集是 ~50k 文档)但这似乎是你要找的。

滚动必须是同步的,这是逻辑。

您可以使用多线程,这正是 elasticsearch 擅长的原因:并行性。

一个elasticsearch索引,由分片组成,这是你数据的物理存储。分片可以在同一节点上,也可以不在同一节点上(更好)。

另一方面,搜索API提供了一个很好的选择:_preference(https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html)

回到您的应用:

  1. 获取索引分片(和节点)列表
  2. 通过分片创建线程
  3. 对每个线程进行滚动搜索

瞧瞧!

此外,您可以使用 elasticsearch4hadoop 插件,它对 Spark / PIG / map-reduce / Hive 完全适用。

您应该为此使用切片滚动,请参阅 https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-372271460 中的操作方法 python。

我遇到了和你一样的问题,但是文档大小是140万。我不得不使用并发方法并使用10个线程进行数据写入。

我用Java线程池写的代码,你可以在Python中找到类似的方法。

    public class ControllerRunnable implements  Runnable {
        private String i_res;
        private String i_scroll_id;
        private int i_index;
        private JSONArray i_hits;
        private JSONObject i_result;

        ControllerRunnable(int index_copy, String _scroll_id_copy) {
            i_index = index_copy;
            i_scroll_id = _scroll_id_copy;
        }

        @Override
        public void run(){
            try {
                s_logger.debug("index:{}", i_index );
                String nexturl = m_scrollUrl.replace("--", i_scroll_id);
                s_logger.debug("nexturl:{}", nexturl);
                i_res = get(nexturl);

                s_logger.debug("i_res:{}", i_res);

                i_result = JSONObject.parseObject(i_res);
                if (i_result == null) {
                    s_logger.info("controller thread parsed result object NULL, res:{}", i_res);
                    s_counter++;
                    return;
                }
                i_scroll_id = (String) i_result.get("_scroll_id");
                i_hits = i_result.getJSONObject("hits").getJSONArray("hits");
                s_logger.debug("hits content:{}\n", i_hits.toString());

                s_logger.info("hits_size:{}", i_hits.size());

                if (i_hits.size() > 0) {
                    int per_thread_data_num = i_hits.size() / s_threadnumber;
                    for (int i = 0; i < s_threadnumber; i++) {
                        Runnable worker = new DataRunnable(i * per_thread_data_num,
                                (i + 1) * per_thread_data_num);
                        m_executor.execute(worker);
                    }
                    // Wait until all threads are finish
                    m_executor.awaitTermination(1, TimeUnit.SECONDS);
                } else {
                    s_counter++;
                    return;
                }
            } catch (Exception e) {
                s_logger.error(e.getMessage(),e);
            }
        }
    }