RxJava2如何分离可观察发射器的不同实现

RxJava2 how to separate different implementation of observable emitter

这是我的情况。 我想从 Android 的本地 LocationManager 或 Google 服务公开 Observable<'Location'> 的 2 个不同实现。

我想检查一下我使用的是原生方法还是 gms。

所以最后我想向我的客户公开 Observable - 他不需要知道我收集位置的方法。 请注意,我正在使用这个库: https://github.com/mcharmas/Android-ReactiveLocation

用于从 google 服务公开 Observable。它已经公开了我正在搜索的 Observable。 但是另一个呢 - 位置管理器。它使用回调。

这是我的实现:

  var locationEmitter : Observable<Location> = Observable.empty()


init {
    configureEmitter()
}

@SuppressLint("MissingPermission")
private fun configureEmitter(){
    if (!isUsingLocationNativeApi)
      locationEmitter = reactiveLocationProvider.getUpdatedLocation(reactiveLocationRequest)
    else{
      configureNativeLocationEmitter()
    }
}

@SuppressLint("MissingPermission")
private fun configureNativeLocationEmitter() {


    val mLocationCallbackNativeApi: LocationListener = object : LocationListener {

        override fun onLocationChanged(location: Location) {
        locationEmitter = Observable.create<Location> { emitter -> emitter.onNext(location) }

        }

        override fun onStatusChanged(provider: String, status: Int, extras: Bundle) {}

        override fun onProviderEnabled(provider: String) {}

        override fun onProviderDisabled(provider: String) {}

    }


    try {
        locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER,
                (geoEventsIntervalSeconds * 1000).toLong(),
                geoEventsDistanceMeters.toFloat(),
                mLocationCallbackNativeApi,
                Looper.getMainLooper())
    } catch (ignored: IllegalArgumentException) {
        ignored.printStackTrace()

    }

    try {
        locationManager.requestLocationUpdates(LocationManager.NETWORK_PROVIDER,
                (geoEventsIntervalSeconds * 1000).toLong(),
                geoEventsDistanceMeters.toFloat(),
                mLocationCallbackNativeApi,
                Looper.getMainLooper())
    } catch (ignored: IllegalArgumentException) {
        ignored.printStackTrace()
    }
}



@SuppressLint("MissingPermission")
override fun onLocationUpdate(): Observable<Location> {
    return locationEmitter
    }

}

但是,它不适用于 LocationManager 实现,

在客户端,我希望能够做这样的事情:

 rxLocationRepository.onLocationUpdate()
            .subscribe(Consumer { location -> 
    //do something with location, no matter it is from LocationManager or google play services
 })

如何在 Kotlin 中做到这一点?

更新

如何摆脱subject(实际上是摆脱subject)和其他locationProvider的跟踪?如果客户不想使用它,我不想失去使用它们,例如在客户端调用这种方法

 override fun stopLocationUpdates() {
 //get rid of the location updates
}

例如在客户端:

 rxLocationRepository.stopLocationUpdates()

每次调用 onLocationChanged 时,您都在创建一个新的可观察对象,这是没有意义的。 只需使用 PublishSubject(或 BehaviorSubject 在这种情况下可能更合适),它可以作为 Observable 返回给客户并在 onLocationChanged 中调用 subject.onNext(location)