Elasticsearch bulk update using Python: how to append new data to an array field
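How do I update a field in Elasticsearch with a bulk update from Python? I have tried many approaches and all of them fail. In some cases I get a document missing error, so how can I update and upsert in the same request? Appending to the field is not working either. The package I am using in Python is elasticsearch==7.9.1.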
for i in range(0, length, steps):
    end_index = length-1 if i+steps>length else i+steps
    temp_list = test_data[i: end_index]
    bulk_file = ''
    actions = [{
        "_index": "test-data",
        "_opt_type": "update",
        "_type": "test-test-data",
        "_id": test_row['testId'],
        "doc": {"script": {
            "source": "ctx._source.DataIds.add(params.DataIds)",
            "lang": "painless",
            "params": {
                "DataIds": test_row['DataIds']
            }
        }}
    }
    for test_row in temp_list
    ]
    helpers.bulk(es, actions)
The error I get is this:
{'update': {'_index': 'test-data', '_type': 'products', '_id': '333', 'status': 400, 'error': {'type': 'illegal_argument_exception', 'reason': 'failed to execute script', 'caused_by': {'type': 'script_exception', 'reason': 'runtime error', 'script_stack': ['ctx._source.dataIds.add(params.dataIds)', ' ^---- HERE'], 'script': 'if (ctx._source.dataIds == null) { ctx._source.dataIds = []; } ctx._source.dataIds.add(params.dataIds)', 'lang': 'painless', 'position': {'offset': 105, 'start': 71, 'end': 118}, 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'dynamic method [java.lang.String, add/1] not found'}}}, 'data': {'upsert': {}, 'scripted_upsert': True, 'script': {'source': 'if (ctx._source.dataIds == null) { ctx._source.dataIds = []; } ctx._source.dataIds.add(params.dataIds)', 'lang': 'painless', 'params': {'cdataIds': 'set123'}}}}}])
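The correct way to upsert via a script is to drop the doc part and send only the script part. If you want to upsert and update in the same command, you also need the upsert part. It looks like this: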
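actions = [{
    "_op_type": "update",
    "_index": "test-data",
    "_type": "test-test-data",
    "_id": test_row['testId'],
    # Stored only when the document does not exist yet. Wrapped in a list
    # (assuming DataIds is a single value) so that later add() calls find
    # a list rather than a string.
    "upsert": {
        "DataIds": [test_row['DataIds']]
    },
    # Runs only when the document already exists.
    "script": {
        "source": "ctx._source.DataIds.add(params.DataIds)",
        "lang": "painless",
        "params": {
            "DataIds": test_row['DataIds']
        }
    }
} for test_row in temp_list
]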
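To make the batching around these actions concrete, here is a minimal sketch, not a definitive implementation. The helper names `chunked`, `build_upsert_actions` and `send_in_batches` are hypothetical, and it assumes each row carries a single `DataIds` value, which is why the `upsert` document wraps it in a list. Plain slicing is used for the batches because a slice that runs past the end of the list is simply truncated, so no `end_index` arithmetic is needed and the last row is not dropped.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]

APPEND_SCRIPT = "ctx._source.DataIds.add(params.DataIds)"


def chunked(rows: List[Row], size: int) -> Iterator[List[Row]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        # Slicing past the end is safe: the final batch is just shorter.
        yield rows[start:start + size]


def build_upsert_actions(rows: List[Row], index: str = "test-data") -> List[Row]:
    """Build one bulk 'update' action per row, with script and upsert parts."""
    return [
        {
            "_op_type": "update",
            "_index": index,
            "_type": "test-test-data",
            "_id": row["testId"],
            # Assumption: DataIds is a single value, so a new document
            # starts with a one-element list.
            "upsert": {"DataIds": [row["DataIds"]]},
            "script": {
                "source": APPEND_SCRIPT,
                "lang": "painless",
                "params": {"DataIds": row["DataIds"]},
            },
        }
        for row in rows
    ]


def send_in_batches(es: Any, rows: List[Row], size: int = 500) -> None:
    """Send the rows to Elasticsearch batch by batch."""
    # Imported lazily so the builders above work without the client installed.
    from elasticsearch import helpers

    for batch in chunked(rows, size):
        helpers.bulk(es, build_upsert_actions(batch))
```

With an existing client `es`, a call such as `send_in_batches(es, test_data, size=steps)` would take the place of the manual loop.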
Another approach is scripted_upsert, where the script also runs when the document does not exist yet:
actions = [{
    "_op_type": "update",
    "_index": "test-data",
    "_type": "test-test-data",
    "_id": test_row['testId'],
    # Empty starting document; the script fills it in.
    "upsert": {},
    # Python boolean: True, not the JSON literal true.
    "scripted_upsert": True,
    "script": {
        "source": "if (ctx._source.DataIds == null) { ctx._source.DataIds = []; } ctx._source.DataIds.add(params.DataIds)",
        "lang": "painless",
        "params": {
            "DataIds": test_row['DataIds']
        }
    }
} for test_row in temp_list
]
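The error in the question, `dynamic method [java.lang.String, add/1] not found`, suggests that some documents already hold a plain string in the field, in which case the null check alone is not enough. The following is a sketch of a more defensive variant, assuming that such a string should become the first element of the list. The names `SAFE_APPEND_SCRIPT` and `build_scripted_upsert_action` are hypothetical, and the Painless source has not been run against a cluster here.

```python
from typing import Any, Dict

# Painless source: create the list if the field is missing, convert a lone
# non-list value into a one-element list, then append the new value.
SAFE_APPEND_SCRIPT = (
    "if (ctx._source.DataIds == null) { ctx._source.DataIds = []; } "
    "else if (!(ctx._source.DataIds instanceof List)) "
    "{ ctx._source.DataIds = [ctx._source.DataIds]; } "
    "ctx._source.DataIds.add(params.DataIds)"
)


def build_scripted_upsert_action(row: Dict[str, Any],
                                 index: str = "test-data") -> Dict[str, Any]:
    """Build one bulk 'update' action that always goes through the script."""
    return {
        "_op_type": "update",
        "_index": index,
        "_type": "test-test-data",
        "_id": row["testId"],
        # Empty starting document; the script populates it for new ids.
        "upsert": {},
        "scripted_upsert": True,
        "script": {
            "source": SAFE_APPEND_SCRIPT,
            "lang": "painless",
            "params": {"DataIds": row["DataIds"]},
        },
    }
```

A list such as `[build_scripted_upsert_action(r) for r in temp_list]` can then be passed to `helpers.bulk(es, ...)` in the same way as the actions above.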