我们可以 update/Upsert 在 mongodb 中记录吗?数据源是kafka
Can we update/Upsert a record in mongodb? data source is kafka
我们可以 update/upsert mongodb 中的记录但是是否有任何方法或函数可以直接在 mongodb 中更新或更新文档并且源系统是 kafka目的地是 mongodb.
我苦苦思索,终于得到了答案。我使用了以下 Mongodb sink connector
在他们的文档上苦思冥想一段时间后,我终于找到了解决方案。
这正是我正在使用的 mongodb 接收器连接器配置
{
"name": "mongodbsync",
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"topics": "alpha-foobar",
"mongodb.connection.uri": "mongodb://localhost:27017/kafkaconnect?w=1&journal=true",
"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy"
}
我在配置中将 mongodb.writemodel.strategy
留空,因此它采用默认配置
我使用了来自同一连接器 github 的以下文档的用例 2
我正在处理这种情况,将 mysql table 数据与 kafka-jdbc-source connect
传输到 mongodb sink
。
也可以在 official docs 中找到上述策略
如果您有任何疑问,请随时提出。谢谢
是的,我们可以 update/upsert 数据。
对于更新,您必须在 Kafka 连接器中定义一个参数。
并将要更新记录的列列入白名单。 属性如下:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=tokenNumber
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
我们可以 update/upsert mongodb 中的记录但是是否有任何方法或函数可以直接在 mongodb 中更新或更新文档并且源系统是 kafka目的地是 mongodb.
我苦苦思索,终于得到了答案。我使用了以下 Mongodb sink connector
在他们的文档上苦思冥想一段时间后,我终于找到了解决方案。
这正是我正在使用的 mongodb 接收器连接器配置
{
"name": "mongodbsync",
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"topics": "alpha-foobar",
"mongodb.connection.uri": "mongodb://localhost:27017/kafkaconnect?w=1&journal=true",
"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy"
}
我在配置中将 mongodb.writemodel.strategy
留空,因此它采用默认配置
我使用了来自同一连接器 github 的以下文档的用例 2
我正在处理这种情况,将 mysql table 数据与 kafka-jdbc-source connect
传输到 mongodb sink
。
也可以在 official docs 中找到上述策略 如果您有任何疑问,请随时提出。谢谢
是的,我们可以 update/upsert 数据。 对于更新,您必须在 Kafka 连接器中定义一个参数。 并将要更新记录的列列入白名单。 属性如下:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=tokenNumber
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy