为什么 RxJava Completable 发射器中的 Firebase 任务不执行?

Why Firebase Task in RxJava Completable emitter doesn't execute?

我正在开发一个连接到 Firestore 的 Firebase Android 应用程序。命名法是该集合是 "Assets"。示例代码有一些简单的操作,例如 addAssetdeleteAsset,它们工作正常。这是与Firebase实际对话的数据存储层,视图模型层在其之上。

class FirestoreAssetRepository(secondaryDB: FirebaseFirestore) : IAssetRepository {

    companion object {
        private const val TAG = "FirestoreAssetRepo"
        private const val ASSET_COLLECTION = "Assets"
    }

    private var remoteDB: FirebaseFirestore
    private var changeObservable: Observable<List<DocumentSnapshot>>

    init {
        remoteDB = secondaryDB
    }

    override fun addAsset(asset: Asset): Completable {
        return Completable.create { emitter ->
            remoteDB.collection(ASSET_COLLECTION)
                .add(mapToAssetData(asset))
                .addOnSuccessListener {
                    if (!emitter.isDisposed) {
                        emitter.onComplete()
                    }
                }
                .addOnFailureListener {
                    if (!emitter.isDisposed) {
                        emitter.onError(it)
                    }
                }
        }
    }

    override fun deleteAsset(assetId: String): Completable {
        return Completable.create { emitter ->
            remoteDB.collection(ASSET_COLLECTION)
                .document(assetId)
                .delete()
                .addOnSuccessListener {
                    if (!emitter.isDisposed) {
                        emitter.onComplete()
                    }
                }
                .addOnFailureListener {
                    if (!emitter.isDisposed) {
                        emitter.onError(it)
                    }
                }
        }
    }

我正在向存储库添加一个将修改特定文档的操作。

    override fun lockUnlockAsset(assetId: String): Completable {
        Log.d(TAG, "lockUnlockAsset")
        return Completable.create { emitter ->
            remoteDB.collection(ASSET_COLLECTION)
                .document(assetId)
                .get()
                .addOnSuccessListener {
                    Log.d(TAG, "Unlocking")
                    val remoteAsset = mapDocumentToRemoteAsset(it)
                    it.reference.update(getUnlockLocation())
                    if (!emitter.isDisposed) {
                        emitter.onComplete()
                    }
                }
                .addOnFailureListener {
                    Log.d(TAG, "Could not find asset to unlock")
                    if (!emitter.isDisposed) {
                        emitter.onError(it)
                    }
                }
        }
    }

执行达到 Log.d(TAG, "lockUnlockAsset") 但从未达到 Log.d(TAG, "Unlocking")。如果我在第二个日志命令处放置一个断点,它在开始时是通常的红点,但是当调用进入函数时,图标变为灰色 "don't enter" 图标,当我将鼠标悬停在它上面时 Android Studio 告诉我 "No executable found at ..."。所以那里肯定有问题。

我是 Kotlin 和 RxJava2 的新手。我怎样才能让它工作?


更新:回答 Pavel 的问题:这些函数是从 ViewModel 层调用的:

fun deleteAsset(assetId: String) {
    repository.deleteAsset(assetId)
        .subscribeOn(Schedulers.io())
        .subscribe(
            {},
            {
                it.printStackTrace()
            })
        .addTo(disposable)
}

fun addAsset(assetTitle: String) {
    repository.addAsset(Asset("${System.currentTimeMillis()}", assetTitle))
        .subscribeOn(Schedulers.io())
        .subscribe(
            {},
            {
                it.printStackTrace()
            })
        .addTo(disposable)
}

fun lockUnlockAsset(assetId: String) {
    repository.lockUnlockAsset(assetId)
}

我在存储库级别试验 .subscribeOn(Schedulers.io()).observe 的组合。也许是 .addTo(disposable) 让它起作用了,我不确定我错过了什么。现在它正在工作,我等待 Pavel 的回答。

我在数据存储库级别尝试了 .subscribeOn(...)observeOn(..) + .observe(...) 的组合,但我应该遵循视图模型中的模式(视图模型调用数据存储库的功能):它是一个链式 subscribeOn + subscribe + addTo(disposable):

fun lockUnlockAsset(assetId: String) {
    repository.lockUnlockAsset(assetId)
        .subscribeOn(Schedulers.io())
        .subscribe(
            {},
            {
                it.printStackTrace()
            })
        .addTo(disposable)
}

感谢 Pavel 指出这一点。