如何从 MongoDB 中的 ChangeStream 过滤对特定字段的更新

How do you filter updates to specific fields from ChangeStream in MongoDB

我正在设置 ChangeStream 以在集合中的文档发生更改时通知我,以便我可以将该文档的 "LastModified" 元素更新到事件发生时。由于此更新将导致在 ChangeStream 上发生新事件,因此我需要过滤掉这些更新以防止无限循环(更新 LastModified 元素,因为 LastModified 元素刚刚更新...)。

当我指定确切的字段时,我有以下代码在工作:

ChangeStreamOptions options = new ChangeStreamOptions();
options.ResumeAfter = resumeToken;

string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields.LastModified': { $exists: false } } ] }";
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(filter);

var cursor = collection.Watch(pipeline, options, cancelToken);

但是,我不想对 "updateDescription.updatedFields.LastModified" 进行硬编码,而是想提供一个我不希望存在于 updatedFields 文档中的元素名称列表。

我尝试过:

string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields': { $nin: [ 'LastModified' ] } } ] }";

但这没有按预期工作(我仍然收到 LastModified 更改的更新事件。

我最初使用的是 Filter Builder:

FilterDefinitionBuilder<ChangeStreamDocument<BsonDocument>> filterBuilder = Builders<ChangeStreamDocument<BsonDocument>>.Filter;
FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = filterBuilder.In("operationType", new string[] { "replace", "insert", "update" });  //Only include the change if it was one of these types.  Available types are: insert, update, replace, delete, invalidate
filter &= filterBuilder.Nin("updateDescription.updatedFields", ChangedFieldsToIgnore); //If this is an update, only include it if the field(s) updated contains 1+ fields not in the ChangedFieldsToIgnore list

其中 ChangedFieldsToIgnore 是一个包含我不想为其获取事件的字段名称的列表。

任何人都可以帮助我了解我需要使用的语法吗?或者我是否需要围绕我的 ChangedFieldsToIgnore 列表创建一个循环并在过滤器中为每个项目创建一个新条目“$exists:false”? (这似乎不是很有效)。

编辑:

我根据@wan-bachtiar 的回答尝试了以下代码,但我在 enumerator.MoveNext() 调用中遇到异常:

var match1 = new BsonDocument { { "$match", new BsonDocument { { "operationType", new BsonDocument { { "$in", new BsonArray(new string[] { "replace", "insert", "update" }) } } } } } };
var match2 = new BsonDocument { { "$addFields", new BsonDocument { { "tmpfields", new BsonDocument { { "$objectToArray", "$updateDescription.updatedFields" } } } } } };
var match3 = new BsonDocument { { "$match", new BsonDocument { { "tmpfields.k", new BsonDocument { { "$nin", new BsonArray(updatedFieldsToIgnore) } } } } } };
var pipeline = new[] { match1, match2, match3 };

var cursor = collection.Watch<ChangeStreamDocument<BsonDocument>>(pipeline, options, Profile.CancellationToken);
enumerator = cursor.ToEnumerable().GetEnumerator();

enumerator.MoveNext();
ChangeStreamDocument<BsonDocument> doc = enumerator.Current;

例外情况是:"{"Invalid field name: \"tmpfields\"."}"

我怀疑问题可能是我收到的 "replace" 和 "insert" 事件不包含 updateDescription 字段,因此 $addFields/$objectToArray 失败。我太新了,无法弄清楚语法,但我想我需要使用一个过滤器:

{ $match: { "operationType": { $in: ["replace", "insert"] } } }
OR
{ $eq: { "operationTYpe": "update" }} AND { $addFields....}

此外,C# 驱动程序似乎不包含有助于 $addFields 和 $objectToArray 操作的生成器。我只能使用 new BsonDocument {...} 方法来构建管道变量。

ChangedFieldsToIgnore is a List containing the field names that I do not want to get events for.

如果您想根据多个键进行过滤(updatedFields 是否包含某些字段),如果您先将键转换为值,则更容易。

您可以使用聚合运算符 $objectToArrayupdatedFields 中包含的文档转换为值。例如:

pipeline = [{"$addFields": {
             "tmpfields":{
               "$objectToArray":"$updateDescription.updatedFields"}
            }}, 
            {"$match":{"tmpfields.k":{
                       "$nin":["LastModified", "AnotherUnwantedField"]}}}
];

上述聚合管道添加了一个名为 tmpfields 的临时字段。这个新字段将 updateDescription.updatedFields 的内容转换为 {name:value}[{k:name, v:value}]。一旦我们将这些键作为值,我们就可以使用 $nin 作为过滤器数组。

已更新

您收到 tmpfields 无效异常的原因是因为结果被转换为 ChangeStreamDocument 模型,该模型没有名为 tmpfields 的可识别字段。

在这种情况下,当不同的操作没有字段 updateDescription.updatedFields 时,tmpfields 的值将只是 null

下面是 MongoDB ChangeStream .Net/C# 使用 MongoDB .Net driver v2.5 的示例,以及修改输出更改流的聚合管道。

这个例子不是类型安全的,并且 return BsonDocument :

var database = client.GetDatabase("database");            
var collection = database.GetCollection<BsonDocument>("collection");

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

// Aggregation Pipeline
var addFields = new BsonDocument { 
                    { "$addFields", new BsonDocument { 
                       { "tmpfields", new BsonDocument { 
                         { "$objectToArray", 
                           "$updateDescription.updatedFields" } 
                       } } 
                 } } };
var match = new BsonDocument { 
                { "$match", new BsonDocument { 
                  { "tmpfields.k", new BsonDocument { 
                    { "$nin", new BsonArray{"LastModified", "Unwanted"} } 
            } } } } };

var pipeline = new[] { addFields, match };

// ChangeStreams
var cursor = collection.Watch<BsonDocument>(pipeline, options);

foreach (var change in cursor.ToEnumerable())
{
    Console.WriteLine(change.ToJson());
}

我在下面写了这段代码,因为我遇到了和你一样的问题。无需摆弄 BsonObjects ...

//The operationType can be one of the following: insert, update, replace, delete, invalidate
//ignore the field lastrun as we would end in an endles loop
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<ATask>>()
    .Match("{ operationType: { $in: [ 'replace', 'update' ] } }")
    .Match(@"{ ""updateDescription.updatedFields.LastRun"" : { $exists: false } }")
    .Match(@"{ ""updateDescription.updatedFields.IsRunning"" : { $exists: false } }");

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var changeStream = Collection.Watch(pipeline, options);    

while (changeStream.MoveNext())
{
    var next = changeStream.Current;
    foreach (var obj in next)
        yield return obj.FullDocument;
}