从内部 flatMap 通知
Notify from inner flatMap
这是一个相当复杂的示例:
调用方(main):
// Subscribe to the combined stream and log every notification it delivers.
this.runInstructionAndGetResult().subscribe({
  next: (val) => console.log(`NEXT VALUE: ${val}`),
  error: (err) => console.log(`ERROR VALUE: ${err}`),
  // An observer's complete callback receives no argument.
  complete: () => console.log(`COMPLETE`),
});
Observable 的实现:
/**
 * Runs an instruction, waits for the first polled status that is not
 * "Polling", then emits whatever getInstructionResult() emits.
 *
 * Only the final result is passed downstream; the intermediate steps are
 * visible solely through the console.info traces.
 *
 * @returns a stream of the values produced by getInstructionResult().
 */
public runInstructionAndGetResult(): Observable<string> {
  return this.runAnInstruction().flatMap((started) => {
    console.info("flatMap of runAnInstruction:", started);

    // First polled status other than "Polling"; take(1) stops after that value.
    const firstFinalStatus$ = this.getInstructionExecutionStatusInPolling()
      .filter((status) => status != "Polling")
      .take(1);

    const result$ = firstFinalStatus$.flatMap((status) => {
      console.info("flatMap of getInstructionExecutionStatusInPolling:", status);
      return this.getInstructionResult();
    });

    return result$.map((result) => {
      console.info("Map of getInstructionResult:", result);
      return result;
    });
  });
}
/** Stubbed instruction start: emits the single value "StartRun". */
public runAnInstruction(): Observable<string> {
  const started$ = Observable.of("StartRun");
  return started$;
}
/** Stubbed result lookup: emits the single value "FinalResult". */
public getInstructionResult(): Observable<string> {
  const result$ = Observable.of("FinalResult");
  return result$;
}
/**
 * Polls the instruction status: on every tick of a 1000 ms interval it
 * re-queries getInstructionExecutionStatus() and emits the statuses it yields.
 * concatMap keeps the per-tick status batches in tick order.
 *
 * The stream has no end condition of its own; callers are expected to bound
 * it (for example with take).
 */
public getInstructionExecutionStatusInPolling(): Observable<string> {
  const everySecond$ = Observable.interval(1000);
  return everySecond$.concatMap(() => this.getInstructionExecutionStatus());
}
/** Stubbed status query: emits "Polling" followed by "Terminate". */
public getInstructionExecutionStatus(): Observable<string> {
  const statuses$ = Observable.of("Polling", "Terminate");
  return statuses$;
}
Plunker 示例在这里:
https://plnkr.co/edit/c1cahMtVARQnLgnHWlEe?p=preview
主要问题是:我希望在外层主流上收到内部流"进展(evolution)"的通知。
目前,只有当所有内部 flatMap 都完成后,主流的订阅者才会收到 "next" 事件。
怎样才能收到这些中间通知?如何在轮询期间向主流显式地发送值?
谢谢。
我找到了一个解决方案来分享。
更新后的 Plunker 在这里:
https://plnkr.co/edit/c1cahMtVARQnLgnHWlEe?p=preview
基本上,我用 Observable.create 创建了一个简单的可观察对象:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/create.md
然后以编程方式调用 next 方法,并在最后调用 complete:
/**
 * Runs the instruction workflow and reports its progress to the subscriber.
 *
 * Progress markers are pushed as each stage is reached:
 *   1 - subscription started
 *   2 - runAnInstruction() emitted
 *   3 - polling produced a status other than "Polling"
 *   4 - getInstructionResult() emitted
 * The stream completes once the inner workflow completes.
 *
 * NOTE(review): the declared element type is string, yet the emitted values
 * are the numeric markers above and the final result itself is only logged;
 * confirm which contract callers expect.
 *
 * @returns a stream of progress markers for the workflow.
 */
public runInstructionAndGetResult(): Observable<string> {
  return Observable.create((ops) => {
    ops.next(1);

    const workflow = this.runAnInstruction()
      .concatMap((started) => {
        ops.next(2);
        console.info("flatMap of runAnInstruction:", started);
        return this.getInstructionExecutionStatusInPolling()
          .filter((status) => status !== "Polling")
          .take(1)
          .concatMap((status) => {
            ops.next(3);
            console.info("flatMap of getInstructionExecutionStatusInPolling:", status);
            return this.getInstructionResult();
          });
      })
      .subscribe({
        next: (result) => {
          console.info("Map of getInstructionResult:", result);
          ops.next(4);
        },
        // Forward inner failures so the outer subscriber's error handler runs.
        error: (err) => ops.error(err),
        complete: () => ops.complete(),
      });

    // Returned as teardown: unsubscribing the outer stream also stops the
    // inner workflow, including the interval-based polling.
    return workflow;
  });
}
这是一个相当复杂的示例:
调用方(main):
// Subscribe to the combined stream and log every notification it delivers.
this.runInstructionAndGetResult().subscribe({
  next: (val) => console.log(`NEXT VALUE: ${val}`),
  error: (err) => console.log(`ERROR VALUE: ${err}`),
  // An observer's complete callback receives no argument.
  complete: () => console.log(`COMPLETE`),
});
Observable 的实现:
/**
 * Runs an instruction, waits for the first polled status that is not
 * "Polling", then emits whatever getInstructionResult() emits.
 *
 * Only the final result is passed downstream; the intermediate steps are
 * visible solely through the console.info traces.
 *
 * @returns a stream of the values produced by getInstructionResult().
 */
public runInstructionAndGetResult(): Observable<string> {
  return this.runAnInstruction().flatMap((started) => {
    console.info("flatMap of runAnInstruction:", started);

    // First polled status other than "Polling"; take(1) stops after that value.
    const firstFinalStatus$ = this.getInstructionExecutionStatusInPolling()
      .filter((status) => status != "Polling")
      .take(1);

    const result$ = firstFinalStatus$.flatMap((status) => {
      console.info("flatMap of getInstructionExecutionStatusInPolling:", status);
      return this.getInstructionResult();
    });

    return result$.map((result) => {
      console.info("Map of getInstructionResult:", result);
      return result;
    });
  });
}
/** Stubbed instruction start: emits the single value "StartRun". */
public runAnInstruction(): Observable<string> {
  const started$ = Observable.of("StartRun");
  return started$;
}
/** Stubbed result lookup: emits the single value "FinalResult". */
public getInstructionResult(): Observable<string> {
  const result$ = Observable.of("FinalResult");
  return result$;
}
/**
 * Polls the instruction status: on every tick of a 1000 ms interval it
 * re-queries getInstructionExecutionStatus() and emits the statuses it yields.
 * concatMap keeps the per-tick status batches in tick order.
 *
 * The stream has no end condition of its own; callers are expected to bound
 * it (for example with take).
 */
public getInstructionExecutionStatusInPolling(): Observable<string> {
  const everySecond$ = Observable.interval(1000);
  return everySecond$.concatMap(() => this.getInstructionExecutionStatus());
}
/** Stubbed status query: emits "Polling" followed by "Terminate". */
public getInstructionExecutionStatus(): Observable<string> {
  const statuses$ = Observable.of("Polling", "Terminate");
  return statuses$;
}
Plunker 示例在这里: https://plnkr.co/edit/c1cahMtVARQnLgnHWlEe?p=preview
主要问题是:我希望在外层主流上收到内部流"进展(evolution)"的通知。
目前,只有当所有内部 flatMap 都完成后,主流的订阅者才会收到 "next" 事件。
怎样才能收到这些中间通知?如何在轮询期间向主流显式地发送值?
谢谢。
我找到了一个解决方案来分享。
更新后的 Plunker 在这里:
https://plnkr.co/edit/c1cahMtVARQnLgnHWlEe?p=preview
基本上,我用 Observable.create 创建了一个简单的可观察对象:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/create.md
然后以编程方式调用 next 方法,并在最后调用 complete:
/**
 * Runs the instruction workflow and reports its progress to the subscriber.
 *
 * Progress markers are pushed as each stage is reached:
 *   1 - subscription started
 *   2 - runAnInstruction() emitted
 *   3 - polling produced a status other than "Polling"
 *   4 - getInstructionResult() emitted
 * The stream completes once the inner workflow completes.
 *
 * NOTE(review): the declared element type is string, yet the emitted values
 * are the numeric markers above and the final result itself is only logged;
 * confirm which contract callers expect.
 *
 * @returns a stream of progress markers for the workflow.
 */
public runInstructionAndGetResult(): Observable<string> {
  return Observable.create((ops) => {
    ops.next(1);

    const workflow = this.runAnInstruction()
      .concatMap((started) => {
        ops.next(2);
        console.info("flatMap of runAnInstruction:", started);
        return this.getInstructionExecutionStatusInPolling()
          .filter((status) => status !== "Polling")
          .take(1)
          .concatMap((status) => {
            ops.next(3);
            console.info("flatMap of getInstructionExecutionStatusInPolling:", status);
            return this.getInstructionResult();
          });
      })
      .subscribe({
        next: (result) => {
          console.info("Map of getInstructionResult:", result);
          ops.next(4);
        },
        // Forward inner failures so the outer subscriber's error handler runs.
        error: (err) => ops.error(err),
        complete: () => ops.complete(),
      });

    // Returned as teardown: unsubscribing the outer stream also stops the
    // inner workflow, including the interval-based polling.
    return workflow;
  });
}