将流<Stream<T>> 转换为流<T>
Convert Stream<Stream<T>> to Stream<T>
我正在使用 rxdart 包来处理 dart 中的流。我被困在处理一个特殊的问题中。
请看一下这个伪代码:
final userId = BehaviorSubject<String>();
Stream<T> getStream(String uid) {
// a sample code that returns a stream
return BehaviorSubject<T>().stream;
}
final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));
现在我想将 oops
变量转换为仅获取 Observable<T>
。
我很难解释清楚。但让我试试。我有一个流 A。我将流 A 的每个输出映射到另一个流 B。现在我有 Stream<Stream<B>>
- 一种 recurrent 流。我只想听听这种模式产生的最新价值。我怎样才能做到这一点?
如果你想要一个流>到return一个流,你基本上需要展平流。
flatmap 函数是你在这里使用的
public static void main(String args[]) {
StreamTest st = new StreamTest<String>();
List<String> l = Arrays.asList("a","b", "c");
Stream s = l.stream().map(i -> st.getStream(i)).flatMap(i->i);
}
Stream<T> static getStream(T uid) {
// a sample code that returns a stream
return Stream.of(uid);
}
如果需要第一个对象,那么使用findFirst()方法
public static void main(String args[]) {
StreamTest st = new StreamTest<String>();
List<String> l = Arrays.asList("a","b", "c");
String str = l.stream().map(i -> st.getStream(i)).flatMap(i->i).findFirst().get();
}
Stream<Stream<Something>>
很少见,因此没有太多明确的支持。
一个原因是有几种(至少两种)方法可以将事物流组合成事物流。
要么依次监听每个流,等待它完成再开始下一个流,然后按顺序发出事件。
或者您在每个新流可用时监听它,然后尽快从任何流发出事件。
前者使用async
/await
很容易写成:
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
await for (var stream in source) yield* stream;
}
后者更复杂,因为它需要同时监听多个流,并组合它们的事件。 (如果一次只 StreamController.addStream
允许多个流,那么会容易得多)。为此,您可以使用 package:async
中的 StreamGroup
class:
import "package:async/async" show StreamGroup;
Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
var sg = StreamGroup<T>();
source.forEach(sg.add).whenComplete(sg.close);
// This doesn't handle errors in [source].
// Maybe insert
// .catchError((e, s) {
// sg.add(Future<T>.error(e, s).asStream())
// before `.whenComplete` if you worry about errors in [source].
return sg.stream;
}
我将列出几种将 Stream<Stream<T>>
扁平化为单个 Stream<T>
的方法。
1。使用纯飞镖
正如@ 的回答,这是一个纯粹的飞镖解决方案:
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
await for (var stream in source) yield* stream;
}
Stream<int> getStream(String v) {
return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
List<String> list = ["a", "b", "c"];
Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
s.listen(print);
}
输出:1 2 3 4 1 2 3 4 1 2 3 4
2。使用 Observable.flatMap
Observable 有一个方法 flatMap 可以将输出流扁平化并将其附加到正在进行的流中:
import 'package:rxdart/rxdart.dart';
Stream<int> getStream(String v) {
return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
List<String> list = ["a", "b", "c"];
Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
s.listen(print);
}
输出:1 2 3 4 1 2 3 4 1 2 3 4
3。使用 Observable.switchLatest
Convert a Stream that emits Streams (aka a "Higher Order Stream") into a single Observable that emits the items emitted by the most-recently-emitted of those Streams.
这就是我一直在寻找的解决方案!我只需要内部流发出的最新输出。
import 'package:rxdart/rxdart.dart';
Stream<int> getStream(String v) {
return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
List<String> list = ["a", "b", "c"];
Observable<int> s = Observable.switchLatest(
Observable.fromIterable(list).map(getStream));
s.listen(print);
}
输出:1 1 1 2 3 4
我正在使用 rxdart 包来处理 dart 中的流。我被困在处理一个特殊的问题中。
请看一下这个伪代码:
final userId = BehaviorSubject<String>();
Stream<T> getStream(String uid) {
// a sample code that returns a stream
return BehaviorSubject<T>().stream;
}
final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));
现在我想将 oops
变量转换为仅获取 Observable<T>
。
我很难解释清楚。但让我试试。我有一个流 A。我将流 A 的每个输出映射到另一个流 B。现在我有 Stream<Stream<B>>
- 一种 recurrent 流。我只想听听这种模式产生的最新价值。我怎样才能做到这一点?
如果你想要一个流>到return一个流,你基本上需要展平流。 flatmap 函数是你在这里使用的
public static void main(String args[]) {
StreamTest st = new StreamTest<String>();
List<String> l = Arrays.asList("a","b", "c");
Stream s = l.stream().map(i -> st.getStream(i)).flatMap(i->i);
}
Stream<T> static getStream(T uid) {
// a sample code that returns a stream
return Stream.of(uid);
}
如果需要第一个对象,那么使用findFirst()方法
public static void main(String args[]) {
StreamTest st = new StreamTest<String>();
List<String> l = Arrays.asList("a","b", "c");
String str = l.stream().map(i -> st.getStream(i)).flatMap(i->i).findFirst().get();
}
Stream<Stream<Something>>
很少见,因此没有太多明确的支持。
一个原因是有几种(至少两种)方法可以将事物流组合成事物流。
要么依次监听每个流,等待它完成再开始下一个流,然后按顺序发出事件。
或者您在每个新流可用时监听它,然后尽快从任何流发出事件。
前者使用async
/await
很容易写成:
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
await for (var stream in source) yield* stream;
}
后者更复杂,因为它需要同时监听多个流,并组合它们的事件。 (如果一次只 StreamController.addStream
允许多个流,那么会容易得多)。为此,您可以使用 package:async
中的 StreamGroup
class:
import "package:async/async" show StreamGroup;
Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
var sg = StreamGroup<T>();
source.forEach(sg.add).whenComplete(sg.close);
// This doesn't handle errors in [source].
// Maybe insert
// .catchError((e, s) {
// sg.add(Future<T>.error(e, s).asStream())
// before `.whenComplete` if you worry about errors in [source].
return sg.stream;
}
我将列出几种将 Stream<Stream<T>>
扁平化为单个 Stream<T>
的方法。
1。使用纯飞镖
正如@
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
await for (var stream in source) yield* stream;
}
Stream<int> getStream(String v) {
return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
List<String> list = ["a", "b", "c"];
Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
s.listen(print);
}
输出:1 2 3 4 1 2 3 4 1 2 3 4
2。使用 Observable.flatMap
Observable 有一个方法 flatMap 可以将输出流扁平化并将其附加到正在进行的流中:
import 'package:rxdart/rxdart.dart';
Stream<int> getStream(String v) {
return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
List<String> list = ["a", "b", "c"];
Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
s.listen(print);
}
输出:1 2 3 4 1 2 3 4 1 2 3 4
3。使用 Observable.switchLatest
Convert a Stream that emits Streams (aka a "Higher Order Stream") into a single Observable that emits the items emitted by the most-recently-emitted of those Streams.
这就是我一直在寻找的解决方案!我只需要内部流发出的最新输出。
import 'package:rxdart/rxdart.dart';
Stream<int> getStream(String v) {
return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
List<String> list = ["a", "b", "c"];
Observable<int> s = Observable.switchLatest(
Observable.fromIterable(list).map(getStream));
s.listen(print);
}
输出:1 1 1 2 3 4