Elasticsearch 无法将来自 pymongo 的日期时间字段解析为对象

Elasticsearch fails in parsing datetime field coming from pymongo as object

我正在尝试使用 pymongo 和 Python 客户端 elasticsearch[=32 将数据从 mongoDB 流式传输到 Elasticsearch =].

我设置了一个映射,我在这里报告与我感兴趣的领域相关的片段:

"updated_at": { "type": "date", "format": "dateOptionalTime" }

我的脚本使用 pymongo 从 MongoDB 抓取每个文档并尝试将其索引到 Elasticsearch 中作为

from elasticsearch import Elasticsearch
from pymongo import MongoClient

mongo_client = MongoClient('localhost', 27017)
es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])
db = mongo_client['my_db']
collection = db['my_collection']

for doc in collection.find():
    es_client.index(
         index='index_name', 
         doc_type='my_type', 
         id=str(doc['_id']), 
         body=json.dumps(doc, default=json_util.default)
    )

我在运行遇到的问题是:

elasticsearch.exceptions.RequestError: TransportError(400, u'MapperParsingException[failed to parse [updated_at]]; nested: ElasticsearchIllegalArgumentException[unknown property [$date]]; ')

我认为问题的根源在于 pymongo 将字段 updated_at 序列化为 datetime.datetime 对象,正如我所见我在 for 循环中打印文档:

u'updated_at': datetime.datetime(2014, 8, 31, 17, 18, 13, 17000)

这与 Elasticsearch 查找映射中指定的 date 类型的对象相冲突。

有什么解决办法吗?

您走对了,您的 Python datetime 需要序列化为 ISO 8601-compliant 日期字符串。因此,您需要在 json.dumps() 调用中添加一个 CustomEncoder。首先,将您的 CustomEncoder 声明为 JSONEncoder 的子类,它将处理 datetimetime 属性的转换,但将其余部分委托给其超类:

class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%dT%H:%M:%S%z')
        if isinstance(obj, time):
            return obj.strftime('%H:%M:%S')
        if hasattr(obj, 'to_json'):
            return obj.to_json()
        return super(CustomEncoder, self).default(obj)

然后您可以在 json.dumps 调用中使用它,如下所示:

...
body=json.dumps(doc, default=json_util.default, cls=CustomEncoder)
...

我猜你的问题是你正在使用

body=json.dumps(doc, default=json_util.default)

但你应该使用

body=doc

这样做对我有用,因为 elasticsearch 似乎正在处理将字典别名化为 JSON 文档(当然,假设 doc 是字典,我猜它是)。

至少在我使用的 elasticsearch 版本中 (2.x),datetime.datetime 是正确别名的,不需要映射。例如,这对我有用:

doc = {"updated_on": datetime.now(timezone.utc)}
res = es.index(index=es_index, doc_type='my_type',
               id=1, body=doc)

并且被 Kibana 识别为日期。

您可以使用:

from elasticsearch_dsl.serializer import serializer

serializer.dumps(your_dict)

your_dict替换为您的Document().prepare()document.to_dict()