如何使用 RxJava 使用另一个流的元素属性作为标准来过滤一个可观察流?

How to filter one observable stream using another stream's element attribute as criteria using RxJava?

我正在使用两个具有展平数据结构的存储库,例如。 "MenuItemRepo" 和 "IngredientRepo"。它们之间的关系结构是:一个MenuItem可以包含很多Ingredients,一个Ingredient(例如Cheese)可以是很多MenuItem的一部分。数据库表的建模如下:

我能否仅将 RxJava 用于 select 来自 [MenuItem 流] 的 MenuItem 元素,其中它们的 MenuItem id 与 [MenuItem-Ingredient 参考流] 中的特定 Ingredient id 相匹配?

我在一个 Flowable 流中使用 .filter() 运算符来 select 基于其自身属性的元素(例如,获取所有不含麸质的成分):

mIngredientsRepo.getItems()
                .flatMap(Flowable::fromIterable)
                .filter(ingredient-> ingredient.isGlutenFree)
                .toList() ....

这是为了获取所有无麸质成分的列表。但在这种情况下,MenuItem 流不存储成分 ID。本质上我想看看 SQL-like JOIN 过滤是否可行,以及它是否会在 RxJava 中保持优雅。

目前我已经在存储库级别使用 SQLite 连接语句实现了。我想在 RxJava 中探索更直观和可维护的选项。

PS:这种数据结构在本地和远程都有使用,在Firebase中也有使用,这也是结构扁平化的原因之一。

您可以 collect() 将这些成分 ID 放入 HashSet,然后在菜单项流中,使用 filter()contains 检查:

mIngredientsRepo.getItems()
.flatMapIterable(v -> v)
.collect(HashSet::new, (a, b) -> a.add(b.id))
.flatMapPublisher(set -> menuitems.filter(mi -> set.contains(mi.id))