ElasticSearch Scroll API 多线程
ElasticSearch Scroll API with multi threading
首先,我想让大家知道,我知道ElasticSearch ScrollAPI的基本工作逻辑。要使用 Scroll API,首先,我们需要使用一些滚动值调用 search 方法,例如 1m,然后它会 return 一个 _scroll_id ,它将被用于 Scroll 上的下一个连续调用,直到所有文档 returns在循环内。但问题是我只想在多线程基础上使用相同的进程,而不是串行使用。例如:
如果我有 300000 个文档,那么我想 process/get 这样
- 第一个线程将处理初始 100000 个文档
- 第二个线程将处理下一个 100000 个文档
- 第 3 个线程将处理 剩余 100000 个文档
所以我的问题是,我没有找到任何方法来设置滚动 from 值 API 如何使用线程使滚动过程更快。不以序列化方式处理文档。
我的示例python代码
if index_name is not None and doc_type is not None and body is not None:
es = init_es()
page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while (scroll_size > 0):
print("Scrolling...")
page = es.scroll(scroll_id=sid, scroll='30s')
# Update the scroll ID
sid = page['_scroll_id']
print("scroll id: " + sid)
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print("scroll size: " + str(scroll_size))
print("scrolled data :" )
print(page['aggregations'])
你试过sliced scroll吗?根据链接文档:
For scroll queries that return a lot of documents it is possible to
split the scroll in multiple slices which can be consumed
independently.
和
Each scroll is independent and can be processed in parallel like any
scroll request.
我自己没有用过这个(我需要处理的最大结果集是 ~50k 文档)但这似乎是你要找的。
滚动必须是同步的,这是逻辑。
您可以使用多线程,这正是 elasticsearch 擅长的原因:并行性。
一个elasticsearch索引,由分片组成,这是你数据的物理存储。分片可以在同一节点上,也可以不在同一节点上(更好)。
另一方面,搜索API提供了一个很好的选择:_preference
(https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html)
回到您的应用:
- 获取索引分片(和节点)列表
- 通过分片创建线程
- 对每个线程进行滚动搜索
瞧瞧!
此外,您可以使用 elasticsearch4hadoop 插件,它对 Spark / PIG / map-reduce / Hive 完全适用。
您应该为此使用切片滚动,请参阅 https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-372271460 中的操作方法 python。
我遇到了和你一样的问题,但是文档大小是140万。我不得不使用并发方法并使用10个线程进行数据写入。
我用Java线程池写的代码,你可以在Python中找到类似的方法。
public class ControllerRunnable implements Runnable {
private String i_res;
private String i_scroll_id;
private int i_index;
private JSONArray i_hits;
private JSONObject i_result;
ControllerRunnable(int index_copy, String _scroll_id_copy) {
i_index = index_copy;
i_scroll_id = _scroll_id_copy;
}
@Override
public void run(){
try {
s_logger.debug("index:{}", i_index );
String nexturl = m_scrollUrl.replace("--", i_scroll_id);
s_logger.debug("nexturl:{}", nexturl);
i_res = get(nexturl);
s_logger.debug("i_res:{}", i_res);
i_result = JSONObject.parseObject(i_res);
if (i_result == null) {
s_logger.info("controller thread parsed result object NULL, res:{}", i_res);
s_counter++;
return;
}
i_scroll_id = (String) i_result.get("_scroll_id");
i_hits = i_result.getJSONObject("hits").getJSONArray("hits");
s_logger.debug("hits content:{}\n", i_hits.toString());
s_logger.info("hits_size:{}", i_hits.size());
if (i_hits.size() > 0) {
int per_thread_data_num = i_hits.size() / s_threadnumber;
for (int i = 0; i < s_threadnumber; i++) {
Runnable worker = new DataRunnable(i * per_thread_data_num,
(i + 1) * per_thread_data_num);
m_executor.execute(worker);
}
// Wait until all threads are finish
m_executor.awaitTermination(1, TimeUnit.SECONDS);
} else {
s_counter++;
return;
}
} catch (Exception e) {
s_logger.error(e.getMessage(),e);
}
}
}
首先,我想让大家知道,我知道ElasticSearch ScrollAPI的基本工作逻辑。要使用 Scroll API,首先,我们需要使用一些滚动值调用 search 方法,例如 1m,然后它会 return 一个 _scroll_id ,它将被用于 Scroll 上的下一个连续调用,直到所有文档 returns在循环内。但问题是我只想在多线程基础上使用相同的进程,而不是串行使用。例如:
如果我有 300000 个文档,那么我想 process/get 这样
- 第一个线程将处理初始 100000 个文档
- 第二个线程将处理下一个 100000 个文档
- 第 3 个线程将处理 剩余 100000 个文档
所以我的问题是,我没有找到任何方法来设置滚动 from 值 API 如何使用线程使滚动过程更快。不以序列化方式处理文档。
我的示例python代码
if index_name is not None and doc_type is not None and body is not None:
es = init_es()
page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while (scroll_size > 0):
print("Scrolling...")
page = es.scroll(scroll_id=sid, scroll='30s')
# Update the scroll ID
sid = page['_scroll_id']
print("scroll id: " + sid)
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print("scroll size: " + str(scroll_size))
print("scrolled data :" )
print(page['aggregations'])
你试过sliced scroll吗?根据链接文档:
For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which can be consumed independently.
和
Each scroll is independent and can be processed in parallel like any scroll request.
我自己没有用过这个(我需要处理的最大结果集是 ~50k 文档)但这似乎是你要找的。
滚动必须是同步的,这是逻辑。
您可以使用多线程,这正是 elasticsearch 擅长的原因:并行性。
一个elasticsearch索引,由分片组成,这是你数据的物理存储。分片可以在同一节点上,也可以不在同一节点上(更好)。
另一方面,搜索API提供了一个很好的选择:_preference
(https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html)
回到您的应用:
- 获取索引分片(和节点)列表
- 通过分片创建线程
- 对每个线程进行滚动搜索
瞧瞧!
此外,您可以使用 elasticsearch4hadoop 插件,它对 Spark / PIG / map-reduce / Hive 完全适用。
您应该为此使用切片滚动,请参阅 https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-372271460 中的操作方法 python。
我遇到了和你一样的问题,但是文档大小是140万。我不得不使用并发方法并使用10个线程进行数据写入。
我用Java线程池写的代码,你可以在Python中找到类似的方法。
public class ControllerRunnable implements Runnable {
private String i_res;
private String i_scroll_id;
private int i_index;
private JSONArray i_hits;
private JSONObject i_result;
ControllerRunnable(int index_copy, String _scroll_id_copy) {
i_index = index_copy;
i_scroll_id = _scroll_id_copy;
}
@Override
public void run(){
try {
s_logger.debug("index:{}", i_index );
String nexturl = m_scrollUrl.replace("--", i_scroll_id);
s_logger.debug("nexturl:{}", nexturl);
i_res = get(nexturl);
s_logger.debug("i_res:{}", i_res);
i_result = JSONObject.parseObject(i_res);
if (i_result == null) {
s_logger.info("controller thread parsed result object NULL, res:{}", i_res);
s_counter++;
return;
}
i_scroll_id = (String) i_result.get("_scroll_id");
i_hits = i_result.getJSONObject("hits").getJSONArray("hits");
s_logger.debug("hits content:{}\n", i_hits.toString());
s_logger.info("hits_size:{}", i_hits.size());
if (i_hits.size() > 0) {
int per_thread_data_num = i_hits.size() / s_threadnumber;
for (int i = 0; i < s_threadnumber; i++) {
Runnable worker = new DataRunnable(i * per_thread_data_num,
(i + 1) * per_thread_data_num);
m_executor.execute(worker);
}
// Wait until all threads are finish
m_executor.awaitTermination(1, TimeUnit.SECONDS);
} else {
s_counter++;
return;
}
} catch (Exception e) {
s_logger.error(e.getMessage(),e);
}
}
}