将大量数据索引到elasticsearch

Index large amount of data into elasticsearch

我在 HBase 中有超过 60 亿条社交媒体数据(包括 content/time/author 和其他可能的字段),48 台服务器的 4100 个区域,我现在需要将这些数据刷新到 Elasticsearch。

我很清楚 ES 的 bulk API,并且在 Java 中使用 bulk 和 MapReduce 仍然需要很多天(至少一周左右)。我可以改用 spark,但我认为它不会有太大帮助。

我想知道是否还有其他技巧可以将这些大数据写入 ElasticSearch?喜欢手动写入 es 索引文件并使用某种恢复来加载本地文件系统中的文件?

感谢任何可能的建议,谢谢。

==============

关于我的集群环境的一些细节:

spark 1.3.1 独立版(我可以在 yarn 上更改它以使用 Spark 1.6.2 或 1.6.3)

Hadoop 2.7.1 (HDP 2.4.2.258)

弹性搜索 2.3.3

AFAIK Spark 是索引以下 2 个选项的最佳选择。 下面是我提供的方法:

划分(输入扫描标准)并征服 60 亿社交媒体数据:

我建议创建多个 Spark/Mapreduce 具有不同搜索条件的作业(根据类别或其他内容将 60 亿社交媒体数据分成 6 块)并并行触发它们。 例如,基于数据捕获时间范围(scan.setTimeRange(t1,t2))或使用一些模糊行逻辑(FuzzyRowFilter),肯定会加快速度。

流媒体方法的种类:

你也可以考虑当你通过 spark 或 mapreduce 插入数据时,你可以同时为它们创建索引。

例如在 SOLR 的情况下: clouder 有 NRT hbase lily 索引器...即当 hbase table 基于 WAL 填充时(预写日志) 条目同时创建 solr 索引。检查 Elastic 搜索是否有类似的东西。

即使它不适用于 ES,也不必费心,同时使用您可以自己创建的 Spark/Mapreduce 程序自行摄取数据。

选项 1:

我建议如果你对 spark 没问题,这是一个很好的解决方案 Spark 支持原生集成来自 hadoop 2.1 的 ES。 见

elasticsearch-hadoop provides native integration between Elasticsearch and Apache Spark, in the form of an RDD (Resilient Distributed Dataset) (or Pair RDD to be precise) that can read data from Elasticsearch. The RDD is offered in two flavors: one for Scala (which returns the data as Tuple2 with Scala collections) and one for Java (which returns the data as Tuple2 containing java.util collections).

选项 2:如您所知,比 spark 慢一点

正在向 Elasticsearch 写入数据 使用 elasticsearch-hadoop,Map/Reduce 作业可以将数据写入 Elasticsearch,使其可以通过索引进行搜索。 elasticsearch-hadoop 支持(所谓的)新旧 Hadoop API。

我找到了一个实用的技巧来提高批量索引的性能。

我可以在我的客户端中计算哈希路由,并确保包含所有索引请求的每个批量请求都具有相同的路由。根据路由结果和带ip的分片信息,我直接将批量请求发送到相应的分片节点。此技巧可以避免批量重新路由成本并减少可能导致 EsRejectedException 的批量请求线程池占用。

比如我在不同的机器上有48个节点。假设我向任意节点发送一个包含3000个索引请求的批量请求,这些索引请求将根据路由重新路由到其他节点(通常是所有节点)。并且客户端线程必须等待整个过程完成,包括处理本地批量和等待其他节点的批量响应。然而,没有重新路由阶段,网络成本就没有了(除了转发到副本节点),客户端只需要等待更少的时间。同时,假设我只有 1 个副本,批量线程的总占用量只有 2 个。 (客户端-> 主分片和主分片-> 副本分片)

路由哈希:

shard_num = murmur3_hash (_routing) % num_primary_shards

试着看看:org.elasticsearch.cluster.routing.Murmur3HashFunction

客户端可以通过请求cat api获取分片和索引别名。

碎片信息url:cat shards

别名映射 url:cat aliases

一些关注:

  1. ES 可能会更改不同版本的默认哈希函数,这意味着客户端代码可能不兼容版本。
  2. 这个技巧是基于哈希结果基本平衡的假设。
  3. 客户端应该考虑到相应分片节点的连接超时等容错。