将 Observable 与反馈合并,将 Observable 与其自身合并
Merging Observables with feedback, merging Observable with Itself
我需要创建 Observable,它将从不同的来源(其他 Observable)收集信息,每个来源都会影响事件值,但该值仍然是基于先前的值(一种状态机)构建的.
我们有带有 int 值和操作代码的消息:
class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
}
以及此类值的一些来源,初始值:
Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));
现在有事件源。这些源将发出基于 dynamicMessage 的先前值的值,新的 dynamicMessage 值将出现。实际上它的事件类型是:
class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
}
class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
}
Changer1 负责更改操作。 Changer2 负责更改值。
这些值的来源:
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
现在我需要更改 dynamicMessage Observable 并接收来自更改器的尊重消息。这是一个图表:
我也尝试编写一个程序,我是如何看待解决方案的,但是它因 OutOfMemoryException 而挂起,紧跟在第一个值之后:
消息{value=1, operation='+'}
清单:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public class RxDilemma {
static class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" +
"value=" + value +
", operation='" + operation + '\'' +
'}';
}
}
static class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" +
"operation='" + operation + '\'' +
'}';
}
}
static class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" +
"value=" + value +
'}';
}
}
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer1 changer) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
message.operation = changer.operation;
return message;
}
});
}
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer2 changer2) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
if("+".equalsIgnoreCase(message.operation)){
message.value = message.value+changer2.value;
} else if("-".equalsIgnoreCase(message.operation)){
message.value = message.value-changer2.value;
}
return message;
}
});
}
}));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.subscribe(new Action1<Message>() {
@Override
public void call(Message message) {
System.out.println(message);
}
});
Thread.sleep(5000000);
}
}
所以我的预期输出是:
消息{值=1,操作='+'}
消息{value=1, operation='-'}
消息{value=-1, operation='-'}
消息{value=1, operation='+'}
消息{value=2, operation='+'}
我也想过用CombineLatest合并所有,但是后来我发现CombineLatest没有提供更改了哪个元素,没有这个信息我不知道消息中需要做哪些更改。
有帮助吗?
换句话说,你想减少 scan 你的动作反映出一个状态的变化。有一个运算符,正确命名为 reduce
scan
,它完全符合您的要求。它接受每个发生的事件,并让您通过添加该转换的先前结果来转换它,这将是您的状态。
interface Changer {
State apply(State state);
}
class ValueChanger implements Changer {
...
State apply(State state){
// implement your logic of changing state by this action and return a new one
}
...
}
class OperationChanger implements Changer {
...
State apply(State state){
// implement your logic of changing state by this action and return a new one
}
...
}
Observable<ValueChanger> valueChangers
Observable<OperationChanger> operationChangers
Observable.merge(valueChangers, operationChangers)
.scan(initialState, (state, changer) -> changer.apply(state))
.subscribe(state -> {
// called every time a change happens
});
请注意,我稍微更改了您的命名以使其更有意义。另外,我在这里使用 Java8 lambda 表达式。如果您不能在您的项目中使用它们,只需将它们换成匿名 类。
编辑:使用 scan
运算符而不是 reduce
,因为它会传递每个结果,而不是只发出最终结果
已进行一些更改,希望能满足您的需求。请查看并返回结果。
static class Message {
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" + "value=" + value + ", operation='" + operation
+ '\'' + '}';
}
public Message applyChange(Object changer){
if(changer instanceof Changer1){
this.operation = ((Changer1)changer).operation;
}else if(changer instanceof Changer2){
int v2 = ((Changer2)changer).value;
if("+".equals(operation)){
value += v2;
}else if("-".equals(operation)){
value -= v2;
}
}
return this;
}
}
static class Changer1{
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" + "operation='" + operation + '\'' + '}';
}
}
static class Changer2{
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" + "value=" + value + '}';
}
}
static Observable<? extends Object> changers1 = Observable
.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS)
.concatWith(
Observable.just(new Changer1("+")).delay(2,
TimeUnit.SECONDS));
static Observable<? extends Object> changers2 = Observable
.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS)
.concatWith(
Observable.just(new Changer2(2)).delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable
.just(new Message(1, "+"))
.flatMap(m -> ((Observable<Object>)changers1).mergeWith((Observable<Object>)changers2).map(c -> m.applyChange(c)));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.toBlocking().subscribe(v -> System.out.println(v));
}
变化:
Message
有一个新方法,每次发出 Changer
时都会调用它,它会更改 Message
对象的值。使用它是因为您只有一个 Message
对象,它实际上是初始值。
public Message applyChange(Object changer){
if(changer instanceof Changer1){
this.operation = ((Changer1)changer).operation;
}else if(changer instanceof Changer2){
int v2 = ((Changer2)changer).value;
if("+".equals(operation)){
value += v2;
}else if("-".equals(operation)){
value -= v2;
}
}
return this;
}
dynamicMessage
根据需要修改-
static Observable<Message> dynamicMessage = Observable
.just(new Message(1, "+"))
.flatMap(m -> ((Observable<Object>)changers1)
.mergeWith((Observable<Object>)changers2)
.map(c -> m.applyChange(c)));
好的,我这里有两个答案。其中一个正在运行,但它存在设计问题,即强制手动控制并发执行。另一个不是按设计工作的(也许经过一些操作使 reduce
运算符不适用于所有但对于每对数据它将起作用)。但我在这里发布自己的答案是因为我找到了更优雅、更正确的解决方案。
但在回答之前,请先看一下问题。为什么会挂起?答案是:因为运算符 last()
正在阻塞。所以我需要强制它是non-blocking。基于此处的另一个答案:,我可以为此使用 BehaviorSubject
class。
这是我的工作代码:
final BehaviorSubject<Message> subject = BehaviorSubject.create(new Message(1, "+"));
Observable<Message> observable = Observable.<Changer>merge(changers1, changers2).map(new Func1<Changer, Message>() {
@Override
public Message call(Changer changer) {
Message value = subject.getValue();
Message v = value.applyChange(changer);
subject.onNext(v);
return v;
}
});
(您可以在其他答案中找到缺少的部分代码 and )
不过,我不会将答案标记为正确的,我相信有人可能想以更适当的方式自己解决这个难题。
编辑:找到了正确的方法,请参阅标记的答案。
我需要创建 Observable,它将从不同的来源(其他 Observable)收集信息,每个来源都会影响事件值,但该值仍然是基于先前的值(一种状态机)构建的.
我们有带有 int 值和操作代码的消息:
class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
}
以及此类值的一些来源,初始值:
Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));
现在有事件源。这些源将发出基于 dynamicMessage 的先前值的值,新的 dynamicMessage 值将出现。实际上它的事件类型是:
class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
}
class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
}
Changer1 负责更改操作。 Changer2 负责更改值。
这些值的来源:
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
现在我需要更改 dynamicMessage Observable 并接收来自更改器的尊重消息。这是一个图表:
我也尝试编写一个程序,我是如何看待解决方案的,但是它因 OutOfMemoryException 而挂起,紧跟在第一个值之后:
消息{value=1, operation='+'}
清单:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public class RxDilemma {
static class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" +
"value=" + value +
", operation='" + operation + '\'' +
'}';
}
}
static class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" +
"operation='" + operation + '\'' +
'}';
}
}
static class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" +
"value=" + value +
'}';
}
}
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer1 changer) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
message.operation = changer.operation;
return message;
}
});
}
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer2 changer2) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
if("+".equalsIgnoreCase(message.operation)){
message.value = message.value+changer2.value;
} else if("-".equalsIgnoreCase(message.operation)){
message.value = message.value-changer2.value;
}
return message;
}
});
}
}));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.subscribe(new Action1<Message>() {
@Override
public void call(Message message) {
System.out.println(message);
}
});
Thread.sleep(5000000);
}
}
所以我的预期输出是:
消息{值=1,操作='+'}
消息{value=1, operation='-'}
消息{value=-1, operation='-'}
消息{value=1, operation='+'}
消息{value=2, operation='+'}
我也想过用CombineLatest合并所有,但是后来我发现CombineLatest没有提供更改了哪个元素,没有这个信息我不知道消息中需要做哪些更改。 有帮助吗?
换句话说,你想减少 scan 你的动作反映出一个状态的变化。有一个运算符,正确命名为 reduce
scan
,它完全符合您的要求。它接受每个发生的事件,并让您通过添加该转换的先前结果来转换它,这将是您的状态。
interface Changer {
State apply(State state);
}
class ValueChanger implements Changer {
...
State apply(State state){
// implement your logic of changing state by this action and return a new one
}
...
}
class OperationChanger implements Changer {
...
State apply(State state){
// implement your logic of changing state by this action and return a new one
}
...
}
Observable<ValueChanger> valueChangers
Observable<OperationChanger> operationChangers
Observable.merge(valueChangers, operationChangers)
.scan(initialState, (state, changer) -> changer.apply(state))
.subscribe(state -> {
// called every time a change happens
});
请注意,我稍微更改了您的命名以使其更有意义。另外,我在这里使用 Java8 lambda 表达式。如果您不能在您的项目中使用它们,只需将它们换成匿名 类。
编辑:使用 scan
运算符而不是 reduce
,因为它会传递每个结果,而不是只发出最终结果
已进行一些更改,希望能满足您的需求。请查看并返回结果。
static class Message {
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" + "value=" + value + ", operation='" + operation
+ '\'' + '}';
}
public Message applyChange(Object changer){
if(changer instanceof Changer1){
this.operation = ((Changer1)changer).operation;
}else if(changer instanceof Changer2){
int v2 = ((Changer2)changer).value;
if("+".equals(operation)){
value += v2;
}else if("-".equals(operation)){
value -= v2;
}
}
return this;
}
}
static class Changer1{
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" + "operation='" + operation + '\'' + '}';
}
}
static class Changer2{
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" + "value=" + value + '}';
}
}
static Observable<? extends Object> changers1 = Observable
.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS)
.concatWith(
Observable.just(new Changer1("+")).delay(2,
TimeUnit.SECONDS));
static Observable<? extends Object> changers2 = Observable
.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS)
.concatWith(
Observable.just(new Changer2(2)).delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable
.just(new Message(1, "+"))
.flatMap(m -> ((Observable<Object>)changers1).mergeWith((Observable<Object>)changers2).map(c -> m.applyChange(c)));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.toBlocking().subscribe(v -> System.out.println(v));
}
变化:
Message
有一个新方法,每次发出 Changer
时都会调用它,它会更改 Message
对象的值。使用它是因为您只有一个 Message
对象,它实际上是初始值。
public Message applyChange(Object changer){
if(changer instanceof Changer1){
this.operation = ((Changer1)changer).operation;
}else if(changer instanceof Changer2){
int v2 = ((Changer2)changer).value;
if("+".equals(operation)){
value += v2;
}else if("-".equals(operation)){
value -= v2;
}
}
return this;
}
dynamicMessage
根据需要修改-
static Observable<Message> dynamicMessage = Observable
.just(new Message(1, "+"))
.flatMap(m -> ((Observable<Object>)changers1)
.mergeWith((Observable<Object>)changers2)
.map(c -> m.applyChange(c)));
好的,我这里有两个答案。其中一个正在运行,但它存在设计问题,即强制手动控制并发执行。另一个不是按设计工作的(也许经过一些操作使 reduce
运算符不适用于所有但对于每对数据它将起作用)。但我在这里发布自己的答案是因为我找到了更优雅、更正确的解决方案。
但在回答之前,请先看一下问题。为什么会挂起?答案是:因为运算符 last()
正在阻塞。所以我需要强制它是non-blocking。基于此处的另一个答案:BehaviorSubject
class。
这是我的工作代码:
final BehaviorSubject<Message> subject = BehaviorSubject.create(new Message(1, "+"));
Observable<Message> observable = Observable.<Changer>merge(changers1, changers2).map(new Func1<Changer, Message>() {
@Override
public Message call(Changer changer) {
Message value = subject.getValue();
Message v = value.applyChange(changer);
subject.onNext(v);
return v;
}
});
(您可以在其他答案中找到缺少的部分代码
编辑:找到了正确的方法,请参阅标记的答案。