如何在文档存在时将文档批量插入 ElasticSearch 而无需更新
how to bulk insert documents to ElasticSearch without updating when document exists
我正在通过 NEST 库使用 Elasticsearch。我想知道如何向 Elasticsearch 批量插入文档,并且在文档已存在时不对其进行更新?
这是一个批量 API 调用的示例,它将执行创建操作
/// <summary>
/// Recreates the "documents" index, indexes one document with id 1, then sends a bulk
/// request of create operations for ids 1-3 and prints whether document 1 still
/// carries its original message.
/// </summary>
private static void Main()
{
    const string indexName = "documents";

    var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(connectionPool).DefaultIndex(indexName);
    var client = new ElasticClient(connectionSettings);

    // Start from a clean slate: drop the index if a previous run left it behind.
    var existsResponse = client.IndexExists(indexName);
    if (existsResponse.Exists)
    {
        client.DeleteIndex(indexName);
    }

    // Seed the index with the document whose id the bulk request will collide with.
    var seedDocument = new MyDocument(1) { Message = "new" };
    client.Index(seedDocument, index => index.Refresh(Refresh.WaitFor));

    var documents = new MyDocument[]
    {
        new MyDocument(1) { Message = "updated" },
        new MyDocument(2) { Message = "updated" },
        new MyDocument(3) { Message = "updated" },
    };

    // One create operation per document, sent as a single bulk request.
    client.Bulk(bulk => bulk
        .CreateMany(documents)
        .Refresh(Refresh.WaitFor));

    // Fetch document 1 again and report whether its message is still the seeded one.
    var fetched = client.Get<MyDocument>(1);
    var keptOriginalMessage = fetched.Source.Message == "new";
    Console.WriteLine(keptOriginalMessage);
}
/// <summary>
/// Sample document type with an integer identifier and a text message.
/// </summary>
public class MyDocument
{
    /// <summary>Creates a document with the given identifier.</summary>
    /// <param name="id">Value stored in <see cref="Id"/>.</param>
    public MyDocument(int id)
    {
        Id = id;
    }

    /// <summary>Identifier of the document; initially set by the constructor.</summary>
    public int Id { get; set; }

    /// <summary>Free-form message text; null until assigned.</summary>
    public string Message { get; set; }
}
输出将为 true
,这意味着 ID 为 1
的文档在批量调用中没有被覆盖,因为它已经存在。如果您查看批量响应,
会看到一个类似于下面这样的 HTTP 200 响应:
{
"took" : 1387,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "1",
"status" : 409,
"error" : {
"type" : "version_conflict_engine_exception",
"reason" : "[mydocument][1]: version conflict, document already exists (current version [1])",
"index_uuid" : "DZIgGMZcSlWRycC1MGhJWQ",
"shard" : "3",
"index" : "documents"
}
}
},
{
"create" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
},
{
"create" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "3",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
}
]
}
重要的是,"errors"
是 true
并且第一个 "create"
操作响应表明错误是什么。
除了使用 .CreateMany(...)
之外,另一种方法是将 .UpdateMany(...)
与 upsert(更新插入)操作一起使用,并在文档已存在的情况下通过脚本指定 "no-op"(无操作):
// One update operation per document: the document itself is supplied as the upsert
// body, and the script sets ctx.op to 'none' for documents that already exist.
client.Bulk(bulk => bulk
    .UpdateMany(documents, (update, doc) => update
        .Upsert(doc)
        .Script(script => script.Source("ctx.op = 'none'")))
    .Refresh(Refresh.WaitFor));
结果是一样的,即Id为1
的文档没有被覆盖,只是响应略有不同
{
"took" : 1307,
"errors" : false,
"items" : [
{
"update" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "1",
"_version" : 1,
"result" : "noop",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"status" : 200
}
},
{
"update" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
},
{
"update" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "3",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
}
]
}
请注意 "errors"
现在是 false
,第一个 "update"
操作是 "noop"
。
我正在通过 NEST 库使用 Elasticsearch。我想知道如何向 Elasticsearch 批量插入文档,并且在文档已存在时不对其进行更新?
这是一个批量 API 调用的示例,它将执行创建操作
/// <summary>
/// Recreates the "documents" index, indexes one document with id 1, then sends a bulk
/// request of create operations for ids 1-3 and prints whether document 1 still
/// carries its original message.
/// </summary>
private static void Main()
{
    const string indexName = "documents";

    var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(connectionPool).DefaultIndex(indexName);
    var client = new ElasticClient(connectionSettings);

    // Start from a clean slate: drop the index if a previous run left it behind.
    var existsResponse = client.IndexExists(indexName);
    if (existsResponse.Exists)
    {
        client.DeleteIndex(indexName);
    }

    // Seed the index with the document whose id the bulk request will collide with.
    var seedDocument = new MyDocument(1) { Message = "new" };
    client.Index(seedDocument, index => index.Refresh(Refresh.WaitFor));

    var documents = new MyDocument[]
    {
        new MyDocument(1) { Message = "updated" },
        new MyDocument(2) { Message = "updated" },
        new MyDocument(3) { Message = "updated" },
    };

    // One create operation per document, sent as a single bulk request.
    client.Bulk(bulk => bulk
        .CreateMany(documents)
        .Refresh(Refresh.WaitFor));

    // Fetch document 1 again and report whether its message is still the seeded one.
    var fetched = client.Get<MyDocument>(1);
    var keptOriginalMessage = fetched.Source.Message == "new";
    Console.WriteLine(keptOriginalMessage);
}
/// <summary>
/// Sample document type with an integer identifier and a text message.
/// </summary>
public class MyDocument
{
    /// <summary>Creates a document with the given identifier.</summary>
    /// <param name="id">Value stored in <see cref="Id"/>.</param>
    public MyDocument(int id)
    {
        Id = id;
    }

    /// <summary>Identifier of the document; initially set by the constructor.</summary>
    public int Id { get; set; }

    /// <summary>Free-form message text; null until assigned.</summary>
    public string Message { get; set; }
}
输出将为 true
,这意味着 ID 为 1
的文档在批量调用中没有被覆盖,因为它已经存在。如果您查看批量响应,会看到一个类似于下面这样的 HTTP 200 响应:
{
"took" : 1387,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "1",
"status" : 409,
"error" : {
"type" : "version_conflict_engine_exception",
"reason" : "[mydocument][1]: version conflict, document already exists (current version [1])",
"index_uuid" : "DZIgGMZcSlWRycC1MGhJWQ",
"shard" : "3",
"index" : "documents"
}
}
},
{
"create" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
},
{
"create" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "3",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
}
]
}
重要的是,"errors"
是 true
并且第一个 "create"
操作响应表明错误是什么。
除了使用 .CreateMany(...)
之外,另一种方法是将 .UpdateMany(...)
与 upsert(更新插入)操作一起使用,并在文档已存在的情况下通过脚本指定 "no-op"(无操作):
// One update operation per document: the document itself is supplied as the upsert
// body, and the script sets ctx.op to 'none' for documents that already exist.
client.Bulk(bulk => bulk
    .UpdateMany(documents, (update, doc) => update
        .Upsert(doc)
        .Script(script => script.Source("ctx.op = 'none'")))
    .Refresh(Refresh.WaitFor));
结果是一样的,即Id为1
的文档没有被覆盖,只是响应略有不同
{
"took" : 1307,
"errors" : false,
"items" : [
{
"update" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "1",
"_version" : 1,
"result" : "noop",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"status" : 200
}
},
{
"update" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
},
{
"update" : {
"_index" : "documents",
"_type" : "mydocument",
"_id" : "3",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
}
]
}
请注意 "errors"
现在是 false
,第一个 "update"
操作是 "noop"
。