两种通量之间的差异

Difference between two flux

我有两个顺序不重要的通量,我想得到一个具有差异的合成通量。我该怎么做?

示例:

Flux<Tweet> remoteTweets = Flux.just(
        new Tweet("tag1",new TweetID("text","name"),"userimage","country","place"),
        new Tweet("tag2",new TweetID("text","name"),"userimage","country","place")
);

Flux<Tweet> localTweets = Flux.just(
        new Tweet("tag1",new TweetID("text","name"),"userimage","country","place")
);

预期结果:tag2

如果 localTweets 是一个有限流并且适合内存,那么下面的工作:

Flux<Tweet> remoteTweets = Flux.just(
        new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place"),
        new Tweet("tag2", new TweetID("text", "name"), "userimage", "country", "place")
);

Flux<Tweet> localTweets = Flux.just(
        new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place")
).cache(); // note cache operator here, to avoid mutiple subscription

remoteTweets
        .filterWhen(remoteTweet -> localTweets.hasElement(remoteTweet).map(hasElement -> !hasElement))
        .subscribe(System.out::println);

如果流是有限的,但不适合内存,那么您应该离开 cache 运算符。这意味着 localTweets Flux 将被订阅多次。

如果流是无限的,您应该应用一些窗口化策略(例如,只检查最近 10 分钟的推文)。