使用 RxSwift 创建 "reactive" API

Creating a "reactive" API with RxSwift

我正在尝试 RxSwift 并想为我的一个常规 API 调用创建一个 "streaming API"。

我的想法是进行常规调用(它已经使用了没有任何问题的可观察对象)并让计时器触发此类调用并将结果发送到同一个可观察对象上,因此视图控制器可以自动更新,而不是这样做这(伪代码如下):

func getLocations() -> Observable<[Location]> {
  return Observable<[Location]>.create {
    sink in
    NSURLSession.sharedSession.rx_JSON(API.locationsRequest).map {
       json in
       return json.flatMap { Location([=12=]) }
    }
  }
}

我希望发生这种情况(伪代码如下):

func getLocations(interval: NSTimeInterval) -> Observable<[Location]> {
  return Observable<[Location]>.create {
    sink in
    NSTimer(interval) {
      NSURLSession.sharedSession.rx_JSON(API.locationsRequest).map {
        json in
        sink.onNext(json.flatMap { Location([=13=]) })
      }
    }
  }
}

我尝试的最后一件事是将 NSTimer 添加到混合中,但我无法弄清楚如何获取对接收器的引用并将其传递给计时器调用的方法以实际将事件发送到管道,因为计时器的处理程序必须在独立方法上。我尝试从 BlocksKit 中引入块计时器扩展,但计时器每秒触发一次,而不是在指定的时间间隔触发,这违背了目的。

我也读过 Interval 运算符,但我不确定这是正确的方法。

关于如何做到这一点的任何指示?

最终目标是让计时器仅在上一次调用完成(成功或失败)后才重新启动。

您应该执行类似于以下代码的操作:

  func getLocations(interval: NSTimeInterval) -> Observable<[CLLocation]> {

    return Observable<[CLLocation]>.create { observer in

      let interval = 20.0

      let getLocationDisposable = Observable<Int64>.interval(interval, scheduler: MainScheduler.instance)
        .subscribe { (e: Event<Int64>) in

          NSURLSession.sharedSession.rx_JSON(API.locationsRequest).map {
            json in
            observer.onNext(json.flatMap { Location([=10=]) })
          }
      }

      return AnonymousDisposable {
        getLocationDisposable.dispose()
      }

    }
  }

上面的代码每 20 秒触发一次 API.locationsRequest 并将结果发送到同一个 observable,请注意,当 maim observable dispose 时,你必须处理 Interval。