rxjava2 基于发射次数的分支逻辑
rxjava2 branching logic based on number of emissions
我想根据上游的排放量来分支逻辑。
准确的说,我要:
- 当上游 为空时不会发生任何事情
- 当上游只发出一个值然后完成
时触发一个分支
- 当上游发出多个值然后完成时触发一个分支。
我一直在绞尽脑汁想办法解决这个问题,我想出了一些可行但看起来非常冗长的方法。我想知道是否有更简单的方法来做到这一点。
此解决方案基于 RxJava2Extensions 项目中的 valve 运算符。
方案大纲如下:
- 使用
publish(foo)
多次订阅上游
- 对逻辑的两个分支使用
merge
- 对于'more than one emission logic'使用最初关闭的
valve
并在第二次排放时打开它,如果没有排放或只有一次排放则打开阀门。通过打破阀门我的意思是终止控制 Publisher
- 对于 'just one emission logic' 使用最初关闭的
valve
。使用 ambArray
在没有排放或第二次排放时打开阀门,或者在只有一次排放时打开阀门。
所以这似乎可行,但我担心的是:
- 它的功能看起来设计过度了。这可以编码得更简单、更干净吗?
- 整个阀门破坏业务将触发我正在接受的异常,但可能还有其他与阀门无关的异常,我可能应该在这里区分并让它们向下传播。
[编辑]阀门破裂很重要,这样单一排放逻辑的阀门就不会累积用于多重排放逻辑的排放,也不会那样占用内存[/编辑]
代码如下:
Flowable.just(1,2,3,4,5) // +1 emissions
//Flowable.just(1) // 1 emission
//Flowable.empty() // 0 emissions
.publish( //publish so that you get connectableFlowable inside
f ->
Flowable.merge( //merge for the logic split
f.compose(
valve(f.scan(0, (sum, i) -> sum + 1) //scan to emit progressive count
.filter(i -> i > 1) //filter for when count > 1
.take(1) //take just first such count
.concatMap(__ -> Flowable.<Boolean>never().startWith(true)) //and open the valve
.switchIfEmpty(Flowable.empty()), //break the valve if there was just 1 element
false) //start with the valve closed
)
.onErrorResumeNext(Flowable.empty()) //swallow the broken valve exception???
.map(__ -> "more than one elements!"), //here goes logic for +1 emissions
f.compose(
valve(
Flowable.ambArray(
f.scan(0, (sum, i) -> sum + 1) //do progressive counts
.switchIfEmpty(Flowable.never()) //if there was no elements then never end this guy
.filter(i -> i > 1) //filter > 1
.take(1) //take just first one
.concatMap(
__ -> Flowable.<Boolean>empty()) //if there was > 1 element then emit empty and break the valve so we
//don't accumulate byte arrays that are meant for multipart upload
,
f.count() //count the stream
.map(c -> c == 1) //open valve if the count was 1
.toFlowable()
.concatWith(Flowable.never()) //and keep the stream opened forever
),
false
)
)
.onErrorResumeNext(Flowable.empty())
.map(i -> "just one element") //here goes logic for just one emission
)
)
.doOnNext(i -> System.out.println("haya! " + i))
.blockingSubscribe();
}
正如我怀疑的那样,我把它弄得太复杂了。我以这种方式解决了这个问题:
public static <U, D> FlowableTransformer<U, D> singleMultipleBranching(
FlowableTransformer<U, D> singleBranchTransformer,
FlowableTransformer<U, D> manyBranchTransformer
)
{
return
fl ->
fl.replay( //replay so that you get connectableFlowable inside
f -> f.buffer(2)
.take(1)
.switchMap(
buf -> {
switch (buf.size()) {
case 1:
return f.compose(
singleBranchTransformer);
case 2:
return f.compose(
manyBranchTransformer);
default:
return Flowable.empty();
}
}
)
);
}
我想根据上游的排放量来分支逻辑。
准确的说,我要:
- 当上游 为空时不会发生任何事情
- 当上游只发出一个值然后完成 时触发一个分支
- 当上游发出多个值然后完成时触发一个分支。
我一直在绞尽脑汁想办法解决这个问题,我想出了一些可行但看起来非常冗长的方法。我想知道是否有更简单的方法来做到这一点。
此解决方案基于 RxJava2Extensions 项目中的 valve 运算符。
方案大纲如下:
- 使用
publish(foo)
多次订阅上游 - 对逻辑的两个分支使用
merge
- 对于'more than one emission logic'使用最初关闭的
valve
并在第二次排放时打开它,如果没有排放或只有一次排放则打开阀门。通过打破阀门我的意思是终止控制Publisher
- 对于 'just one emission logic' 使用最初关闭的
valve
。使用ambArray
在没有排放或第二次排放时打开阀门,或者在只有一次排放时打开阀门。
所以这似乎可行,但我担心的是:
- 它的功能看起来设计过度了。这可以编码得更简单、更干净吗?
- 整个阀门破坏业务将触发我正在接受的异常,但可能还有其他与阀门无关的异常,我可能应该在这里区分并让它们向下传播。 [编辑]阀门破裂很重要,这样单一排放逻辑的阀门就不会累积用于多重排放逻辑的排放,也不会那样占用内存[/编辑]
代码如下:
Flowable.just(1,2,3,4,5) // +1 emissions
//Flowable.just(1) // 1 emission
//Flowable.empty() // 0 emissions
.publish( //publish so that you get connectableFlowable inside
f ->
Flowable.merge( //merge for the logic split
f.compose(
valve(f.scan(0, (sum, i) -> sum + 1) //scan to emit progressive count
.filter(i -> i > 1) //filter for when count > 1
.take(1) //take just first such count
.concatMap(__ -> Flowable.<Boolean>never().startWith(true)) //and open the valve
.switchIfEmpty(Flowable.empty()), //break the valve if there was just 1 element
false) //start with the valve closed
)
.onErrorResumeNext(Flowable.empty()) //swallow the broken valve exception???
.map(__ -> "more than one elements!"), //here goes logic for +1 emissions
f.compose(
valve(
Flowable.ambArray(
f.scan(0, (sum, i) -> sum + 1) //do progressive counts
.switchIfEmpty(Flowable.never()) //if there was no elements then never end this guy
.filter(i -> i > 1) //filter > 1
.take(1) //take just first one
.concatMap(
__ -> Flowable.<Boolean>empty()) //if there was > 1 element then emit empty and break the valve so we
//don't accumulate byte arrays that are meant for multipart upload
,
f.count() //count the stream
.map(c -> c == 1) //open valve if the count was 1
.toFlowable()
.concatWith(Flowable.never()) //and keep the stream opened forever
),
false
)
)
.onErrorResumeNext(Flowable.empty())
.map(i -> "just one element") //here goes logic for just one emission
)
)
.doOnNext(i -> System.out.println("haya! " + i))
.blockingSubscribe();
}
正如我怀疑的那样,我把它弄得太复杂了。我以这种方式解决了这个问题:
public static <U, D> FlowableTransformer<U, D> singleMultipleBranching(
FlowableTransformer<U, D> singleBranchTransformer,
FlowableTransformer<U, D> manyBranchTransformer
)
{
return
fl ->
fl.replay( //replay so that you get connectableFlowable inside
f -> f.buffer(2)
.take(1)
.switchMap(
buf -> {
switch (buf.size()) {
case 1:
return f.compose(
singleBranchTransformer);
case 2:
return f.compose(
manyBranchTransformer);
default:
return Flowable.empty();
}
}
)
);
}