将数据从 Neptune 复制到 AWS ElasticsearchService 时应用自定义转换逻辑

Apply customized transformation logic when replicate data from Neptune to AWS ElasticsearchService

Amazon Neptune now supports Full-Text Search Using Amazon Elasticsearch Service。它会自动将数据从 Neptune 复制到 Elasticsearch。我的问题是:Neptune 是否支持在复制期间自定义转换逻辑?例如,我在 Neptune 中有一个像 Brand(id=123, name="Calvin Klein") 这样的顶点,我想将自定义的转换逻辑应用于该顶点,以便该顶点将转换为 Elasticsearch 中的文档 {id:123, name:"Calvin Klein", normalizedName:"calvinklein"}

是否支持自定义逻辑?

是的,Neptune ES 复制过程确实允许用户拥有自己的转换逻辑,用于在 Elastic Search 中存储数据。但是必须小心,因为更改 ES 文档可能会破坏 NeptuneGremlin 通过 ES 进行的文本搜索支持,因为它需要文档中的所有原始字段。如果需要,可以随时向弹性搜索文档添加新字段。

Please refer Neptune data model for Elastic search

可以在 Gremlin FTS 查询中使用自定义 ES 字段吗?

如果自定义 ES 字段的存储方式与 gremlin 属性 存储在 ES 文档中的方式相同,即作为 "predicates" 字段中的嵌套字段,则可以在 Gremlin FTS 查询中使用自定义 ES 字段。

Ex: if {normalizedName:"calvinklein"} is to be added as custom field then ES document should store it as: { ....... // Original fields "predicates": { ....... // Original Properties "normalizedName":[ { "value": "calvinklein" } ] }

此字段可以像任何其他 Gremlin 属性 一样仅使用 Neptune FTS 查询进行搜索。 (字段不存在于 Neptune 中,但存在于 ES 中)

如何添加自定义转换逻辑?

您引用的示例使用基于 Python 的 AWS Lambda 处理程序,使用 NeptuneStreams 将数据从 Neptune 复制到 Elastic Search。它从 NeptuneStreams 获取更改日志并将它们转换为 ES upsert 请求。

可以提供自己的 Lambda 轮询处理程序实现来处理流记录。详情请关注下方博客post:

https://aws.amazon.com/blogs/database/capture-graph-changes-using-neptune-streams/

根据上面的博客post请按照以下步骤操作:

  1. 创建自定义流处理程序 class,它继承自轮询框架的 AbstractHandler 并实现 handle_records() 方法。
  2. 完成流处理程序的创作后,以 ZIP 存档的形式创建部署包并将其上传到 S3。
  3. 要使用新的 zip 存档,请在 CFN 模板中为您的处理程序包提供 LambdaS3Bucket 和 LambdaS3Key。
  4. 此外,提供您的 StreamRecordsHandler 的名称,即您创建的自定义处理程序。