在带有平面图的 Scala 中使用 RxJava
Using RxJava in Scala with flatmap
我正在尝试在 Scala 应用程序中使用 RX Java 来并行执行多个请求
java中运行成功的代码是
List<rx.Observable<Response<MyObject>>> observables = requests.stream().map(
request -> client.getResponse(request).collect(Collectors.toList());
Observable<MyObject> mergedObservable = Observable.merge(observables)
.flatMap(page -> {
List<MyObject> objects = page.getResults();
Observable<MyObject> objectsObservable = Observable.from(objects);
return objectsObservable;
}).toBlocking().getIterator()
我想在 Scala 中编写与此代码段等效的代码,而我所拥有的是
val observables: Array[Observable[Response[MyObject]]] = requests.map(request => client.getResponse(request));
Observable.merge(observables.toList)
.flatMap[Observable[MyObject]]((page: Response[MyObject]) => {
val resutls: java.util.List[MyObject] = page.getResults
val resultObservable: Observable[MyObject] = Observable.from(resutls)
resultObservable
}).toBlocking
.getIterator
代码失败并出现异常
overloaded method value flatMap with alternatives:
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]]
cannot be applied to (com.mypackage.Response[com.mypackage.MyObject] => rx.Observable[com.mypackage.MyObject])
有没有想过如何更改 Scala 代码来避免这种情况?
我想我能帮上忙,但也许吧。有一次我有类似的问题,原因是 scala 首先使用 scala.function 而不是 java.fucntion 所以 lambda 不兼容。我不保证这会起作用,但使用内部 类 代替 lambda。
如果您仔细阅读,错误会说它希望看到类似 Func1[Response[MyObject], Observable[Observable[MyObject]]]
的内容,因为 flatMap
上的类型参数。删除它或更改为正确的 flatMap[MyObject]
可能足以解决问题(假设您使用的是 Scala 2.12 或带有 -Xexperimental
的 Scala 2.11;否则您不能直接将 lambda 用于 Func
并且需要辅助函数)。
如果不是,我将提取 lambda 并为其指定所需的类型(仍然假设 Scala 2.12):
val f: Func1[Response[MyObject], Observable[MyObject]] =
page => Observable.from(page.getResults)
Observable.merge(observables.toList).flatMap(f).toBlocking.getIterator
您也可以在线执行此操作:
.flatMap({ page =>
val resutls: java.util.List[MyObject] = page.getResults
val resultObservable: Observable[MyObject] = Observable.from(resutls)
resultObservable
}: Func1[Response[MyObject], Observable[MyObject]])...
我正在尝试在 Scala 应用程序中使用 RX Java 来并行执行多个请求
java中运行成功的代码是
List<rx.Observable<Response<MyObject>>> observables = requests.stream().map(
request -> client.getResponse(request).collect(Collectors.toList());
Observable<MyObject> mergedObservable = Observable.merge(observables)
.flatMap(page -> {
List<MyObject> objects = page.getResults();
Observable<MyObject> objectsObservable = Observable.from(objects);
return objectsObservable;
}).toBlocking().getIterator()
我想在 Scala 中编写与此代码段等效的代码,而我所拥有的是
val observables: Array[Observable[Response[MyObject]]] = requests.map(request => client.getResponse(request));
Observable.merge(observables.toList)
.flatMap[Observable[MyObject]]((page: Response[MyObject]) => {
val resutls: java.util.List[MyObject] = page.getResults
val resultObservable: Observable[MyObject] = Observable.from(resutls)
resultObservable
}).toBlocking
.getIterator
代码失败并出现异常
overloaded method value flatMap with alternatives:
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
(x: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]]
cannot be applied to (com.mypackage.Response[com.mypackage.MyObject] => rx.Observable[com.mypackage.MyObject])
有没有想过如何更改 Scala 代码来避免这种情况?
我想我能帮上忙,但也许吧。有一次我有类似的问题,原因是 scala 首先使用 scala.function 而不是 java.fucntion 所以 lambda 不兼容。我不保证这会起作用,但使用内部 类 代替 lambda。
如果您仔细阅读,错误会说它希望看到类似 Func1[Response[MyObject], Observable[Observable[MyObject]]]
的内容,因为 flatMap
上的类型参数。删除它或更改为正确的 flatMap[MyObject]
可能足以解决问题(假设您使用的是 Scala 2.12 或带有 -Xexperimental
的 Scala 2.11;否则您不能直接将 lambda 用于 Func
并且需要辅助函数)。
如果不是,我将提取 lambda 并为其指定所需的类型(仍然假设 Scala 2.12):
val f: Func1[Response[MyObject], Observable[MyObject]] =
page => Observable.from(page.getResults)
Observable.merge(observables.toList).flatMap(f).toBlocking.getIterator
您也可以在线执行此操作:
.flatMap({ page =>
val resutls: java.util.List[MyObject] = page.getResults
val resultObservable: Observable[MyObject] = Observable.from(resutls)
resultObservable
}: Func1[Response[MyObject], Observable[MyObject]])...