如何使用spark streaming实时更新elasticsearch文档?

How to update elasticsearch documents in real time with spark streaming ?

我正在使用 Spark Streaming 将 HTTP 请求聚合到 HTTP 会话中,并以基于会话 ID 的更新插入模式将会话索引到 Elasticsearch 中。每个会话都包含实时计算和更新的机器人分数。 我想将机器人分数传播到属于同一会话的所有 HTTP 请求。我有办法实时对已经索引的 HTTP 请求执行此类更新吗?

ElasticSearch(目前)不支持 UPDATE WHERE 类型查询。

您必须分两步完成此操作。

  1. 执行查询以获取具有特定会话 ID 的所有文档
  2. 使用部分更新用分数更新每个文档 有关更多详细信息,请参阅 https://www.elastic.co/guide/en/elasticsearch/guide/current/partial-updates.html,但换句话说,类似于

POST /sessions/1/_update { "doc" : { "score": 22 } }

其中 URL 中的 1 是您要更新的文档 ID。 _update 操作将保留任何现有字段并仅更新分数(尽管并不是说 _update 严格来说不是真的,因为它将创建一个包含当前字段值的新文档并删除旧文档,但对于您的情况来说这是无关紧要的语义).