RxSwift - 如何在可观察值发生变化但仅发出最后一个值时重试?

RxSwift - How to retry when an observable changes but emit only last value?

我正在尝试使用 RxSwift 建立聊天服务链。 基本上应该这样做:

下面的功能基本上可以工作,但暂时不检查套接字连接状态。 它应该在第二个 flatMap 之前检查套接字是否已连接,如果是,它应该使用 messageUpload 转到 flatMap,如果没有,它应该等到连接 returns 为真。 我有变量告诉我当前的连接状态(Bool)

chatSocketService.isSubscribedToChannel.asObservable()

但我不知道如何将它们组合在一起。 我尝试使用第三个 flatMap(在当前最后一个之前)它不起作用。 下一个问题是用户可以在连接恢复之前尝试发送少量消息,因此每次他点击 sendButton 时都会执行此方法,因此当连接恢复时它应该只发送最后一条消息。 知道我如何用 Rx 处理它吗?

func sendMessage(withBody body: String) {
    guard !body.isEmpty else { return }

    Observable.just(chatModel.value)
        .filter({ [=11=].product != nil })
        .flatMap({ [unowned self] chatModel -> Observable<ChatModel> in
            if chatModel.id != nil {
                return Observable.just(chatModel)
            } else {
                return self.createChat(withProductModel: chatModel.product!)
            }
        })
        .flatMap({ [unowned self] chatModel -> Observable<ChatMessageModel> in
            return self.chatService.uploadChatMessage(forChat: chatModel, withBody: body)
                    .trackActivity(self.progressHelper.activityIndicator)
        })
        .subscribe(onNext: { [unowned self] chatMessageModel in
            self.finishedSendingMessage.onNext(())
        })
        .addDisposableTo(disposeBag)
}
.flatMap({ [unowned self] chatModel -> Observable<ChatMessageModel> in
    let successfulConnect = chatSocketService.isSubscribedToChannel.asObservable()
       .skipWhile { [=10=] == false }
       .map { _ -> Void in }
       .take(1)

    return successfulConnect.flatMap { self.chatService.uploadChatMessage(forChat: chatModel, withBody: body) }
        .trackActivity(self.progressHelper.activityIndicator)
})

以上代码应该具有管理连接问题中描述的行为。 successfulConnect 是一个 Observable,它将发出一个值并在连接状态为真时完成。

skipWhile 将忽略 false 值,map 将我们的可观察对象从 Observable<Bool> 转换为 Observable<Void> 并且 take(1) 确保可观察对象完成在第一个值之后。 flatMap 然后在 sucessfulConnect 发送值时执行 uploadChatMessage


您描述的第二种行为意味着如果用户在上一条消息上传之前发送了另一条消息,则取消上一次上传。这可以通过在输入 sendMessage

时处理订阅来处理
var disposable: Disposable?

func sendMessage(withBody body: String) {
   disposable?.dispose()
   disposable = Observable.just(chatModel.value)
      // ...
      .subscribe(onNext: { /* ... */ })
}