如何在文档存在时将文档批量插入 ElasticSearch 而无需更新

how to bulk insert documents to ElasticSearch without updating when document exists

我在 Nest 库中使用弹性搜索。我想知道当文档存在时,如何在不更新的情况下批量插入文档到 ElasticSearch?

这是一个批量 API 调用的示例,它将执行创建操作

private static void Main()
{
    var defaultIndex = "documents";
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var settings = new ConnectionSettings(pool)
        .DefaultIndex(defaultIndex);

    var client = new ElasticClient(settings);

    if (client.IndexExists(defaultIndex).Exists)
        client.DeleteIndex(defaultIndex);

    client.Index(new MyDocument(1) 
    { 
        Message = "new" 
    }, i => i.Refresh(Refresh.WaitFor));

    var documents = new [] 
    {
        new MyDocument(1) { Message = "updated" },
        new MyDocument(2) { Message = "updated" },
        new MyDocument(3) { Message = "updated" },
    };

    client.Bulk(b => b
        .CreateMany(documents)
        .Refresh(Refresh.WaitFor)
    );

    var getResponse = client.Get<MyDocument>(1);

    Console.WriteLine(getResponse.Source.Message == "new");
}

public class MyDocument 
{
    public MyDocument(int id) => Id = id;

    public int Id { get; set; }  

    public string Message { get; set; }
}

输出将为 true,这意味着 ID 为 1 的文档未在批量调用中创建,因为它已经存在。如果您查看批量响应,它将是类似于

的 HTTP 200 响应
{
  "took" : 1387,
  "errors" : true,
  "items" : [
    {
      "create" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "1",
        "status" : 409,
        "error" : {
          "type" : "version_conflict_engine_exception",
          "reason" : "[mydocument][1]: version conflict, document already exists (current version [1])",
          "index_uuid" : "DZIgGMZcSlWRycC1MGhJWQ",
          "shard" : "3",
          "index" : "documents"
        }
      }
    },
    {
      "create" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "2",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    },
    {
      "create" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "3",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

重要的是,"errors"true 并且第一个 "create" 操作响应表明错误是什么。

另一种使用 .CreateMany(...) 的方法是将 .UpdateMany(...) 与更新插入操作一起使用,在文档存在的情况下指定 "no-op" 操作

client.Bulk(b => b
    .UpdateMany(documents, (d, document) => d
        .Upsert(document)
        .Script(s => s
            .Source("ctx.op = 'none'")
        )
    )
    .Refresh(Refresh.WaitFor)
);

结果是一样的,即Id为1的文档没有被覆盖,只是响应略有不同

{
  "took" : 1307,
  "errors" : false,
  "items" : [
    {
      "update" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "1",
        "_version" : 1,
        "result" : "noop",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "status" : 200
      }
    },
    {
      "update" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "2",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    },
    {
      "update" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "3",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

请注意 "errors" 现在是 false,第一个 "update" 操作是 "noop"