RxJS 减少不会继续
RxJS reduce doesn't continue
为什么 flatMap 不会触发下游缩减?
我得到的代码如下:
handleFiles.flatMap(files =>
Rx.Observable.from(files).
flatMap((file, i) => fileReader(file, i)).
reduce((form, file, i) => {
form.append('file[' + i + ']', result);
console.log('reduce step', file);
return form;
}, new FormData()).
tap(console.log.bind(console, 'after reduce'))
).
subscribe(console.log.bind(console, 'response'));
问题是 'after reduce' 水龙头从未被击中。为什么?
日志是这样的:
reduce step [data]
reduce step [data]
截图:
问题不在flatMap
; reduce
的工作方式。
reduce
读取整个流并将其缩减为单个值,仅在源流关闭时发出。如果您的 from(files)
流没有结束,那么 reduce
将永远不会输出它的值。
请尝试使用 scan
;它发出每个中间步骤,似乎就是您要找的东西。
如果 files 是一个数组,那么如果从 fileReader 返回的 observable 终止,则 reduce 应该终止。因此,对于这段代码,问题在于 fileReader 返回了一个未完成的可观察对象。
这是一个使用 reduce 和非终止可观察对象的示例;
使用窗口时间:
import { fromEvent, interval, timer } from 'rxjs';
import { reduce, filter, windowTime, map, mergeMap } from 'rxjs/operators';
const interval$ = interval(1000);
const observable = interval$.pipe(
windowTime(2000), // each window is 2s
mergeMap(window$ => window$.pipe(
reduce((a,x) => { // reduce to array
return [...a,x];
}, []),
filter(x => !!x.length) // in the background timer is still running so suppress empty events
)), // flatten the Observable-of-Observables
);
const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => subscription.unsubscribe(), 10000);
使用缓冲时间:
import { fromEvent } from 'rxjs';
import { bufferTime, filter, map } from 'rxjs/operators';
let count = 1;
const clicks = fromEvent(document, 'click');
const observable = clicks.pipe(
bufferTime(1000), // batch into array every 1s
filter(x => !!x.length), // ignore events without clicks
map(x => x.reduce((a,y) => ({...a, [count++]: y}), {})),
);
observable.subscribe(x => console.log(x));
使用审计时间:
import { fromEvent } from 'rxjs';
import { tap, auditTime, map } from 'rxjs/operators';
let buffer = [];
const clicks = fromEvent(document, 'click');
const observable = clicks.pipe(
tap((event) => buffer.push(event)),
auditTime(1000), // buffer every 1s after 1st click is detected
map((_lastEvent) => { // ignore last event
const events = buffer; // save off buffer
buffer = []; // clear buffer
return events.reduce((a,e,i) => ({...a, [i]: e}),{});
}),
);
observable.subscribe((events) => console.log(events));
使用 takeUntil 并重复:
注意:take/repeat 将重置 observable(即间隔计数器保持为 0,事件可能会丢失)
import { fromEvent, timer, interval } from 'rxjs';
import { takeUntil, reduce, repeat, filter } from 'rxjs/operators';
const interval$ = interval(1000);
const timer$ = timer(2000);
const observable = interval$.pipe(
takeUntil(timer$), // unsubscribe from stream every 2s so reduce terminates
reduce((acc, event) => [...acc, event], []), // reduce to array of events
filter(x => !!x.length), // suppress emission of empty stream
repeat(), // resubscribe to stream
);
// console will only show array of [0] since takeUntil stops right when interval emits
const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => subscription.unsubscribe(), 10000);
为什么 flatMap 不会触发下游缩减?
我得到的代码如下:
handleFiles.flatMap(files =>
Rx.Observable.from(files).
flatMap((file, i) => fileReader(file, i)).
reduce((form, file, i) => {
form.append('file[' + i + ']', result);
console.log('reduce step', file);
return form;
}, new FormData()).
tap(console.log.bind(console, 'after reduce'))
).
subscribe(console.log.bind(console, 'response'));
问题是 'after reduce' 水龙头从未被击中。为什么?
日志是这样的:
reduce step [data]
reduce step [data]
截图:
问题不在flatMap
; reduce
的工作方式。
reduce
读取整个流并将其缩减为单个值,仅在源流关闭时发出。如果您的 from(files)
流没有结束,那么 reduce
将永远不会输出它的值。
请尝试使用 scan
;它发出每个中间步骤,似乎就是您要找的东西。
如果 files 是一个数组,那么如果从 fileReader 返回的 observable 终止,则 reduce 应该终止。因此,对于这段代码,问题在于 fileReader 返回了一个未完成的可观察对象。
这是一个使用 reduce 和非终止可观察对象的示例;
使用窗口时间:
import { fromEvent, interval, timer } from 'rxjs';
import { reduce, filter, windowTime, map, mergeMap } from 'rxjs/operators';
const interval$ = interval(1000);
const observable = interval$.pipe(
windowTime(2000), // each window is 2s
mergeMap(window$ => window$.pipe(
reduce((a,x) => { // reduce to array
return [...a,x];
}, []),
filter(x => !!x.length) // in the background timer is still running so suppress empty events
)), // flatten the Observable-of-Observables
);
const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => subscription.unsubscribe(), 10000);
使用缓冲时间:
import { fromEvent } from 'rxjs';
import { bufferTime, filter, map } from 'rxjs/operators';
let count = 1;
const clicks = fromEvent(document, 'click');
const observable = clicks.pipe(
bufferTime(1000), // batch into array every 1s
filter(x => !!x.length), // ignore events without clicks
map(x => x.reduce((a,y) => ({...a, [count++]: y}), {})),
);
observable.subscribe(x => console.log(x));
使用审计时间:
import { fromEvent } from 'rxjs';
import { tap, auditTime, map } from 'rxjs/operators';
let buffer = [];
const clicks = fromEvent(document, 'click');
const observable = clicks.pipe(
tap((event) => buffer.push(event)),
auditTime(1000), // buffer every 1s after 1st click is detected
map((_lastEvent) => { // ignore last event
const events = buffer; // save off buffer
buffer = []; // clear buffer
return events.reduce((a,e,i) => ({...a, [i]: e}),{});
}),
);
observable.subscribe((events) => console.log(events));
使用 takeUntil 并重复:
注意:take/repeat 将重置 observable(即间隔计数器保持为 0,事件可能会丢失)
import { fromEvent, timer, interval } from 'rxjs';
import { takeUntil, reduce, repeat, filter } from 'rxjs/operators';
const interval$ = interval(1000);
const timer$ = timer(2000);
const observable = interval$.pipe(
takeUntil(timer$), // unsubscribe from stream every 2s so reduce terminates
reduce((acc, event) => [...acc, event], []), // reduce to array of events
filter(x => !!x.length), // suppress emission of empty stream
repeat(), // resubscribe to stream
);
// console will only show array of [0] since takeUntil stops right when interval emits
const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => subscription.unsubscribe(), 10000);