如何使用rx和kotlin对远程服务一一进行请求调用?
How to make request calls to remote service one by one usnig rx and kotlin?
我有应用程序必须向其发送数据的远程服务:
retrofit2 中的定义:
interface FooRemoteService {
@POST("/foos")
fun postFoos(@Body foos: List<FooPojo>): Observable<Response<List<String>>
}
但是一次调用的限制不超过 X Foos。
每次调用都可以 returns 206 代码 "partially successful" 以及上传失败的 foos 列表。还有 413"Request Entity Too Large"。当然还有 400 和 500。
并且该应用需要发送未知数量的 foo 项目(由用户在 运行 时间内定义)。
为了避免服务应用程序的 DDoS,需要将此调用一一发送。
所以我在我的 FooRepositoryImpl 中做了这样的实现:
这是一个想法。我对下面的解决方案不满意,我相信它可以做得更好,但我 运行 没有想法。那么有什么建议吗?
override fun postFoos(foos: List<Foo>) Completable {
val fooChunks = divideListInToChuncksUnderRequestLimit(foos)
val unuploadedFoos = mutableListOf<UnuploadedFoo>()
fooChunks.fold(unuploadedFoos)
{ accu: MutableList<UnuploadedFoo>, chunk ->
fooRemoteService
.postFoos(chunk)
.subscribeOn(Schedulers.io())
.flatMapCompletable {
if (it.isSuccessful) {
Completable.complete()
} else {
Timber.e("$it")
accu.add(it.body())
}
}.blockingAwait()
responses
}
return Completable.complete()
}
最后,应用程序应显示所有不成功的 foos 的列表,或者如果有的话。所以我需要从未上传的 Foos 的功能列表中传递。
如果您可以稍微修改 postFoos
的 return 类型,那么这样的事情可能会起作用:
override fun postFoos(foos: List<Foo>): Observable<List<UnuploadedFoo>> {
val chunks = foos.chunked(CHUNK_SIZE)
val posters = chunks.map { chunk ->
fooRemoteService.postFoos(chunk)
.map { response ->
response.unUploaded.takeIf { !response.isSuccessful } ?: emptyList()
}
.filter { it.isNotEmpty() }
.toObservable()
}
return Observable.concatDelayError(posters)
}
我想您的服务应该是这样的:
data class Response(val isSuccessful: Boolean, val unUploaded: List<UnoploadedFoo>)
fun postFoos(foos: List<Foo>): Single<Response>
这里的技巧是 Concat
:
(...) waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.
我有应用程序必须向其发送数据的远程服务: retrofit2 中的定义:
interface FooRemoteService {
@POST("/foos")
fun postFoos(@Body foos: List<FooPojo>): Observable<Response<List<String>>
}
但是一次调用的限制不超过 X Foos。 每次调用都可以 returns 206 代码 "partially successful" 以及上传失败的 foos 列表。还有 413"Request Entity Too Large"。当然还有 400 和 500。
并且该应用需要发送未知数量的 foo 项目(由用户在 运行 时间内定义)。
为了避免服务应用程序的 DDoS,需要将此调用一一发送。
所以我在我的 FooRepositoryImpl 中做了这样的实现:
这是一个想法。我对下面的解决方案不满意,我相信它可以做得更好,但我 运行 没有想法。那么有什么建议吗?
override fun postFoos(foos: List<Foo>) Completable {
val fooChunks = divideListInToChuncksUnderRequestLimit(foos)
val unuploadedFoos = mutableListOf<UnuploadedFoo>()
fooChunks.fold(unuploadedFoos)
{ accu: MutableList<UnuploadedFoo>, chunk ->
fooRemoteService
.postFoos(chunk)
.subscribeOn(Schedulers.io())
.flatMapCompletable {
if (it.isSuccessful) {
Completable.complete()
} else {
Timber.e("$it")
accu.add(it.body())
}
}.blockingAwait()
responses
}
return Completable.complete()
}
最后,应用程序应显示所有不成功的 foos 的列表,或者如果有的话。所以我需要从未上传的 Foos 的功能列表中传递。
如果您可以稍微修改 postFoos
的 return 类型,那么这样的事情可能会起作用:
override fun postFoos(foos: List<Foo>): Observable<List<UnuploadedFoo>> {
val chunks = foos.chunked(CHUNK_SIZE)
val posters = chunks.map { chunk ->
fooRemoteService.postFoos(chunk)
.map { response ->
response.unUploaded.takeIf { !response.isSuccessful } ?: emptyList()
}
.filter { it.isNotEmpty() }
.toObservable()
}
return Observable.concatDelayError(posters)
}
我想您的服务应该是这样的:
data class Response(val isSuccessful: Boolean, val unUploaded: List<UnoploadedFoo>)
fun postFoos(foos: List<Foo>): Single<Response>
这里的技巧是 Concat
:
(...) waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.