使用logstash在弹性搜索中将两个索引组合成第三个索引
Combine two index into third index in elastic search using logstash
我有两个索引
- employee_data
{"code":1, "name":xyz, "city":"Mumbai" }
- transaction_data
{"code":1, "Month":June", payment:78000 }
我想要这样的第三个索引
3)join_index
{"code":1, "name":xyz, "city":"Mumbai", "Month":June", payment:78000 }
怎么可能??
我正在尝试使用 logstash
input {
elasticsearch {
hosts => "localost"
index => "employees_data,transaction_data"
query => '{ "query": { "match": { "code": 1} } }'
scroll => "5m"
docinfo => true
}
}
output {
弹性搜索{
主机 => ["localhost"]
index => "join1"
}
}
据我所知,仅使用 elasticsearch APIs 是不可能发生这种情况的。要处理此问题,您需要为相关文档设置唯一 ID。例如,您在问题中提到的代码可以作为文档的良好 ID。因此,您可以将第一个索引重新索引到第三个索引,并使用 UPDATE API 通过从第二个索引中读取文档来更新它们,并将它们的 ID 更新到第三个索引中。希望能帮到你。
您可以在 employees_data
上使用 elasticsearch input
在您的过滤器中,在 transaction_data
上使用 elasticsearch 过滤器
input {
elasticsearch {
hosts => "localost"
index => "employees_data"
query => '{ "query": { "match_all": { } } }'
sort => "code:desc"
scroll => "5m"
docinfo => true
}
}
filter {
elasticsearch {
hosts => "localhost"
index => "transaction_data"
query => "(code:\"%{[code]}\"
fields => {
"Month" => "Month",
"payment" => "payment"
}
}
}
output {
elasticsearch {
hosts => ["localhost"]
index => "join1"
}
}
然后使用 elasticsearch output
将您的新文档发送到您的第三个索引
您将有 3 个弹性搜索连接,结果可能会有点慢。
但它有效。
您不需要 Logstash 来执行此操作,Elasticsearch 本身通过利用 enrich processor
.
支持
首先,您需要创建一个丰富策略(使用最小的索引,假设它是 employees_data
):
PUT /_enrich/policy/employee-policy
{
"match": {
"indices": "employees_data",
"match_field": "code",
"enrich_fields": ["name", "city"]
}
}
然后您可以执行该策略以创建浓缩索引
POST /_enrich/policy/employee-policy/_execute
创建并填充丰富索引后,下一步需要您创建一个使用上述丰富内容的摄取管道 policy/index:
PUT /_ingest/pipeline/employee_lookup
{
"description" : "Enriching transactions with employee data",
"processors" : [
{
"enrich" : {
"policy_name": "employee-policy",
"field" : "code",
"target_field": "tmp",
"max_matches": "1"
}
},
{
"script": {
"if": "ctx.tmp != null",
"source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
}
}
]
}
最后,您现在已准备好使用联接的数据创建目标索引。只需利用 _reindex
API 与我们刚刚创建的摄取管道相结合:
POST _reindex
{
"source": {
"index": "transaction_data"
},
"dest": {
"index": "join1",
"pipeline": "employee_lookup"
}
}
在 运行 之后,join1
索引将包含您所需要的内容,例如:
{
"_index" : "join1",
"_type" : "_doc",
"_id" : "0uA8dXMBU9tMsBeoajlw",
"_score" : 1.0,
"_source" : {
"code":1,
"name": "xyz",
"city": "Mumbai",
"Month": "June",
"payment": 78000
}
}
我有两个索引
- employee_data
{"code":1, "name":xyz, "city":"Mumbai" }
- transaction_data
{"code":1, "Month":June", payment:78000 }
我想要这样的第三个索引 3)join_index
{"code":1, "name":xyz, "city":"Mumbai", "Month":June", payment:78000 }
怎么可能??
我正在尝试使用 logstash
input {
elasticsearch {
hosts => "localost"
index => "employees_data,transaction_data"
query => '{ "query": { "match": { "code": 1} } }'
scroll => "5m"
docinfo => true
}
}
output {
弹性搜索{ 主机 => ["localhost"]
index => "join1"
}
}
据我所知,仅使用 elasticsearch APIs 是不可能发生这种情况的。要处理此问题,您需要为相关文档设置唯一 ID。例如,您在问题中提到的代码可以作为文档的良好 ID。因此,您可以将第一个索引重新索引到第三个索引,并使用 UPDATE API 通过从第二个索引中读取文档来更新它们,并将它们的 ID 更新到第三个索引中。希望能帮到你。
您可以在 employees_data
上使用 elasticsearch input在您的过滤器中,在 transaction_data
上使用 elasticsearch 过滤器input {
elasticsearch {
hosts => "localost"
index => "employees_data"
query => '{ "query": { "match_all": { } } }'
sort => "code:desc"
scroll => "5m"
docinfo => true
}
}
filter {
elasticsearch {
hosts => "localhost"
index => "transaction_data"
query => "(code:\"%{[code]}\"
fields => {
"Month" => "Month",
"payment" => "payment"
}
}
}
output {
elasticsearch {
hosts => ["localhost"]
index => "join1"
}
}
然后使用 elasticsearch output
将您的新文档发送到您的第三个索引您将有 3 个弹性搜索连接,结果可能会有点慢。 但它有效。
您不需要 Logstash 来执行此操作,Elasticsearch 本身通过利用 enrich processor
.
首先,您需要创建一个丰富策略(使用最小的索引,假设它是 employees_data
):
PUT /_enrich/policy/employee-policy
{
"match": {
"indices": "employees_data",
"match_field": "code",
"enrich_fields": ["name", "city"]
}
}
然后您可以执行该策略以创建浓缩索引
POST /_enrich/policy/employee-policy/_execute
创建并填充丰富索引后,下一步需要您创建一个使用上述丰富内容的摄取管道 policy/index:
PUT /_ingest/pipeline/employee_lookup
{
"description" : "Enriching transactions with employee data",
"processors" : [
{
"enrich" : {
"policy_name": "employee-policy",
"field" : "code",
"target_field": "tmp",
"max_matches": "1"
}
},
{
"script": {
"if": "ctx.tmp != null",
"source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
}
}
]
}
最后,您现在已准备好使用联接的数据创建目标索引。只需利用 _reindex
API 与我们刚刚创建的摄取管道相结合:
POST _reindex
{
"source": {
"index": "transaction_data"
},
"dest": {
"index": "join1",
"pipeline": "employee_lookup"
}
}
在 运行 之后,join1
索引将包含您所需要的内容,例如:
{
"_index" : "join1",
"_type" : "_doc",
"_id" : "0uA8dXMBU9tMsBeoajlw",
"_score" : 1.0,
"_source" : {
"code":1,
"name": "xyz",
"city": "Mumbai",
"Month": "June",
"payment": 78000
}
}