为 Alamofire 请求创建像 RxSwift 的 Observable.Create 这样的 Combine 发布者

Creating a Combine's publisher like RxSwift's Observable.Create for an Alamofire request

我使用下面的一段代码来生成一个冷RxSwift Observable:

func doRequest<T :Mappable>(request:URLRequestConvertible) -> Observable<T> {
        let observable = Observable<T>.create { [weak self] observer in
        guard let self = self else { return Disposables.create() }
        self.session.request(request).validate().responseObject { (response: AFDataResponse<T>) in
            switch response.result {
                case .success(let obj):
                    observer.onNext(obj)
                    observer.onCompleted()
                case .failure(let error):
                    let theError = error as Error
                    observer.onError(theError)
            }
        }
         return Disposables.create()
    }
    return observable
}

其中 Mappable 是基于 ObjectMapper 的类型,self.session 是 Alamofire 的 Session 对象。

我在 Apple 的 Combine 框架中找不到与 Observable.create {...} 等效的东西。我只发现 URLSession.shared.dataTaskPublisher(for:) 使用 Apple 的 URLSession class.

创建发布者

如何将上述可观察对象转换为 Alamofire Combine 的发布者?

编辑: 使用 rob 提供的解决方案,我得到了以下结果:

 private let apiQueue = DispatchQueue(label: "API", qos: .default, attributes: .concurrent)

  func doRequest<T>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> where T : Mappable {

       Deferred { [weak self] () -> Future<T, AFError> in

          guard let self = self else {
              return Future<T, AFError> { promise in  
promise(.failure(.explicitlyCancelled))  }
        }

          return Future { promise in
            self.session
            .request(request)
            .validate()
            .responseObject { (response: AFDataResponse<T>) in
                promise(response.result)
            }
        }
    }
    .handleEvents(receiveCompletion: { completion in
        if case .failure (let error) = completion {
                //handle the error
        }
    })
    .receive(on: self.apiQueue)
    .eraseToAnyPublisher()
}

EDIT2: 我必须删除私有队列,因为它不需要,Alamofire 自己解析解码,所以删除队列及其用法(.receive(on: self.apiQueue))

您可以使用 FutureresponseObject 的回调连接到 Combine Publisher。我手头没有 Alamofire 用于测试,但我认为以下方法应该有效:

func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
    return Future { promise in
        self.session
            .request(request)
            .validate()
            .responseObject { (response: AFDataResponse<T>) in
            promise(response.result)
        }
    }.eraseToAnyPublisher()
}

请注意,这比 RxSwift 版本稍微简单一些,因为 promise 直接接受一个 Result,所以我们不必切换 response.result

A Future 是一种“不冷不热”的出版商。它就像一个热的可观察对象,因为它会立即执行它的主体并且只执行一次,所以它会立即启动 Alamofire 请求。它也像一个冷可观察对象,因为每个订阅者最终都会收到一个值或一个错误(假设您最终调用 promise)。 Future 只执行一次它的主体,但它会缓存你传递给 promise.

Result

您可以通过将 Future 包装在 Deferred:

中来创建一个真正的冷发布者
func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
    return Deferred {
        Future { promise in
            self.session
                .request(request)
                .validate()
                .responseObject { (response: AFDataResponse<T>) in
                    promise(response.result) }
        }
    }.eraseToAnyPublisher()
}
每次订阅它时,

Deferred 都会调用它的主体创建一个新的内部 Publisher。因此,每次订阅时,您都将创建一个新的 Future,它将立即启动一个新的 Alamofire 请求。如果您想使用 retry 运算符,这很有用,如 .