如何在 Android 中将许多 AsyncTasks 转换为 Rx Observables?

How to convert many AsyncTasks to Rx Observables in Android?

我正在使用 Facebook Graph API 查找喜欢我的 post 的用户。所有逻辑只需几步即可实现:

  1. 通过在 AsyncTask 中向 API 发出请求来查找所有 post
  2. 将此 AsyncTask 转换为 Rx.Observable
  3. 将 Observable 的 GraphResponse 映射为 Post 列表(Post 是 POJO)
  4. 调用 flatMap 运算符,并在其函数体中调用一个方法:该方法遍历每个 post 并创建 AsyncTask,然后将其转换为 Observable 并放入数组。
  5. 将这组 Observable 合并为一个 Observable。
  6. 将其 GraphResponse 映射为点赞者的个人资料(Profile)
  7. 订阅并呈现点赞者列表
  8. 盈利!

我在第 4-5 步遇到了一些小问题。请查看 Repository 中的 `likes` 方法,我在代码注释中写明了问题所在。

提示:我正在使用 MVP + Clean Architecture with Repository(数据层)和 Interactor(业务层)

/**
 * Data-layer wrapper around the Facebook Graph API that exposes requests as Rx observables.
 *
 * Every network call is wrapped in [Observable.defer] and executed with the blocking
 * `executeAndWait()`, so no request is sent until the observable is subscribed to.
 * NOTE(review): `RxDecorator` is defined elsewhere; presumably it applies the
 * subscribe/observe schedulers — confirm that it moves the blocking call off the main thread.
 */
class FacebookRepository {

    private val facebook = Facebook.instance()
    private val gson = GsonBuilder().create()

    /**
     * Requests `/me/posts` with the stored access token.
     *
     * @return the decorated observable emitting the raw [GraphResponse].
     */
    fun posts(): Observable<GraphResponse>? {

        return RxDecorator<GraphResponse>().decorate(Observable.defer({
            val request = GraphRequest(
                    facebook.token,
                    "/me/posts",
                    null,
                    HttpMethod.GET,
                    GraphRequest.Callback { /* handle the result */ }
            )

            Observable.just(request.executeAndWait())
        }))

    }

    /** Stores [currentAccessToken] as the token used by [posts]. */
    fun setFaceBookAccessToken(currentAccessToken: AccessToken?) {
        facebook.token = currentAccessToken
    }

    /** Delegates to `Facebook.logout()`. */
    fun logout() {
        facebook.logout()
    }

    /** @return the raw token string of the stored access token, or `null` when none is stored. */
    fun token(): String? {
        return facebook.token?.token
    }

    /**
     * Loads the likes of every post and emits them as lists of [Profile].
     *
     * One request observable is created per post and all of them are merged, so the
     * returned observable emits one list per post (in completion order) and completes
     * without emitting anything when [posts] is `null` or empty.
     *
     * @param posts posts whose likes should be fetched; `null` is treated as empty.
     */
    fun likes(posts: List<Post>?): Observable<List<Profile>> {

        Log.d("observables:posts", posts.toString())

        // No need to "stop and wait" here: the per-post observables are cold, and merge()
        // subscribes to each of them only when the result itself is subscribed to.
        return Observable.merge(iterateObservables(posts)).map { response ->
            Log.d("merge:posts", response.toString())

            gson.fromJson<List<Profile>>(
                    response.jsonObject["data"].toString(),
                    object : TypeToken<List<Profile>>() {}.type
            )
        }
    }

    /**
     * Builds one deferred `/{post-id}/likes` request observable per post.
     *
     * The previous implementation fired `executeAsync()` and returned a list that was
     * never initialised (always `null`), so nothing could be merged. Each request is now
     * wrapped exactly like [posts]: deferred, executed synchronously on subscription, and
     * its [GraphResponse] emitted.
     *
     * @return a non-null list, empty when [posts] is `null` or empty.
     */
    private fun iterateObservables(posts: List<Post>?): List<Observable<GraphResponse>> {
        Log.d("iterateObs:posts", posts.toString())
        Log.d("posts_not_null", (posts != null).toString())
        Log.d("posts.size", posts?.size.toString())

        return posts.orEmpty().map { post ->
            Log.d("iterateObs:post", post.toString())

            RxDecorator<GraphResponse>().decorate(Observable.defer({
                val request = GraphRequest(
                        AccessToken.getCurrentAccessToken(),
                        "/${post.id}/likes",
                        null,
                        HttpMethod.GET,
                        GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") }
                )

                Observable.just(request.executeAndWait())
            }))
        }
    }
}

这是交互器

/**
 * Business-layer entry point for the Facebook features: login, token bookkeeping and
 * loading the profiles of users who liked the current user's posts.
 */
class FacebookInteractor {

    private val callbackManager = com.facebook.CallbackManager.Factory.create()
    private val repository = FacebookRepository()

    /**
     * Starts the Facebook login flow with the read permissions listed below.
     *
     * @param view view whose context is cast to `MainActivity` and used to host the login.
     */
    fun facebookAuth(view: IMainView) {
        val hostActivity = view.getContext() as MainActivity
        val readPermissions = listOf("user_friends", "user_likes", "user_posts", "public_profile", "email")

        LoginManager.getInstance().logInWithReadPermissions(hostActivity, readPermissions)
    }

    /** Forwards an activity result to the Facebook callback manager. */
    fun onFacebookLoginResult(requestCode: Int, resultCode: Int, data: Intent) {
        callbackManager.onActivityResult(requestCode, resultCode, data)
    }

    /** Pushes [currentAccessToken] to the repository when its token string differs from the old one. */
    fun facebookAccessTokenChanged(oldAccessToken: AccessToken?, currentAccessToken: AccessToken?) {
        val previousToken = oldAccessToken?.token
        val latestToken = currentAccessToken?.token
        if (previousToken == latestToken) {
            return
        }

        repository.setFaceBookAccessToken(currentAccessToken)
    }

    /**
     * Loads the user's posts, parses the `data` field of the response into [Post] objects
     * and asks the repository for the profiles that liked them.
     *
     * @return `null` when the repository provides no posts observable.
     */
    fun likes(): Observable<List<Profile>>? {
        val postsResponse = repository.posts()

        val parsedPosts = postsResponse?.map { response ->
            val parser = GsonBuilder().create()
            val postListType = object : TypeToken<List<Post>>() {}.type

            parser.fromJson<List<Post>>(response.jsonObject["data"].toString(), postListType)
        }

        return parsedPosts?.flatMap { posts -> repository.likes(posts) }
    }

    /** Logs out through the repository. */
    fun logout() {
        repository.logout()
    }

    /** @return `true` when the repository currently holds a token. */
    fun isLogined(): Boolean = repository.token() != null

}

我正在使用 Kotlin 作为开发语言。

谢谢大家!不过最终的答案是另一种思路:问题出在逻辑上,而不是实现上。Facebook SDK 提供了 GraphRequestBatch,因此不需要 Future,也不需要阻塞线程等待。

package com.github.scrobot.likes_listener.data.facebook

import android.util.Log
import com.facebook.*
import com.github.scrobot.likes_listener.data.facebook.models.Facebook
import com.github.scrobot.likes_listener.data.facebook.models.Post
import com.github.scrobot.likes_listener.data.facebook.models.Profile
import com.google.gson.GsonBuilder
import com.google.gson.reflect.TypeToken
import rus.pifpaf.client.util.rx.RxDecorator
import rx.Observable
import java.util.*


/**
 * Data-layer wrapper around the Facebook Graph API that exposes requests as Rx observables.
 *
 * Network calls are wrapped in [Observable.defer] and executed with the blocking
 * `executeAndWait()`, so nothing is sent until the observable is subscribed to.
 * NOTE(review): `RxDecorator` is defined elsewhere; presumably it applies the
 * subscribe/observe schedulers — confirm that it moves the blocking call off the main thread.
 *
 * Created by aleksejskrobot on 07.12.16.
 */
class FacebookRepository {

    private val facebook = Facebook.instance()
    private val gson = GsonBuilder().create()

    /**
     * Requests `/me/posts` with the stored access token.
     *
     * @return the decorated observable emitting the raw [GraphResponse].
     */
    fun posts(): Observable<GraphResponse>? {

        return RxDecorator<GraphResponse>().decorate(Observable.defer({
            val request = GraphRequest(
                    facebook.token,
                    "/me/posts",
                    null,
                    HttpMethod.GET,
                    GraphRequest.Callback { /* handle the result */ }
            )

            Observable.just(request.executeAndWait())
        }))

    }

    /** Stores [currentAccessToken] as the token used for subsequent requests. */
    fun setFaceBookAccessToken(currentAccessToken: AccessToken?) {
        facebook.token = currentAccessToken
    }

    /** Delegates to `Facebook.logout()`. */
    fun logout() {
        facebook.logout()
    }

    /** @return the raw token string of the stored access token, or `null` when none is stored. */
    fun token(): String? {
        return facebook.token?.token
    }

    /**
     * Fetches the likes of all [posts] in a single [GraphRequestBatch] and emits every
     * liker as one flat list (one `/{post-id}/likes` request per post).
     *
     * @param posts posts whose likes should be fetched; `null` is treated as empty.
     * @return an observable emitting exactly one list; the list is empty when there are no posts.
     */
    fun likes(posts: List<Post>?): Observable<List<Profile>> {

        // An empty batch is rejected by the SDK when executed, so answer "no likers" up front.
        if (posts == null || posts.isEmpty()) {
            return Observable.just(emptyList<Profile>())
        }

        return RxDecorator<List<GraphResponse>>().decorate(rx.Observable.defer({
            // Build the batch on subscription so that every subscriber gets a fresh batch
            // and the token is read at execution time rather than when likes() was called.
            // NOTE(review): the Graph API caps the number of requests per batch; split the
            // posts into several batches if that limit can be exceeded — confirm the limit.
            val batch = GraphRequestBatch()

            posts.mapTo(batch) { post ->
                GraphRequest(
                        facebook.token,
                        "/${post.id}/likes",
                        null,
                        HttpMethod.GET,
                        GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") }
                )
            }

            rx.Observable.just(batch.executeAndWait())
        })).map { responses ->
            Log.d("batchResponse:it", responses.toString())

            val profiles = ArrayList<Profile>()

            for (response in responses) {
                profiles.addAll(gson.fromJson<List<Profile>>(
                        response.jsonObject["data"].toString(),
                        object : TypeToken<List<Profile>>() {}.type
                ))
            }

            // The former `list.filter { list.contains(it) }` was removed: its result was
            // discarded and its predicate is always true, so it never changed the output.
            // NOTE(review): if de-duplicating likers was the intent, return
            // `profiles.distinct()` — that needs a value-based `Profile.equals`.
            profiles
        }
    }
}