将大量数据索引到elasticsearch
Index large amount of data into elasticsearch
我在 HBase 中有超过 60 亿条社交媒体数据(包括 content/time/author 和其他可能的字段),48 台服务器的 4100 个区域,我现在需要将这些数据刷新到 Elasticsearch。
我很清楚 ES 的 bulk API,并且在 Java 中使用 bulk 和 MapReduce 仍然需要很多天(至少一周左右)。我可以改用 spark,但我认为它不会有太大帮助。
我想知道是否还有其他技巧可以将这些大数据写入 ElasticSearch?喜欢手动写入 es 索引文件并使用某种恢复来加载本地文件系统中的文件?
感谢任何可能的建议,谢谢。
==============
关于我的集群环境的一些细节:
spark 1.3.1 独立版(我可以在 yarn 上更改它以使用 Spark 1.6.2 或 1.6.3)
Hadoop 2.7.1 (HDP 2.4.2.258)
弹性搜索 2.3.3
AFAIK Spark 是索引以下 2 个选项的最佳选择。
下面是我提供的方法:
划分(输入扫描标准)并征服 60 亿社交媒体数据:
我建议创建多个 Spark/Mapreduce 具有不同搜索条件的作业(根据类别或其他内容将 60 亿社交媒体数据分成 6 块)并并行触发它们。
例如,基于数据捕获时间范围(scan.setTimeRange(t1,t2))或使用一些模糊行逻辑(FuzzyRowFilter),肯定会加快速度。
或
流媒体方法的种类:
你也可以考虑当你通过 spark 或 mapreduce 插入数据时,你可以同时为它们创建索引。
例如在 SOLR 的情况下: clouder 有 NRT hbase lily 索引器...即当 hbase table 基于 WAL 填充时(预写日志) 条目同时创建 solr 索引。检查 Elastic 搜索是否有类似的东西。
即使它不适用于 ES,也不必费心,同时使用您可以自己创建的 Spark/Mapreduce 程序自行摄取数据。
选项 1:
我建议如果你对 spark 没问题,这是一个很好的解决方案
Spark 支持原生集成来自 hadoop 2.1 的 ES。
见
- 这里的样本与 1.3 不同的 spark 版本 on wards
- More samples other than Hbase
选项 2:如您所知,比 spark 慢一点
我找到了一个实用的技巧来提高批量索引的性能。
我可以在我的客户端中计算哈希路由,并确保包含所有索引请求的每个批量请求都具有相同的路由。根据路由结果和带ip的分片信息,我直接将批量请求发送到相应的分片节点。此技巧可以避免批量重新路由成本并减少可能导致 EsRejectedException 的批量请求线程池占用。
比如我在不同的机器上有48个节点。假设我向任意节点发送一个包含3000个索引请求的批量请求,这些索引请求将根据路由重新路由到其他节点(通常是所有节点)。并且客户端线程必须等待整个过程完成,包括处理本地批量和等待其他节点的批量响应。然而,没有重新路由阶段,网络成本就没有了(除了转发到副本节点),客户端只需要等待更少的时间。同时,假设我只有 1 个副本,批量线程的总占用量只有 2 个。 (客户端-> 主分片和主分片-> 副本分片)
路由哈希:
shard_num = murmur3_hash (_routing) % num_primary_shards
试着看看:org.elasticsearch.cluster.routing.Murmur3HashFunction
客户端可以通过请求cat api获取分片和索引别名。
碎片信息url:cat shards
别名映射 url:cat aliases
一些关注:
- ES 可能会更改不同版本的默认哈希函数,这意味着客户端代码可能不兼容版本。
- 这个技巧是基于哈希结果基本平衡的假设。
- 客户端应该考虑到相应分片节点的连接超时等容错。
我在 HBase 中有超过 60 亿条社交媒体数据(包括 content/time/author 和其他可能的字段),48 台服务器的 4100 个区域,我现在需要将这些数据刷新到 Elasticsearch。
我很清楚 ES 的 bulk API,并且在 Java 中使用 bulk 和 MapReduce 仍然需要很多天(至少一周左右)。我可以改用 spark,但我认为它不会有太大帮助。
我想知道是否还有其他技巧可以将这些大数据写入 ElasticSearch?喜欢手动写入 es 索引文件并使用某种恢复来加载本地文件系统中的文件?
感谢任何可能的建议,谢谢。
==============
关于我的集群环境的一些细节:
spark 1.3.1 独立版(我可以在 yarn 上更改它以使用 Spark 1.6.2 或 1.6.3)
Hadoop 2.7.1 (HDP 2.4.2.258)
弹性搜索 2.3.3
AFAIK Spark 是索引以下 2 个选项的最佳选择。 下面是我提供的方法:
划分(输入扫描标准)并征服 60 亿社交媒体数据:
我建议创建多个 Spark/Mapreduce 具有不同搜索条件的作业(根据类别或其他内容将 60 亿社交媒体数据分成 6 块)并并行触发它们。 例如,基于数据捕获时间范围(scan.setTimeRange(t1,t2))或使用一些模糊行逻辑(FuzzyRowFilter),肯定会加快速度。
或
流媒体方法的种类:
你也可以考虑当你通过 spark 或 mapreduce 插入数据时,你可以同时为它们创建索引。
例如在 SOLR 的情况下: clouder 有 NRT hbase lily 索引器...即当 hbase table 基于 WAL 填充时(预写日志) 条目同时创建 solr 索引。检查 Elastic 搜索是否有类似的东西。
即使它不适用于 ES,也不必费心,同时使用您可以自己创建的 Spark/Mapreduce 程序自行摄取数据。
选项 1:
我建议如果你对 spark 没问题,这是一个很好的解决方案 Spark 支持原生集成来自 hadoop 2.1 的 ES。 见
- 这里的样本与 1.3 不同的 spark 版本 on wards
- More samples other than Hbase
选项 2:如您所知,比 spark 慢一点
我找到了一个实用的技巧来提高批量索引的性能。
我可以在我的客户端中计算哈希路由,并确保包含所有索引请求的每个批量请求都具有相同的路由。根据路由结果和带ip的分片信息,我直接将批量请求发送到相应的分片节点。此技巧可以避免批量重新路由成本并减少可能导致 EsRejectedException 的批量请求线程池占用。
比如我在不同的机器上有48个节点。假设我向任意节点发送一个包含3000个索引请求的批量请求,这些索引请求将根据路由重新路由到其他节点(通常是所有节点)。并且客户端线程必须等待整个过程完成,包括处理本地批量和等待其他节点的批量响应。然而,没有重新路由阶段,网络成本就没有了(除了转发到副本节点),客户端只需要等待更少的时间。同时,假设我只有 1 个副本,批量线程的总占用量只有 2 个。 (客户端-> 主分片和主分片-> 副本分片)
路由哈希:
shard_num = murmur3_hash (_routing) % num_primary_shards
试着看看:org.elasticsearch.cluster.routing.Murmur3HashFunction
客户端可以通过请求cat api获取分片和索引别名。
碎片信息url:cat shards
别名映射 url:cat aliases
一些关注:
- ES 可能会更改不同版本的默认哈希函数,这意味着客户端代码可能不兼容版本。
- 这个技巧是基于哈希结果基本平衡的假设。
- 客户端应该考虑到相应分片节点的连接超时等容错。