RxJava Return 函数中的订阅值
RxJava Return Subscribed value in a function
出于某种原因,我有时想使用 RxOperators 而不是正常的 java 转换数据结构的方式,因为它更干净。例如:
Observable.from(listOfStrings)
.filter(string -> string.getNumber() >5)
.toList()
有什么方法可以在函数中等待 observable 和 return 的结果:这可以(但不起作用):
private String getFilteredString(String string){
Observable.from(listOfStrings)
.filter(string -> string.getNumber() >5)
.toList()
.subscribe(finalStirng -> {
return finalString;
})
}
您可以使用 .toBlocking()
:
将任何 Observable
转换为同步的
private List<String> getFilteredString(List<String> strings) {
return Observable.from(strings)
.filter(string -> string.length() > 5)
.toList()
.toBlocking()
.single();
}
更新:
尽管上面的代码完全有效并且会按预期工作,但这不是应该使用 RxJava
的方式。如果您的唯一目标是转换一个集合,那么有更好的方法可以做到:
- Java 8个有一个
Stream
API
- Guava has its own collections API
- Apache 有一个
commons-collections
库
rx.Observable
上有一个名为 x()
(link) 的实例方法,可用于将一个 observable 直接流畅地转换为另一个值。这个运算符的目的是建立一个转换函数的存储库,可以将 Observable 转换为其他类型的 concurrent/asynchronous 数据结构(例如其他类型的 observables),或者只是简单地将 observable 的内容解包到包含的值。一次编写转换函数并将其存储起来以供重复使用。
方法签名:
public <R> R x(Func1<? super OnSubscribe<T>, ? extends R> conversion);
将潜在异步可观察对象转换为并发列表的用法:
List<Integer> list = Observable.range(1, 10).x(new Func1<OnSubscribe<Integer>, List<Integer>>() {
@Override
public List<Integer> call(OnSubscribe<Integer> onSubscribe) {
List<Integer> allElements = new ArrayList<Integer>();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
onSubscribe.call(new Subscriber<Integer>(){
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
failure.set(e);
}
@Override
public void onNext(Integer t) {
allElements.add(t);
}});
while (true) {
try {
latch.await();
break;
} catch (InterruptedException e1) {
// continue waiting
}
}
Throwable e = failure.get();
if (e != null)
throw new RuntimeException(e);
return allElements;
}});
System.out.println(list);
输出
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
请注意,此运算符当前为 @Experimental
,并且很可能会重命名(至 "extend"),但是 RxJava 中的实验性功能可以是 changed in any release。
出于某种原因,我有时想使用 RxOperators 而不是正常的 java 转换数据结构的方式,因为它更干净。例如:
Observable.from(listOfStrings)
.filter(string -> string.getNumber() >5)
.toList()
有什么方法可以在函数中等待 observable 和 return 的结果:这可以(但不起作用):
private String getFilteredString(String string){
Observable.from(listOfStrings)
.filter(string -> string.getNumber() >5)
.toList()
.subscribe(finalStirng -> {
return finalString;
})
}
您可以使用 .toBlocking()
:
Observable
转换为同步的
private List<String> getFilteredString(List<String> strings) {
return Observable.from(strings)
.filter(string -> string.length() > 5)
.toList()
.toBlocking()
.single();
}
更新:
尽管上面的代码完全有效并且会按预期工作,但这不是应该使用 RxJava
的方式。如果您的唯一目标是转换一个集合,那么有更好的方法可以做到:
- Java 8个有一个
Stream
API - Guava has its own collections API
- Apache 有一个
commons-collections
库
rx.Observable
上有一个名为 x()
(link) 的实例方法,可用于将一个 observable 直接流畅地转换为另一个值。这个运算符的目的是建立一个转换函数的存储库,可以将 Observable 转换为其他类型的 concurrent/asynchronous 数据结构(例如其他类型的 observables),或者只是简单地将 observable 的内容解包到包含的值。一次编写转换函数并将其存储起来以供重复使用。
方法签名:
public <R> R x(Func1<? super OnSubscribe<T>, ? extends R> conversion);
将潜在异步可观察对象转换为并发列表的用法:
List<Integer> list = Observable.range(1, 10).x(new Func1<OnSubscribe<Integer>, List<Integer>>() {
@Override
public List<Integer> call(OnSubscribe<Integer> onSubscribe) {
List<Integer> allElements = new ArrayList<Integer>();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
onSubscribe.call(new Subscriber<Integer>(){
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
failure.set(e);
}
@Override
public void onNext(Integer t) {
allElements.add(t);
}});
while (true) {
try {
latch.await();
break;
} catch (InterruptedException e1) {
// continue waiting
}
}
Throwable e = failure.get();
if (e != null)
throw new RuntimeException(e);
return allElements;
}});
System.out.println(list);
输出
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
请注意,此运算符当前为 @Experimental
,并且很可能会重命名(至 "extend"),但是 RxJava 中的实验性功能可以是 changed in any release。