什么Combine operator/approach可以加载"paged API"的所有页面?

What Combine operator/approach can be used to load all pages of "paged API"?

我正在使用网络 API,它提供的结果达到给定的限制(请求的 pageSize 参数)。如果结果数超过此限制,则响应消息会预先填充一个 URL,可以向其发出后续请求以获取更多结果。如果有更多结果,将再次以相同方式显示。

我打算一次获取所有结果。


目前我有类似以下的请求和响应结构:

// Request structure
struct TvShowsSearchRequest {
    let q: String
    let pageSize: Int?
}

// Response structure
struct TvShowsSearchResponse: Decodable {
    let next: String?
    let total : Int
    let searchTerm : String
    let searchResultListShow: [SearchResult]?
}

在使用完成处理程序解决问题“old style”时,我不得不编写一个处理程序,它使用 [=98= 触发 'handle more' 请求] 的响应:

func handleResponse(request: TvShowsSearchRequest, result: Result<TvShowsSearchResponse, Error>) -> Void {
    switch result {
    case .failure(let error):
        fatalError(error.localizedDescription)
    case .success(let value):
        print("> Total number of shows matching the query: \(value.total)")
        print("> Number of shows fetched: \(value.searchResultListShow?.count ?? 0)")

        if let moreUrl = value.next {
            print("> URL to fetch more entries \(moreUrl)")

            // start recursion here: a new request, calling the same completion handler...
            dataProvider.handleMore(request, nextUrl: moreUrl, completion: handleResponse)
        }
    }
}

let request = TvShowsSearchRequest(query: "A", pageSize: 50)
dataProvider.send(request, completion: handleResponse)

在内部,sendhandleMore 函数都调用相同的 internalSend,后者使用 requesturl,然后调用 URLSession.dataTask(...),检查 HTTP 错误,解码响应并调用完成块。


现在我想使用 Combine 框架并使用自动提供分页响应的 Publisher,而无需调用另一个 Publisher。

因此,我编写了一个 requestPublisher 函数,它采用 request 和(初始)url 和 returns 一个 URLSession.dataTaskPublisher 来检查 HTTP 错误(使用 tryMap),decode 响应。

现在我必须确保只要最后一个发出的值具有有效 next URL 并且完成事件发生,发布者就会自动 "renews" 自己。

我发现有一个 Publisher.append 方法可以准确地做到这一点,但我目前遇到的问题是:我只想在特定条件下追加(=有效 next ).

以下伪代码对其进行了说明:

func requestPublisher(for request: TvShowsSearchRequest, with url: URL) -> AnyPublisher<TvShowsSearchResponse, Error> {
    // ... build urlRequest, skipped here ...

    let apiCall = self.session.dataTaskPublisher(for: urlRequest)
        .tryMap { data, response -> Data in
            guard let httpResponse = response as? HTTPURLResponse else {
                throw APIError.server(message: "No HTTP response received")
            }
            if !(200...299).contains(httpResponse.statusCode) {
                throw APIError.server(message: "Server respondend with status: \(httpResponse.statusCode)")
            }
            return data
        }
        .decode(type: TvShowsSearchResponse.self, decoder: JSONDecoder())
        .eraseToAnyPublisher()
    return apiCall
}


// Here I'm trying to use the Combine approach

var moreURL : String?

dataProvider.requestPublisher(request)
    .handleEvents(receiveOutput: {
        moreURL = [=14=].next   // remember the "next" to fetch more data
    })
    .append(dataProvider.requestPublisher(request, next: moreURL))  // this does not work, because moreUrl was not prepared at the time of creation!!
    .sink(receiveCompletion: { print([=14=]) }, receiveValue: { print([=14=]) })
    .store(in: &cancellableSet)

我想有些人已经以被动的方式解决了这个问题。每当我找到可行的解决方案时,它都会再次涉及递归。我认为这不是正确的解决方案。

我正在寻找发送响应但没有提供回调函数的发布者。可能一定有一个使用 Publisher of Publishers 的解决方案,但我还不了解它。


在@kishanvekariya 发表评论后,我尝试与多个发布者一起构建所有内容:

  1. 正在获取对 "main" 请求的响应的 mainRequest 发布者。

  2. 一个新的 urlPublisher,它正在接收 "main" 的所有 next URL 或任何后续请求。

  3. 一个新的 moreRequest 发布者,它正在为 urlPublisher 的每个值获取一个新请求,将所有 next URL 发送回urlPublisher.

然后我尝试使用 append.

moreRequest 发布者附加到 mainRequest
var urlPublisher = PassthroughSubject<String, Error>()

var moreRequest = urlPublisher
    .flatMap {
        return dataProvider.requestPublisher(request, next: [=15=])
            .handleEvents(receiveOutput: {
                if let moreURL = [=15=].next {
                    urlPublisher.send(moreURL)
                }
            })
    }

var mainRequest = dataProvider.requestPublisher(request)
    .handleEvents(receiveOutput: {
        if let moreURL = [=15=].next {
            urlPublisher.send(moreURL)
        }
    })
    .append(moreRequest)
    .sink(receiveCompletion: { print([=15=]) }, receiveValue: { print([=15=]) })
    .store(in: &cancellableSet)

但这仍然不起作用...我总是得到 "main" 请求的结果。缺少所有跟进请求。

看来我自己找到了解决办法

我的想法是,我有一个 urlPublisher,它用第一个 URL 初始化,然后执行并可能将 next URL 提供给 urlPublisher 并通过这样做引起后续请求。

let url = endpoint(for: request)     // initial URL
let urlPublisher = CurrentValueSubject<URL, Error>(url)

urlPublisher
    .flatMap {
        return dataProvider.requestPublisher(for: request, with: [=10=])
            .handleEvents(receiveOutput: {
                if let next = [=10=].next, let moreURL = URL(string: self.transformNextUrl(nextUrl: next)) {
                    urlPublisher.send(moreURL)
                } else {
                    urlPublisher.send(completion: .finished)
                }
            })
    }
    .sink(receiveCompletion: { print([=10=]) }, receiveValue: { print([=10=]) })
    .store(in: &cancellableSet)

所以最后,我使用了两个发布者的组合和 flatMap 而不是非功能性的 append。可能这也是人们从一开始就瞄准的解决方案...

在我深入探讨响应之前,只想说一次请求所有页面可能不是最好的主意:

  1. 它增加了服务器的压力,可能分页 API 存在是有原因的,以避免在后端进行昂贵的操作
  2. 关于页面请求失败时怎么办的讨论一直存在:报错,报部分结果,重试请求?
  3. 请记住,一旦您推出产品并且有许多客户请求整个电视节目数据集,后端服务器可能会超载,并产生更多故障

现在,回到正题,假设您的 requestPublisher 正常工作,您可以做的是编写一个链接这些调用的发布者,并且在收到最后一页之前不报告值。

代码可能如下所示:

func allPages(for request: TvShowsSearchRequest, with url: URL) -> AnyPublisher<TvShowsSearchResponse, Error> {
    // helper function to chain requests for all pages
    func doRequest(with pageURL: URL, accumulator: TvShowsSearchResponse) -> AnyPublisher<TvShowsSearchResponse, Error> {
        requestPublisher(for: request, with: pageURL)
            .flatMap { (r: TvShowsSearchResponse) -> AnyPublisher<TvShowsSearchResponse, Error> in
                if let next = r.next, let nextURL = URL(string: next) {
                    // we have a `next` url, append the received page,
                    // and make the next request
                    return doRequest(with: nextURL, accumulator: accumulator.accumulating(from: r))
                } else {
                    // no more pages, we have the response already build up
                    // just report it
                    return Just(accumulator).setFailureType(to: Error.self).eraseToAnyPublisher()
                }
            }
            .eraseToAnyPublisher()
    }
    return doRequest(with: url, accumulator: TvShowsSearchResponse())
}

您基本上使用 TvShowsSearchResponse 作为链式请求结果的累加器。

以上代码还需要以下扩展:

extension TvShowsSearchResponse {
    init() {
        self.init(next: nil, total: 0, searchTerm: "", searchResultListShow: nil)
    }
                  
    func accumulating(from other: TvShowsSearchResponse) -> TvShowsSearchResponse {
        TvShowsSearchResponse(
            next: nil,
            total: other.total,
            searchTerm: other.searchTerm,
            searchResultListShow: (searchResultListShow ?? []) + (other.searchResultListShow ?? []))
    }
}

,为清楚起见,累积 searchResultListShow 值的代码被放入专用扩展中。