如何等待 RxJS 中的一致状态?
How to wait for a consistent state in RxJS?
我有一个取书和与之关联的借书证的代码:
// mimic http requests
const fetchBook = (bookId: number) => {
const title = 'Book' + bookId;
return timer(200).pipe(mapTo({ bookId, title }));
}
const fetchLibraryCard = (bookId: number) => {
const borrowerName = 'Borrower of Book' + bookId;
return timer(300).pipe(mapTo({ borrowerName }));
}
const bookId$ = new Subject<number>();
const book$ = bookId$.pipe(
switchMap(bookId => fetchBook(bookId)),
shareReplay(1)
);
// e.g. 'Refresh library card' button
const libraryCardUpdater$ = new BehaviorSubject<void>(undefined);
const libraryCard$ = combineLatest([bookId$, libraryCardUpdater$]).pipe(
switchMap(([bookId]) => fetchLibraryCard(bookId)),
shareReplay(1)
);
combineLatest([book$, libraryCard$]).subscribe(([book, libraryCard]) => {
console.log('book:', book.title, '| borrower:', libraryCard.borrowerName)
})
bookId$.next(1);
setTimeout(() => bookId$.next(2), 500);
setTimeout(() => libraryCardUpdater$.next(), 1000);
setTimeout(() => bookId$.next(3), 1500);
订阅者状态不一致的问题:
book: Book1 | borrower: Borrower of Book1 <-- OK
book: Book2 | borrower: Borrower of Book1 <-- Not OK
book: Book2 | borrower: Borrower of Book2 <-- OK
book: Book2 | borrower: Borrower of Book2 <-- OK, but redundant
book: Book3 | borrower: Borrower of Book2 <-- Not OK
book: Book3 | borrower: Borrower of Book3 <-- OK
我想在 bookId$
改变的同时将 undefined
推到 libraryCard$
。
但是如何以被动的方式做到这一点呢?
更新:
借书证应始终与取回的图书一致(或在加载时 undefined
)。 bookId$
可以随时通过用户操作进行更改。借书证也可以由用户随时手动更新 (libraryCardUpdater$
)。 libraryCardUpdater$
发射应该重新取卡,但不应该重新取书
更新2:
我刚刚意识到借书证可以在书后顺序取回。这是可以接受的,虽然对于最终用户来说不是完美的解决方案。
你必须扭转局面。您的真实来源必须是 bookId$
,并且从构建的可观察对象中您可以获得这本书和 libraryCard
:
const bookId$ = new ReplaySubject<number>(1);
const libraryCardUpdater$ = new Subject<void>();
const libraryCardBook$ = combineLatest([
bookId$.pipe(
distinctUntilChanged(),
switchMap(bookId => fetchBook(bookId))
),
libraryCardUpdater$.pipe(
switchMap(() => this.bookId$),
switchMap((bookId) => fetchLibraryCard(bookId))
)
]).pipe(
map(([ book, libraryCard ]) => ({ book, libraryCard })),
startWith({ book: undefined, libraryCard: undefined }),
shareReplay(1)
);
const book$ = libraryCardBook$.pipe(map(({ book }) => book);
const libraryCard$ = libraryCardBook$.pipe(map(({ libraryCard }) => libraryCard);
而不是 combineLatest()
(当任何可观察值发出值时发出),我会使用 zip()
,当所有可观察值发出它们的值时发出。
此外,我已经更新了您的一些代码,使内容更加丰富declarative/reactive。虽然我知道它们是 API 调用的占位符,但当您可以从 bookId
主题中 switchMap()
时,就不需要这些函数了。
const bookId = new Subject<number>();
const book$ = bookId.pipe(
switchMap((bookId) =>
timer(200).pipe(mapTo({ bookId, title: `Book${bookId}` }))
),
shareReplay(1)
);
const libraryCard$ = bookId.pipe(
switchMap((bookId) =>
timer(300).pipe(mapTo({ borrowerName: `Borrower of Book${bookId}` }))
),
shareReplay(1)
);
zip(book$, libraryCard$).subscribe(([book, libraryCard]) => {
console.log("book:", book.title, "| borrower:", libraryCard.borrowerName);
});
bookId.next(1);
setTimeout(() => bookId.next(2), 500);
setTimeout(() => bookId.next(2), 1000); // library card refresh
setTimeout(() => bookId$.next(3), 1500);
这是我对此的看法:
const bookId$ = new Subject<number>();
const libraryCardUpdater$ = new Subject<void>();
const result$ = bookId$.pipe(
switchMap(() => {
const book$ = concat(of(null), fetchBook(bookId));
const libraryCard$ = concat(
of(null),
concat(of(null), libraryCardUpdater$).pipe(
concatMap(() => fetchLibraryCard(bookId))
)
);
return combineLatest([book$, libraryCard$]);
})
);
这将保证以下内容:
- 每当
bookId$
发生变化时,我们首先按预期发射 一次 :[null, null]
- 然后,
fetchBook
和 fetchLibraryCard
调用之间存在竞争。所以我们可以以 [someBook, null]
或 [null, someLibraryCard]
结束。在某个时候,它们都应该在那里,并且会有 [someBook, someLibraryCard]
的另一个发射
- 在我们有
[someBook, null]
和 bookId
变化的情况下,我们将立即发出 [null, null]
。 bookId
更改 的所有情况都相同
- 至于
libraryCardUpdater$
,它只会触发 fetchLibraryCard
而不会触发其他
希望符合您的要求,如果我遗漏了任何内容,随时乐意编辑我的答案
在 https://thinkrx.io/rxjs/ 中测试您的代码可以得到
最后一行与您的console.logs相同。
更改为 withLatestFrom
而不是 combineLatest
会删除不同步的 book/card (#2 - 1/2/
& #5 - 2/3
)
这是代码,有改动
- 标签缩写
- 时间除以 10
- 将 id 添加到
cardUpdater$
并在开始时将 Subject()
与显式 .next()
一起使用(化妆品 - 仍然适用于原始 BehaviorSubject)。
const { rxObserver } = require('api/v0.3');
const rx = require('rxjs');
const { timer } = rx;
const { switchMap, map, mapTo, combineLatest, withLatestFrom, shareReplay }
= require('rxjs/operators');
// mimic http requests
const fetchBook = (bookId) => {
return timer(20).pipe(mapTo({ bookId, title: 'b' + bookId }));
}
const fetchLibraryCard = (bookId) => {
return timer(30).pipe(mapTo({ name: `c${bookId}` }));
}
const bookId$ = new rx.Subject();
const book$ = bookId$.pipe(
switchMap(bookId => fetchBook(bookId)),
shareReplay(1)
);
// e.g. 'Refresh library card' button
const cardUpdater$ = new rx.Subject();
const libraryCard$ = bookId$.combineLatest(cardUpdater$)
.pipe(
switchMap(([bookId, cardId]) => fetchLibraryCard(bookId)),
shareReplay(1)
)
const combined$ = libraryCard$.withLatestFrom(book$)
.pipe(
map(([card,book]) => `b${book.title[1]}|c${card.name[1]}`),
)
// Marbles
bookId$.subscribe(rxObserver('id'))
book$.map(book=>book.title).subscribe(rxObserver('book'))
cardUpdater$.subscribe(rxObserver('card update'))
libraryCard$.map(card=>card.name).subscribe(rxObserver('libraryCard$'))
combined$.subscribe(rxObserver('combined'))
// Events
bookId$.next(1);
cardUpdater$.next(1)
setTimeout(() => bookId$.next(2), 50);
setTimeout(() => cardUpdater$.next(2), 100);
setTimeout(() => bookId$.next(3), 150);
让我困惑的一件事是您要删除的发射。
book: Book2 | borrower: Borrower of Book2 <-- OK, but redundant
由cardUpdater$
事件触发,可以在combined$
中用distinctUntilChanged()
移除,但这样做会使卡片刷新变得多余。
感觉你想要一个cardId
卡刷新时改变,并在新卡上重新发行相同的书。
像这样的东西有更正交的感觉
const { rxObserver } = require('api/v0.3');
const rx = require('rxjs');
const { timer } = rx;
const { switchMap, map, mapTo, combineLatest, withLatestFrom, shareReplay }
= require('rxjs/operators');
const fetchBook = (bookId) => {
return timer(20).pipe(mapTo({ bookId, title: 'b' + bookId }));
}
const fetchLibraryCard = (cardId) => {
return timer(30).pipe(mapTo({ name: `c${cardId}` }));
}
const bookId$ = new rx.Subject();
const book$ = bookId$.pipe(
switchMap(bookId => fetchBook(bookId)),
shareReplay(1)
);
const cardUpdater$ = new rx.Subject();
const card$ = cardUpdater$.pipe(
switchMap(cardId => fetchLibraryCard(cardId)),
shareReplay(1)
);
const issue$ = book$.merge(card$).pipe(
switchMap(() => card$.withLatestFrom(book$)),
map(([card,book]) => `${book.title}|${card.name}`),
)
// Marbles
bookId$.subscribe(rxObserver('id'))
book$.map(book=>book.title).subscribe(rxObserver('book'))
cardUpdater$.subscribe(rxObserver('card update'))
card$.map(card=>card.name).subscribe(rxObserver('libraryCard$'))
issue$.subscribe(rxObserver('combined'))
// Events
bookId$.next(1);
cardUpdater$.next(1)
setTimeout(() => bookId$.next(2), 50);
setTimeout(() => cardUpdater$.next(2), 100);
setTimeout(() => bookId$.next(3), 150);
顺序(不是同时)获取使事情变得更容易:
const libraryCard$ = combineLatest([book$, libraryCardUpdater$]).pipe(
switchMap(([book]) => concat(of({ borrowerName: <string>undefined }), fetchLibraryCard(book.bookId))),
shareReplay(1)
);
我唯一需要做的就是添加零去抖动器:
combineLatest([book$, libraryCard$]).pipe(debounceTime(0)).subscribe(...
zip
的例子
// mimic http requests
const fetchBook = (bookId: number) =>
const title = 'Book' + bookId;
return timer(200).pipe(mapTo({ bookId, title: `Book${bookId}` }));
}
const fetchLibraryCard = (bookId: number) => {
const borrowerName = 'Borrower of Book' + bookId;
return timer(300).pipe(mapTo({ borrowerName }));
}
// Borrow a book
const bookId$ = new Subject<number>();
// Refresh library card
const libraryCardUpdater$ = new BehaviorSubject<void>();
// re-emit book2 on card update to allow zip to pair card and book correctly
const book$ = combineLatest([bookId$, libraryCardUpdater$])
.pipe(
switchMap(([bookId, _]) => fetchBook(bookId)),
shareReplay(1)
);
const libraryCard$ = combineLatest([bookId$, libraryCardUpdater$])
.pipe(
map(([bookId, _]) => fetchCard(bookId)),
shareReplay(1)
);
book$.zip(libraryCard$)
.pipe(
map(([book,libraryCard]) => `book: ${book.title}| borrower:${libraryCard.borrowerName}`),
)
.subscribe(console.log)
bookId$.next(1);
setTimeout(() => bookId$.next(2), 500);
setTimeout(() => libraryCardUpdater$.next(), 1000);
setTimeout(() => bookId$.next(3), 1500);
输出
book: Book1 | borrower: Borrower of Book1 <-- OK
book: Book2 | borrower: Borrower of Book2 <-- OK
book: Book2 | borrower: Borrower of Book2 <-- OK, but repeat due to libraryCardUpdater$
book: Book3 | borrower: Borrower of Book3 <-- OK
我有一个取书和与之关联的借书证的代码:
// mimic http requests
const fetchBook = (bookId: number) => {
const title = 'Book' + bookId;
return timer(200).pipe(mapTo({ bookId, title }));
}
const fetchLibraryCard = (bookId: number) => {
const borrowerName = 'Borrower of Book' + bookId;
return timer(300).pipe(mapTo({ borrowerName }));
}
const bookId$ = new Subject<number>();
const book$ = bookId$.pipe(
switchMap(bookId => fetchBook(bookId)),
shareReplay(1)
);
// e.g. 'Refresh library card' button
const libraryCardUpdater$ = new BehaviorSubject<void>(undefined);
const libraryCard$ = combineLatest([bookId$, libraryCardUpdater$]).pipe(
switchMap(([bookId]) => fetchLibraryCard(bookId)),
shareReplay(1)
);
combineLatest([book$, libraryCard$]).subscribe(([book, libraryCard]) => {
console.log('book:', book.title, '| borrower:', libraryCard.borrowerName)
})
bookId$.next(1);
setTimeout(() => bookId$.next(2), 500);
setTimeout(() => libraryCardUpdater$.next(), 1000);
setTimeout(() => bookId$.next(3), 1500);
订阅者状态不一致的问题:
book: Book1 | borrower: Borrower of Book1 <-- OK
book: Book2 | borrower: Borrower of Book1 <-- Not OK
book: Book2 | borrower: Borrower of Book2 <-- OK
book: Book2 | borrower: Borrower of Book2 <-- OK, but redundant
book: Book3 | borrower: Borrower of Book2 <-- Not OK
book: Book3 | borrower: Borrower of Book3 <-- OK
我想在 bookId$
改变的同时将 undefined
推到 libraryCard$
。
但是如何以被动的方式做到这一点呢?
更新:
借书证应始终与取回的图书一致(或在加载时 undefined
)。 bookId$
可以随时通过用户操作进行更改。借书证也可以由用户随时手动更新 (libraryCardUpdater$
)。 libraryCardUpdater$
发射应该重新取卡,但不应该重新取书
更新2: 我刚刚意识到借书证可以在书后顺序取回。这是可以接受的,虽然对于最终用户来说不是完美的解决方案。
你必须扭转局面。您的真实来源必须是 bookId$
,并且从构建的可观察对象中您可以获得这本书和 libraryCard
:
const bookId$ = new ReplaySubject<number>(1);
const libraryCardUpdater$ = new Subject<void>();
const libraryCardBook$ = combineLatest([
bookId$.pipe(
distinctUntilChanged(),
switchMap(bookId => fetchBook(bookId))
),
libraryCardUpdater$.pipe(
switchMap(() => this.bookId$),
switchMap((bookId) => fetchLibraryCard(bookId))
)
]).pipe(
map(([ book, libraryCard ]) => ({ book, libraryCard })),
startWith({ book: undefined, libraryCard: undefined }),
shareReplay(1)
);
const book$ = libraryCardBook$.pipe(map(({ book }) => book);
const libraryCard$ = libraryCardBook$.pipe(map(({ libraryCard }) => libraryCard);
而不是 combineLatest()
(当任何可观察值发出值时发出),我会使用 zip()
,当所有可观察值发出它们的值时发出。
此外,我已经更新了您的一些代码,使内容更加丰富declarative/reactive。虽然我知道它们是 API 调用的占位符,但当您可以从 bookId
主题中 switchMap()
时,就不需要这些函数了。
const bookId = new Subject<number>();
const book$ = bookId.pipe(
switchMap((bookId) =>
timer(200).pipe(mapTo({ bookId, title: `Book${bookId}` }))
),
shareReplay(1)
);
const libraryCard$ = bookId.pipe(
switchMap((bookId) =>
timer(300).pipe(mapTo({ borrowerName: `Borrower of Book${bookId}` }))
),
shareReplay(1)
);
zip(book$, libraryCard$).subscribe(([book, libraryCard]) => {
console.log("book:", book.title, "| borrower:", libraryCard.borrowerName);
});
bookId.next(1);
setTimeout(() => bookId.next(2), 500);
setTimeout(() => bookId.next(2), 1000); // library card refresh
setTimeout(() => bookId$.next(3), 1500);
这是我对此的看法:
const bookId$ = new Subject<number>();
const libraryCardUpdater$ = new Subject<void>();
const result$ = bookId$.pipe(
switchMap(() => {
const book$ = concat(of(null), fetchBook(bookId));
const libraryCard$ = concat(
of(null),
concat(of(null), libraryCardUpdater$).pipe(
concatMap(() => fetchLibraryCard(bookId))
)
);
return combineLatest([book$, libraryCard$]);
})
);
这将保证以下内容:
- 每当
bookId$
发生变化时,我们首先按预期发射 一次 :[null, null]
- 然后,
fetchBook
和fetchLibraryCard
调用之间存在竞争。所以我们可以以[someBook, null]
或[null, someLibraryCard]
结束。在某个时候,它们都应该在那里,并且会有[someBook, someLibraryCard]
的另一个发射
- 在我们有
[someBook, null]
和bookId
变化的情况下,我们将立即发出[null, null]
。bookId
更改 的所有情况都相同
- 至于
libraryCardUpdater$
,它只会触发fetchLibraryCard
而不会触发其他
希望符合您的要求,如果我遗漏了任何内容,随时乐意编辑我的答案
在 https://thinkrx.io/rxjs/ 中测试您的代码可以得到
最后一行与您的console.logs相同。
更改为 withLatestFrom
而不是 combineLatest
会删除不同步的 book/card (#2 - 1/2/
& #5 - 2/3
)
这是代码,有改动
- 标签缩写
- 时间除以 10
- 将 id 添加到
cardUpdater$
并在开始时将Subject()
与显式.next()
一起使用(化妆品 - 仍然适用于原始 BehaviorSubject)。
const { rxObserver } = require('api/v0.3');
const rx = require('rxjs');
const { timer } = rx;
const { switchMap, map, mapTo, combineLatest, withLatestFrom, shareReplay }
= require('rxjs/operators');
// mimic http requests
const fetchBook = (bookId) => {
return timer(20).pipe(mapTo({ bookId, title: 'b' + bookId }));
}
const fetchLibraryCard = (bookId) => {
return timer(30).pipe(mapTo({ name: `c${bookId}` }));
}
const bookId$ = new rx.Subject();
const book$ = bookId$.pipe(
switchMap(bookId => fetchBook(bookId)),
shareReplay(1)
);
// e.g. 'Refresh library card' button
const cardUpdater$ = new rx.Subject();
const libraryCard$ = bookId$.combineLatest(cardUpdater$)
.pipe(
switchMap(([bookId, cardId]) => fetchLibraryCard(bookId)),
shareReplay(1)
)
const combined$ = libraryCard$.withLatestFrom(book$)
.pipe(
map(([card,book]) => `b${book.title[1]}|c${card.name[1]}`),
)
// Marbles
bookId$.subscribe(rxObserver('id'))
book$.map(book=>book.title).subscribe(rxObserver('book'))
cardUpdater$.subscribe(rxObserver('card update'))
libraryCard$.map(card=>card.name).subscribe(rxObserver('libraryCard$'))
combined$.subscribe(rxObserver('combined'))
// Events
bookId$.next(1);
cardUpdater$.next(1)
setTimeout(() => bookId$.next(2), 50);
setTimeout(() => cardUpdater$.next(2), 100);
setTimeout(() => bookId$.next(3), 150);
让我困惑的一件事是您要删除的发射。
book: Book2 | borrower: Borrower of Book2 <-- OK, but redundant
由cardUpdater$
事件触发,可以在combined$
中用distinctUntilChanged()
移除,但这样做会使卡片刷新变得多余。
感觉你想要一个cardId
卡刷新时改变,并在新卡上重新发行相同的书。
像这样的东西有更正交的感觉
const { rxObserver } = require('api/v0.3');
const rx = require('rxjs');
const { timer } = rx;
const { switchMap, map, mapTo, combineLatest, withLatestFrom, shareReplay }
= require('rxjs/operators');
const fetchBook = (bookId) => {
return timer(20).pipe(mapTo({ bookId, title: 'b' + bookId }));
}
const fetchLibraryCard = (cardId) => {
return timer(30).pipe(mapTo({ name: `c${cardId}` }));
}
const bookId$ = new rx.Subject();
const book$ = bookId$.pipe(
switchMap(bookId => fetchBook(bookId)),
shareReplay(1)
);
const cardUpdater$ = new rx.Subject();
const card$ = cardUpdater$.pipe(
switchMap(cardId => fetchLibraryCard(cardId)),
shareReplay(1)
);
const issue$ = book$.merge(card$).pipe(
switchMap(() => card$.withLatestFrom(book$)),
map(([card,book]) => `${book.title}|${card.name}`),
)
// Marbles
bookId$.subscribe(rxObserver('id'))
book$.map(book=>book.title).subscribe(rxObserver('book'))
cardUpdater$.subscribe(rxObserver('card update'))
card$.map(card=>card.name).subscribe(rxObserver('libraryCard$'))
issue$.subscribe(rxObserver('combined'))
// Events
bookId$.next(1);
cardUpdater$.next(1)
setTimeout(() => bookId$.next(2), 50);
setTimeout(() => cardUpdater$.next(2), 100);
setTimeout(() => bookId$.next(3), 150);
顺序(不是同时)获取使事情变得更容易:
const libraryCard$ = combineLatest([book$, libraryCardUpdater$]).pipe(
switchMap(([book]) => concat(of({ borrowerName: <string>undefined }), fetchLibraryCard(book.bookId))),
shareReplay(1)
);
我唯一需要做的就是添加零去抖动器:
combineLatest([book$, libraryCard$]).pipe(debounceTime(0)).subscribe(...
zip
// mimic http requests
const fetchBook = (bookId: number) =>
const title = 'Book' + bookId;
return timer(200).pipe(mapTo({ bookId, title: `Book${bookId}` }));
}
const fetchLibraryCard = (bookId: number) => {
const borrowerName = 'Borrower of Book' + bookId;
return timer(300).pipe(mapTo({ borrowerName }));
}
// Borrow a book
const bookId$ = new Subject<number>();
// Refresh library card
const libraryCardUpdater$ = new BehaviorSubject<void>();
// re-emit book2 on card update to allow zip to pair card and book correctly
const book$ = combineLatest([bookId$, libraryCardUpdater$])
.pipe(
switchMap(([bookId, _]) => fetchBook(bookId)),
shareReplay(1)
);
const libraryCard$ = combineLatest([bookId$, libraryCardUpdater$])
.pipe(
map(([bookId, _]) => fetchCard(bookId)),
shareReplay(1)
);
book$.zip(libraryCard$)
.pipe(
map(([book,libraryCard]) => `book: ${book.title}| borrower:${libraryCard.borrowerName}`),
)
.subscribe(console.log)
bookId$.next(1);
setTimeout(() => bookId$.next(2), 500);
setTimeout(() => libraryCardUpdater$.next(), 1000);
setTimeout(() => bookId$.next(3), 1500);
输出
book: Book1 | borrower: Borrower of Book1 <-- OK
book: Book2 | borrower: Borrower of Book2 <-- OK
book: Book2 | borrower: Borrower of Book2 <-- OK, but repeat due to libraryCardUpdater$
book: Book3 | borrower: Borrower of Book3 <-- OK