如何使用rx和kotlin对远程服务一一进行请求调用?

How to make request calls to remote service one by one usnig rx and kotlin?

我有应用程序必须向其发送数据的远程服务: retrofit2 中的定义:

interface FooRemoteService {
    @POST("/foos")
    fun postFoos(@Body foos: List<FooPojo>): Observable<Response<List<String>>
 }

但是一次调用的限制不超过 X Foos。 每次调用都可以 returns 206 代码 "partially successful" 以及上传失败的 foos 列表。还有 413"Request Entity Too Large"。当然还有 400 和 500。

并且该应用需要发送未知数量的 foo 项目(由用户在 运行 时间内定义)。

为了避免服务应用程序的 DDoS,需要将此调用一一发送。

所以我在我的 FooRepositoryImpl 中做了这样的实现:

这是一个想法。我对下面的解决方案不满意,我相信它可以做得更好,但我 运行 没有想法。那么有什么建议吗?

override fun postFoos(foos: List<Foo>) Completable {
  val fooChunks = divideListInToChuncksUnderRequestLimit(foos)

  val unuploadedFoos = mutableListOf<UnuploadedFoo>()
  fooChunks.fold(unuploadedFoos) 
  { accu: MutableList<UnuploadedFoo>, chunk ->
     fooRemoteService
        .postFoos(chunk)
        .subscribeOn(Schedulers.io())
        .flatMapCompletable {
             if (it.isSuccessful) {
                         Completable.complete()
                    } else {
                        Timber.e("$it")
                       accu.add(it.body())

                    }
                }.blockingAwait()
        responses

  }
return Completable.complete()
}

最后,应用程序应显示所有不成功的 foos 的列表,或者如果有的话。所以我需要从未上传的 Foos 的功能列表中传递。

如果您可以稍微修改 postFoos 的 return 类型,那么这样的事情可能会起作用:

override fun postFoos(foos: List<Foo>): Observable<List<UnuploadedFoo>> {
    val chunks = foos.chunked(CHUNK_SIZE)
    val posters = chunks.map { chunk ->
        fooRemoteService.postFoos(chunk)
                .map { response ->
                    response.unUploaded.takeIf { !response.isSuccessful } ?: emptyList()
                }
                .filter { it.isNotEmpty() }
                .toObservable()
    }

    return Observable.concatDelayError(posters)
}

我想您的服务应该是这样的:

data class Response(val isSuccessful: Boolean, val unUploaded: List<UnoploadedFoo>)

fun postFoos(foos: List<Foo>): Single<Response>

这里的技巧是 Concat:

(...) waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.