使用RXJava 2对数据进行分组,每组添加元素并输出一个列表

Group data with RXJava 2, add element to each group and output one list

我有一个 RXJava 2 观察器,它发出 object 类型 somethingWithDate。这些 object 有一个 属性,其中包含一个 java 日期 object 以及一个 boolean isHeader.

我需要用这些 object 做的是:

  1. 按月和年分组(2017 年 1 月的所有日期,2016 年 1 月的所有日期,2016 年 2 月的所有日期,...)
  2. 在组的开头添加一个header(也是一个somethingWithDate
  3. 根据彼此对组进行排序(按日期,在组内排序也很好,但不需要,header 需要在每个组中保持在第一位)
  4. 将所有组中的所有元素合并(?)回一个元素列表 single<List<somethingWithDate>>

下面是一个示例(所有 object 类型 somethingWithDate,但我将只在此处显示重要属性以使其更具可读性):

输入数据:

1.1.2017, 2.1.2017, 1.1.2016, 8.8.2017

结果:

header, 1.1.2016, header, 1.1.2017, 2.1.2017, header, 8.8.2017

我知道1.可以通过使用groupBy()来实现。不过,我还没有找到做 2 的方法。我也知道有一个 toList()toSortedList() 可能用于 3. 和 4. 但我不确定它如何与 groupBy() 中的组一起使用以及我如何收集它们在一个列表中。

伪代码:

inputListObserver
    .groupBy(obj.date.year + obj.date.month)
    .map(group -> group.addAtBeginning(headerElement))
    .sortBy(groupKey)
    .mergeToList()
    ...

我会使用 ConnectableObservable 来共享初始流的排放量。 使用此伪代码,您有 3 个按年份排序的流,带有 header.

ConnectableSoureObservable = sourceObservable.publish()      
Observable.Concat(
                    ConnectableSoureObservable
                                        .filter(Year2015)
                                        .collectInto(newList<somethingWithDate>(initialValue))
                                        .toSortedList()
                                        .flatMapIterable(),
                    ConnectableSoureObservable
                                        .filter(Year2016)
                                        .collectInto(newList<somethingWithDate>(initialValue))
                                        .toSortedList()
                                        .flatMapIterable(),
                    ConnectableSoureObservable
                                        .filter(Year2017)
                                        .collectInto(newList<somethingWithDate>(initialValue))
                                        .toSortedList()
                                        .flatmapIterable())
                    ...
 ConnectableSoureObservable.connect()

我想我终于找到了答案。使用平面图(感谢 @masps 的想法)似乎可以解决问题。这是伪代码:

.groupBy(makeKeyFromMonthAndYear()) // groups everything in months
.sorted(byGroupKey) // sorts the groups
.flatMap( // collect all groups back into one
   .startsWith(generateHeaderElement()) // add a header for each group
   .sorted() // optional, sort the values in a month
)
.toList() // output a Single<List<somethingWithDate>>