ElasticSearch Ingest Pipeline: create and update timestamp field
Following this answer, I created an ingest pipeline that runs against specific indices and adds a timestamp field to their documents:
PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2"
               ].contains(ctx['_index'])) { return; }
          // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        """
      }
    }
  ]
}
Then I set it as the default pipeline on all indices:
PUT _all/_settings
{
  "index": {
    "default_pipeline": "auto_now_add"
  }
}
After that, I started indexing my objects into those indices. When I fetch an indexed item, its updated_at field reflects the time it was indexed, for example:
{
  _index: 'my_index_1',
  _type: '_doc',
  _id: 'r1285044056',
  _version: 11,
  _seq_no: 373,
  _primary_term: 2,
  found: true,
  _source: {
    updated_at: '2021-07-07 04:35:39',
    ...
  }
}
I would now like to add a created_at field that is set only the first time, so I tried updating the script above like this:
PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2",
                 "..."
               ].contains(ctx['_index'])) { return; }
          // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          // don't overwrite if present
          if (ctx != null && ctx['created_at'] != null) { return; }
          ctx['created_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        """
      }
    }
  ]
}
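But this solution does not seem to work: the condition
if (ctx != null && ctx['created_at'] != null) { return; }
always fails, so created_at is overwritten on every update of an object in the index, just like updated_at, which makes it useless.
So how can I prevent that and make sure created_at is preserved once the ingest pipeline has created it?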
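As @Val explained:
... the ingest pipeline processor(s) will only operate within the context of the document you're sending, not the one stored (if any).
So you won't have access to the underlying _source or doc, because ingest pipelines are designed for the ingest phase, not the update phase.
You can of course keep your auto_now_add pipeline to add updated_at automatically, and you can extend it to also add created_at when it is not already present in the ingest payload. Check for it with ctx.containsKey, since ctx is essentially a Java Map: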
PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2",
                 "..."
               ].contains(ctx['_index'])) { return; }
          def now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          // guarantee updated_at
          ctx['updated_at'] = now;
          // add created_at only if nonexistent in the payload
          if (!ctx.containsKey('created_at')) {
            ctx['created_at'] = now;
          }
        """
      }
    }
  ]
}
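To try the pipeline without indexing anything, the simulate API can be pointed at it. This is only a sketch: the two sample documents are made up, reusing the some_id and my_index_1 names from this answer. The first payload has no created_at, so the script should stamp both fields; the second already carries created_at, so only updated_at should change.

```
POST _ingest/pipeline/auto_now_add/_simulate
{
  "docs": [
    {
      "_index": "my_index_1",
      "_id": "some_id",
      "_source": { "some": "param" }
    },
    {
      "_index": "my_index_1",
      "_id": "some_id",
      "_source": { "some": "param", "created_at": "2021-07-07 04:35:39" }
    }
  ]
}
```

Note that the second case only keeps created_at because the value was sent in the payload; the pipeline never looks at what is stored in the index.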
However, this only works the first time you ingest a document!
Running:
POST my_index_1/_doc/some_id
{
  "some": "param"
}
yields:
{
  "some" : "param",
  "updated_at" : "2021-07-08 10:35:13",
  "created_at" : "2021-07-08 10:35:13"
}
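The flip side is that sending the same request to the same ID again would stamp a fresh created_at, because the payload still lacks the field and the pipeline cannot see the stored copy. One possible guard, assuming you would rather have such a repeat write fail than silently overwrite the document, is the _create endpoint. It should reject the request with a version conflict (HTTP 409) when some_id already exists:

```
PUT my_index_1/_create/some_id
{
  "some": "param"
}
```

Later changes to the document would then have to go through _update rather than a full re-index.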
Now, to bump updated_at automatically each time you update a document, you'll also need a script, this time stored under _scripts rather than _ingest/pipeline:
PUT _scripts/incement_update_at__plus_new_params
{
  "script": {
    "lang": "painless",
    "source": """
      // add whatever is in the params
      ctx._source.putAll(params);
      // increment updated_at no matter what was in the params
      ctx._source['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    """
  }
}
Then, when you run your _update call, reference the stored script:
POST my_index_1/_update/some_id
{
  "script": {
    "id": "incement_update_at__plus_new_params",
    "params": {
      "your": "new params"
    }
  }
}
This bumps updated_at and adds any other params, without touching created_at:
{
  "some":"param",
  "updated_at":"2021-07-08 10:49:44",   <--
  "created_at":"2021-07-08 10:39:55",
  "your":"new params"                   <--
}
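If all of your writes can go through _update, a scripted upsert might let a single script maintain both fields, since an update script, unlike the pipeline, runs against the stored document. What follows is a minimal sketch rather than a tested recipe: it inlines the script instead of storing it, and it relies on ctx.op being 'create' when the document does not exist yet. How it plays with a default pipeline on the same index is worth checking on your own cluster.

```
POST my_index_1/_update/some_id
{
  "scripted_upsert": true,
  "script": {
    "lang": "painless",
    "source": """
      def now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
      // first write: the document did not exist yet
      if (ctx.op == 'create') {
        ctx._source['created_at'] = now;
      }
      // add whatever is in the params, then stamp updated_at
      ctx._source.putAll(params);
      ctx._source['updated_at'] = now;
    """,
    "params": {
      "your": "new params"
    }
  },
  "upsert": {}
}
```

As with the stored script, a created_at key inside params would overwrite the existing value, so keep it out of the params you send.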
Shameless plug: I discuss pipelines & scripts in great detail in my Elasticsearch Handbook.