使用 Combine 创建可重用管道的正确方法是什么?

What's the proper method for creating re-usable pipelines with Combine?

我对函数式响应式编程世界还比较陌生,并且仍在努力理解这些概念。我正在利用 SDK 发出一些网络请求——特别是查询远程数据库。 SDK returns 是一个发布者,我有一个可以将结果转换为模型对象的工作管道。这是工作流水线:

let existingClaimQuery = "SELECT Id, Subject, CaseNumber FROM Case WHERE Status != 'Closed' ORDER BY CaseNumber DESC"
let requestForOpenCases = RestClient.shared.request(forQuery: existingClaimQuery, apiVersion: RestClient.apiVersion)
caseCancellable = RestClient.shared
  .publisher(for: requestForOpenCases)
  .receive(on: RunLoop.main)
  .tryMap({restresponse -> [String:Any] in
    let json = try restresponse.asJson() as? [String:Any]
    return json ?? RestClient.JSONKeyValuePairs()
  })
  .map({json -> [[String:Any]] in
    let records = json["records"] as? [[String:Any]]
    return records ?? [[:]]
  })
  .map({
    [=10=].map{(item) -> Claim in
      return Claim(
        id: item["Id"] as? String ?? "None Listed",
        subject: item["Subject"] as? String ?? "None Listed",
        caseNumber: item["CaseNumber"] as? String ?? "0"
      )
    }
  })
  .mapError{error -> Error in
    print(error)
    return error
  }
  .catch{ error in
    return Just([])
  }
.assign(to: \.claims, on: self)

我开始处理代码的另一部分,并意识到我经常 需要执行相同的过程 - 编写查询,为该查询创建请求,然后处理它通过最终 returns 一个 [[String:Any]] 的管道。

所以这是百万美元的问题。 正确 封装此管道的方法是什么,这样我就可以重新使用它而不必 copy/pasta 整个代码库中的整个管道?这是我的...尝试,但感觉...不对?

class QueryStream: ObservableObject {

  var query: String = ""
  private var queryCancellable: AnyCancellable?

  @Published var records: [[String:Any]] = [[String:Any]]()

  func execute(){
    let queryRequest = RestClient.shared.request(forQuery: query, apiVersion: RestClient.apiVersion)

    queryCancellable = RestClient.shared.publisher(for: queryRequest)
      .receive(on: RunLoop.main)
      .tryMap({restresponse -> [String:Any] in
        let json = try restresponse.asJson() as? [String:Any]
        return json ?? [String:Any]()
      })
      .map({json -> [[String:Any]] in
        let records = json["records"] as? [[String:Any]]
        return records ?? [[:]]
      })
      .mapError{error -> Error in
        print(error)
        return error
      }
      .catch{ error in
        return Just([])
      }
    .assign(to: \.records, on: self)
  }

}

这仍然需要为每次使用编写一个管道。我觉得应该有一些方法可以实现像管道这样的一次性承诺,允许

let SomeRecords = QueryStream("Query here").execute()

我也是n00b吗?想多了?堆栈的智慧是什么?

整个管道不可重复使用。 发布者 可重复使用。当我说 "publisher" 时,我的意思是初始发布者加上附属于它的运营商。 (请记住,运营商本身就是发布者。)发布者可以作为某物的 属性 存在,因此您可以订阅它,或者它可以针对特定情况(如特定查询请求)由功能。

为了说明,这里有一个一次性管道:

let s = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let url = URL(string:s)!
let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
session.dataTaskPublisher(for: url)
    .map {[=10=].data}
    .replaceError(with: Data())
    .compactMap { UIImage(data:[=10=]) }
    .receive(on: DispatchQueue.main)
    .assign(to: \.image, on: self.iv)
    .store(in:&self.storage)

该管道尝试从 URL 下载数据,测试它是否是图像数据,如果是,则将图像数据转换为图像并将其显示在界面的图像视图中.

假设我想为各种不同的远程图像执行此操作。显然,到处重复整个管道是荒谬的。前面不同的是 URL,所以让我们将管道的第一部分封装为发布者,可以根据 URL:

按需生成
func image(fromURL url:URL) -> AnyPublisher<UIImage,Never> {
    let eph = URLSessionConfiguration.ephemeral
    let session = URLSession(configuration: eph)
    return session.dataTaskPublisher(for: url)
        .map {[=11=].data}
        .replaceError(with: Data())
        .compactMap { UIImage(data:[=11=]) }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}

现在,我们代码中唯一需要在不同地方重复的是 订阅者 到该发布者:

let s = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let url = URL(string:s)!
image(fromURL:url)
    .map{Optional([=12=])}
    .assign(to: \.image, on: self.iv)
    .store(in:&self.storage)

看到了吗?在其他地方,我们可能有一个不同的 URL,我们可能会对调用 image(fromURL:) 时弹出的 UIImage 做一些不同的事情,这很好;流水线的大部分已封装,无需重复。

您的示例管道的发布者容易受到相同类型的封装和重用的影响。

首先请注意,我认为您正在将 main 分派到管道的早期阶段。据我所知,您所有的 map 转换都是纯函数(没有副作用或对可变状态的引用),因此它们也可以 运行 在后台线程上运行,因此不会阻塞UI.

其次,正如 Matt 所说,Publisher 通常可以重复使用。您的管道构建了一个大的复合体 Publisher,然后订阅它,这会产生一个 AnyCancellable。因此,排除复杂的大 Publisher 而不是订阅。

为了方便起见,您可以将其分解为 RestClient 上的扩展方法:

extension RestClient {
    func records<Record>(
        forQuery query: String,
        makeRecord: @escaping ([String: Any]) throws -> Record)
        -> AnyPublisher<[Record], Never>
    {
        let request = self.request(forQuery: query, apiVersion: RestClient.apiVersion)

        return self.publisher(for: request)
            .tryMap { try [=10=].asJson() as? [String: Any] ?? [:] }
            .map { [=10=]["records"] as? [[String: Any]] ?? [] }
            .tryMap { try [=10=].map { try makeRecord([=10=]) } }
            .mapError { dump([=10=]) } // dump is a Swift standard function
            .replaceError(with: []) // simpler than .catch
            .eraseToAnyPublisher()
    }
}

那么你可以这样使用它:

struct Claim {
    var id: String
    var subject: String
    var caseNumber: String
}

extension Claim {
    static func from(json: [String: Any]) -> Claim {
        return .init(
            id: json["Id"] as? String ?? "None Listed",
            subject: json["Subject"] as? String ?? "None Listed",
            caseNumber: json["CaseNumber"] as? String ?? "0")
    }
}

class MyController {
    var claims: [Claim] = []
    var caseCancellable: AnyCancellable?

    func run() {
        let existingClaimQuery = "SELECT Id, Subject, CaseNumber FROM Case WHERE Status != 'Closed' ORDER BY CaseNumber DESC"
        caseCancellable = RestClient.shared.records(forQuery: existingClaimQuery, makeRecord: Claim.from(json:))
            .receive(on: RunLoop.main)
            .assign(to: \.claims, on: self)
    }
}

请注意,我已将 receive(on: RunLoop.main) 运算符放在订阅发布者的方法中,而不是将其构建到发布者中。这使得在调度到主线程之前在后台调度程序上添加额外的运算符变得容易 运行。


更新

来自您的评论:

In promise syntax, i could say execute run() as defined above, and .then(doSomethingWithThatData()) knowing that the doSomethingWithThatData wouldn't run until the intial work had completed successfully. I'm trying to develop a setup where I need to use this records(fromQuery:) method runs, and then (and only then) do soemthing with that data. I'm struggling with how to bolt that on to the end.

我不知道你使用的是什么 promise 实现,所以很难知道你的 .then(doSomethingWithThatData()) 做了什么。你写的东西在 Swift 中并没有多大意义。也许你的意思是:

.then { data in doSomething(with: data) }

在这种情况下,在 data 可用之前不可能调用 doSomething(with:) 方法,因为 doSomething(with:)data 作为参数!