如何从 MongoDB 中的 ChangeStream 过滤对特定字段的更新
How do you filter updates to specific fields from ChangeStream in MongoDB
我正在设置 ChangeStream 以在集合中的文档发生更改时通知我,以便我可以将该文档的 "LastModified" 元素更新到事件发生时。由于此更新将导致在 ChangeStream 上发生新事件,因此我需要过滤掉这些更新以防止无限循环(更新 LastModified 元素,因为 LastModified 元素刚刚更新...)。
当我指定确切的字段时,我有以下代码在工作:
ChangeStreamOptions options = new ChangeStreamOptions();
options.ResumeAfter = resumeToken;
string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields.LastModified': { $exists: false } } ] }";
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(filter);
var cursor = collection.Watch(pipeline, options, cancelToken);
但是,我不想对 "updateDescription.updatedFields.LastModified" 进行硬编码,而是想提供一个我不希望存在于 updatedFields 文档中的元素名称列表。
我尝试过:
string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields': { $nin: [ 'LastModified' ] } } ] }";
但这没有按预期工作(我仍然收到 LastModified 更改的更新事件。
我最初使用的是 Filter Builder:
FilterDefinitionBuilder<ChangeStreamDocument<BsonDocument>> filterBuilder = Builders<ChangeStreamDocument<BsonDocument>>.Filter;
FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = filterBuilder.In("operationType", new string[] { "replace", "insert", "update" }); //Only include the change if it was one of these types. Available types are: insert, update, replace, delete, invalidate
filter &= filterBuilder.Nin("updateDescription.updatedFields", ChangedFieldsToIgnore); //If this is an update, only include it if the field(s) updated contains 1+ fields not in the ChangedFieldsToIgnore list
其中 ChangedFieldsToIgnore 是一个包含我不想为其获取事件的字段名称的列表。
任何人都可以帮助我了解我需要使用的语法吗?或者我是否需要围绕我的 ChangedFieldsToIgnore 列表创建一个循环并在过滤器中为每个项目创建一个新条目“$exists:false”? (这似乎不是很有效)。
编辑:
我根据@wan-bachtiar 的回答尝试了以下代码,但我在 enumerator.MoveNext() 调用中遇到异常:
var match1 = new BsonDocument { { "$match", new BsonDocument { { "operationType", new BsonDocument { { "$in", new BsonArray(new string[] { "replace", "insert", "update" }) } } } } } };
var match2 = new BsonDocument { { "$addFields", new BsonDocument { { "tmpfields", new BsonDocument { { "$objectToArray", "$updateDescription.updatedFields" } } } } } };
var match3 = new BsonDocument { { "$match", new BsonDocument { { "tmpfields.k", new BsonDocument { { "$nin", new BsonArray(updatedFieldsToIgnore) } } } } } };
var pipeline = new[] { match1, match2, match3 };
var cursor = collection.Watch<ChangeStreamDocument<BsonDocument>>(pipeline, options, Profile.CancellationToken);
enumerator = cursor.ToEnumerable().GetEnumerator();
enumerator.MoveNext();
ChangeStreamDocument<BsonDocument> doc = enumerator.Current;
例外情况是:"{"Invalid field name: \"tmpfields\"."}"
我怀疑问题可能是我收到的 "replace" 和 "insert" 事件不包含 updateDescription 字段,因此 $addFields/$objectToArray 失败。我太新了,无法弄清楚语法,但我想我需要使用一个过滤器:
{ $match: { "operationType": { $in: ["replace", "insert"] } } }
OR
{ $eq: { "operationTYpe": "update" }} AND { $addFields....}
此外,C# 驱动程序似乎不包含有助于 $addFields 和 $objectToArray 操作的生成器。我只能使用 new BsonDocument {...}
方法来构建管道变量。
ChangedFieldsToIgnore is a List containing the field names that I do not want to get events for.
如果您想根据多个键进行过滤(updatedFields
是否包含某些字段),如果您先将键转换为值,则更容易。
您可以使用聚合运算符 $objectToArray 将 updatedFields
中包含的文档转换为值。例如:
pipeline = [{"$addFields": {
"tmpfields":{
"$objectToArray":"$updateDescription.updatedFields"}
}},
{"$match":{"tmpfields.k":{
"$nin":["LastModified", "AnotherUnwantedField"]}}}
];
上述聚合管道添加了一个名为 tmpfields
的临时字段。这个新字段将 updateDescription.updatedFields
的内容转换为 {name:value}
到 [{k:name, v:value}]
。一旦我们将这些键作为值,我们就可以使用 $nin
作为过滤器数组。
已更新
您收到 tmpfields
无效异常的原因是因为结果被转换为 ChangeStreamDocument 模型,该模型没有名为 tmpfields
的可识别字段。
在这种情况下,当不同的操作没有字段 updateDescription.updatedFields
时,tmpfields
的值将只是 null
。
下面是 MongoDB ChangeStream .Net/C# 使用 MongoDB .Net driver v2.5 的示例,以及修改输出更改流的聚合管道。
这个例子不是类型安全的,并且 return BsonDocument
:
var database = client.GetDatabase("database");
var collection = database.GetCollection<BsonDocument>("collection");
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
// Aggregation Pipeline
var addFields = new BsonDocument {
{ "$addFields", new BsonDocument {
{ "tmpfields", new BsonDocument {
{ "$objectToArray",
"$updateDescription.updatedFields" }
} }
} } };
var match = new BsonDocument {
{ "$match", new BsonDocument {
{ "tmpfields.k", new BsonDocument {
{ "$nin", new BsonArray{"LastModified", "Unwanted"} }
} } } } };
var pipeline = new[] { addFields, match };
// ChangeStreams
var cursor = collection.Watch<BsonDocument>(pipeline, options);
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.ToJson());
}
我在下面写了这段代码,因为我遇到了和你一样的问题。无需摆弄 BsonObjects ...
//The operationType can be one of the following: insert, update, replace, delete, invalidate
//ignore the field lastrun as we would end in an endles loop
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<ATask>>()
.Match("{ operationType: { $in: [ 'replace', 'update' ] } }")
.Match(@"{ ""updateDescription.updatedFields.LastRun"" : { $exists: false } }")
.Match(@"{ ""updateDescription.updatedFields.IsRunning"" : { $exists: false } }");
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var changeStream = Collection.Watch(pipeline, options);
while (changeStream.MoveNext())
{
var next = changeStream.Current;
foreach (var obj in next)
yield return obj.FullDocument;
}
我正在设置 ChangeStream 以在集合中的文档发生更改时通知我,以便我可以将该文档的 "LastModified" 元素更新到事件发生时。由于此更新将导致在 ChangeStream 上发生新事件,因此我需要过滤掉这些更新以防止无限循环(更新 LastModified 元素,因为 LastModified 元素刚刚更新...)。
当我指定确切的字段时,我有以下代码在工作:
ChangeStreamOptions options = new ChangeStreamOptions();
options.ResumeAfter = resumeToken;
string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields.LastModified': { $exists: false } } ] }";
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(filter);
var cursor = collection.Watch(pipeline, options, cancelToken);
但是,我不想对 "updateDescription.updatedFields.LastModified" 进行硬编码,而是想提供一个我不希望存在于 updatedFields 文档中的元素名称列表。
我尝试过:
string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields': { $nin: [ 'LastModified' ] } } ] }";
但这没有按预期工作(我仍然收到 LastModified 更改的更新事件。
我最初使用的是 Filter Builder:
FilterDefinitionBuilder<ChangeStreamDocument<BsonDocument>> filterBuilder = Builders<ChangeStreamDocument<BsonDocument>>.Filter;
FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = filterBuilder.In("operationType", new string[] { "replace", "insert", "update" }); //Only include the change if it was one of these types. Available types are: insert, update, replace, delete, invalidate
filter &= filterBuilder.Nin("updateDescription.updatedFields", ChangedFieldsToIgnore); //If this is an update, only include it if the field(s) updated contains 1+ fields not in the ChangedFieldsToIgnore list
其中 ChangedFieldsToIgnore 是一个包含我不想为其获取事件的字段名称的列表。
任何人都可以帮助我了解我需要使用的语法吗?或者我是否需要围绕我的 ChangedFieldsToIgnore 列表创建一个循环并在过滤器中为每个项目创建一个新条目“$exists:false”? (这似乎不是很有效)。
编辑:
我根据@wan-bachtiar 的回答尝试了以下代码,但我在 enumerator.MoveNext() 调用中遇到异常:
var match1 = new BsonDocument { { "$match", new BsonDocument { { "operationType", new BsonDocument { { "$in", new BsonArray(new string[] { "replace", "insert", "update" }) } } } } } };
var match2 = new BsonDocument { { "$addFields", new BsonDocument { { "tmpfields", new BsonDocument { { "$objectToArray", "$updateDescription.updatedFields" } } } } } };
var match3 = new BsonDocument { { "$match", new BsonDocument { { "tmpfields.k", new BsonDocument { { "$nin", new BsonArray(updatedFieldsToIgnore) } } } } } };
var pipeline = new[] { match1, match2, match3 };
var cursor = collection.Watch<ChangeStreamDocument<BsonDocument>>(pipeline, options, Profile.CancellationToken);
enumerator = cursor.ToEnumerable().GetEnumerator();
enumerator.MoveNext();
ChangeStreamDocument<BsonDocument> doc = enumerator.Current;
例外情况是:"{"Invalid field name: \"tmpfields\"."}"
我怀疑问题可能是我收到的 "replace" 和 "insert" 事件不包含 updateDescription 字段,因此 $addFields/$objectToArray 失败。我太新了,无法弄清楚语法,但我想我需要使用一个过滤器:
{ $match: { "operationType": { $in: ["replace", "insert"] } } }
OR
{ $eq: { "operationTYpe": "update" }} AND { $addFields....}
此外,C# 驱动程序似乎不包含有助于 $addFields 和 $objectToArray 操作的生成器。我只能使用 new BsonDocument {...}
方法来构建管道变量。
ChangedFieldsToIgnore is a List containing the field names that I do not want to get events for.
如果您想根据多个键进行过滤(updatedFields
是否包含某些字段),如果您先将键转换为值,则更容易。
您可以使用聚合运算符 $objectToArray 将 updatedFields
中包含的文档转换为值。例如:
pipeline = [{"$addFields": {
"tmpfields":{
"$objectToArray":"$updateDescription.updatedFields"}
}},
{"$match":{"tmpfields.k":{
"$nin":["LastModified", "AnotherUnwantedField"]}}}
];
上述聚合管道添加了一个名为 tmpfields
的临时字段。这个新字段将 updateDescription.updatedFields
的内容转换为 {name:value}
到 [{k:name, v:value}]
。一旦我们将这些键作为值,我们就可以使用 $nin
作为过滤器数组。
已更新
您收到 tmpfields
无效异常的原因是因为结果被转换为 ChangeStreamDocument 模型,该模型没有名为 tmpfields
的可识别字段。
在这种情况下,当不同的操作没有字段 updateDescription.updatedFields
时,tmpfields
的值将只是 null
。
下面是 MongoDB ChangeStream .Net/C# 使用 MongoDB .Net driver v2.5 的示例,以及修改输出更改流的聚合管道。
这个例子不是类型安全的,并且 return BsonDocument
:
var database = client.GetDatabase("database");
var collection = database.GetCollection<BsonDocument>("collection");
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
// Aggregation Pipeline
var addFields = new BsonDocument {
{ "$addFields", new BsonDocument {
{ "tmpfields", new BsonDocument {
{ "$objectToArray",
"$updateDescription.updatedFields" }
} }
} } };
var match = new BsonDocument {
{ "$match", new BsonDocument {
{ "tmpfields.k", new BsonDocument {
{ "$nin", new BsonArray{"LastModified", "Unwanted"} }
} } } } };
var pipeline = new[] { addFields, match };
// ChangeStreams
var cursor = collection.Watch<BsonDocument>(pipeline, options);
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.ToJson());
}
我在下面写了这段代码,因为我遇到了和你一样的问题。无需摆弄 BsonObjects ...
//The operationType can be one of the following: insert, update, replace, delete, invalidate
//ignore the field lastrun as we would end in an endles loop
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<ATask>>()
.Match("{ operationType: { $in: [ 'replace', 'update' ] } }")
.Match(@"{ ""updateDescription.updatedFields.LastRun"" : { $exists: false } }")
.Match(@"{ ""updateDescription.updatedFields.IsRunning"" : { $exists: false } }");
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var changeStream = Collection.Watch(pipeline, options);
while (changeStream.MoveNext())
{
var next = changeStream.Current;
foreach (var obj in next)
yield return obj.FullDocument;
}