如何使用 elasticsearch-dsl-py 使 Elasticsearch 索引保持最新?

How does one keep an Elasticsearch index up-to-date using elasticsearch-dsl-py?

我开发了一个小型个人信息目录,我的客户端可以通过 Django 管理界面访问和更新该目录。该信息需要可搜索,因此我设置了我的 Django 站点以将该数据保存在搜索索引中。我最初使用 Haystack 和 Whoosh 作为搜索索引,但最近我不得不放弃这些工具,转而使用 Elasticsearch 5。

以前,每当目录中的任何内容更新时,代码都会简单地清除整个搜索索引并从头开始重建它。该目录中只有几百个条目,因此性能不佳。不幸的是,尝试在 Elasticsearch 中做同样的事情是非常不可靠的,因为我认为我的代码中存在某种竞争条件。

这是我使用 elasticsearch-py 和 elasticsearch-dsl-py 编写的代码:

import elasticsearch
import time
from django.apps import apps
from django.conf import settings
from elasticsearch.helpers import bulk
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl import DocType, Text, Search

# Create the default Elasticsearch connection using the host specified in settings.py.
elasticsearch_host = "{0}:{1}".format(
    settings.ELASTICSEARCH_HOST['HOST'], settings.ELASTICSEARCH_HOST['PORT']
)
elasticsearch_connection = connections.create_connection(hosts=[elasticsearch_host])


class DepartmentIndex(DocType):
    url = Text()
    name = Text()
    text = Text(analyzer='english')
    content_type = Text()

    class Meta:
        index = 'departmental_directory'


def refresh_index():
    # Erase the existing index.
    try:
        elasticsearch_connection.indices.delete(index=DepartmentIndex().meta.index)
    except elasticsearch.exceptions.NotFoundError:
        # If it doesn't exist, the job's already done.
        pass

    # Wait a few seconds to give enough time for Elasticsearch to accept that the 
    # DepartmentIndex is gone before we try to recreate it.
    time.sleep(3)

    # Rebuild the index from scratch.
    DepartmentIndex.init()
    Department = apps.get_model('departmental_directory', 'Department')
    bulk(
        client=elasticsearch_connection, 
        actions=(b.indexing() for b in Department.objects.all().iterator())
    )

我已将 Django 信号设置为在 Department 被保存时调用 refresh_index()。但是 refresh_index() 由于这个错误经常崩溃:

elasticsearch.exceptions.RequestError: TransportError(400, u'index_already_exists_exception', u'index [departmental_directory/uOQdBukEQBWvMZk83eByug] already exists')

这就是我添加 time.sleep(3) 调用的原因。我假设索引在调用 DepartmentIndex.init() 时尚未完全删除,这导致了错误。

我的猜测是我一直在以完全错误的方式解决这个问题。必须有更好的方法来使 elasticsearch 索引保持最新使用 elasticsearch-dsl-py,但我只是不知道它是什么,而且我无法通过他们的文档弄明白。

在 google 上搜索 "rebuild elasticsearch index from scratch" 会得到大量 "how to reindex your elasticsearch data" 的结果,但这不是我想要的。我需要用我的应用程序数据库中更新的新数据替换这些数据。

也许这会有所帮助:https://github.com/HonzaKral/es-django-example/blob/master/qa/models.py#L137-L146

无论哪种方式,您都希望有 2 种方法:将所有数据批量加载到新索引 (https://github.com/HonzaKral/es-django-example/blob/master/qa/management/commands/index_data.py),并且可以选择使用上述 methods/or 信号进行同步。