分区可观察的第二个流从未到达

Partitioned Observable second stream never reached

我有一个处理 Web 请求的 Observable,我想在单独的流中处理成功或失败,这与 this example 非常相似。我的脚本和示例之间的主要区别是我不想合并流然后订阅。我将 RxPY 1.6.1 与 Python 2.7 一起使用。

request = Observable.of(requests.get(self.URL, params=request_params))

request_success, request_failed = request.partition(lambda r: r.status_code == requests.codes.ok)          

request_failed.subscribe(lambda r: print_msg('failure!'))
request_success.subscribe(lambda r: print_msg('success!'))

当请求失败时,脚本会按预期打印 failure!。但是,当响应正常时,脚本不会打印 success!。有趣的是,当您切换订阅顺序时,success! 确实会打印出来,而 failure! 却永远不会打印出来。

我想也许 request 无法被多播,所以我尝试将 publish() 添加到 request 可观察对象并在创建订阅后调用 connect()。那没有帮助(所以我在上面的最小示例中将其遗漏了)。

我错过了什么?

通过将您的代码与 the unit tests that RxPy has for the partition operator 进行比较,看起来代码几乎是正确的。

您走对了路,您确实需要将请求 Observable 转换为多播 observable。

Here is working code (tested on Repl.it, you will have to convert the list of requests back to the classes/objects you're using in your code):

from rx import Observable

def print_msg(message):
  print(message)

class Request(object):
  def __init__(self, status_code):
    self.status_code = status_code

request = Observable.of(
  Request(200),
  Request(404),
  Request(412),
  Request(200),
).publish()

request_success, request_failed = request.partition(lambda r: \
  r.status_code == 200)

request_success.subscribe(lambda r: print_msg('success!'))
request_failed.subscribe(lambda r: print_msg('failure!'))
request.connect()

请注意,一旦请求列表变成一个 Observable,它就会被发布 (Observable.of(...).publish()),并且只有 我们订阅分区的 observable 后,我们才可以呼叫连接。

输出为:

success!
failure!
failure!
success!