使用反应式扩展以正确的顺序处理多个响应
Use reactive extension to handle multiple responses in correct order
情况
我有一个系统,一个请求产生两个响应。请求和响应有相应的观察值:
IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;
保证 RequestSent
事件早于 MainResponseReceived
和 SecondaryResponseReceived
发生,但响应的顺序是随机的。
我有什么
最初我想要处理两个响应的处理程序,所以我压缩了 observables:
_requests
.SelectMany(async request =>
{
var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);
var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
Request = request,
Main = m,
Secondary = s
});
return await zippedResponse.FirstAsync(); ;
})
.Subscribe(OnMainAndSecondaryResponseReceived);
我需要什么
现在我还需要处理 MainResponseReceived
而无需等待 SecondaryResponseRecieved 并且必须保证 OnMainResponseRecieved 在调用 OnMainAndSecondaryResponseReceived
之前完成
请问这两个订阅怎么定义?
测试用例 1:
RequestSent
发生
MainResponseReceived
发生 -> 调用 OnMainResponseReceived
SecondaryResponseReceive
d 发生 -> 调用 OnMainAndSecondaryResponseReceived
测试用例 2:
RequestSent
发生
SecondaryResponseReceived
发生
MainResponseReceived occurs
-> 调用 OnMainResponseReceived -> 调用 OnMainAndSecondaryResponseReceived
我认为您的方向基本正确。我会停止处理所有 Async 的东西——那只会让事情变得复杂。
试试这个查询:
var query =
_requests
.SelectMany(request =>
_mainResponses.Where(m => m.Id == request.Id).Take(1)
.Do(m => OnMainResponseReceived(m))
.Zip(
_secondaryResponses.Where(s => s.Id == request.Id).Take(1),
(m, s) => new MainAndSecondaryResponseReceived()
{
Request = request,
Main = m,
Secondary = s
}));
var subscription =
query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));
.Do(...)
是您的代码中重要的缺失部分。它确保 OnMainResponseReceived
在 OnMainAndSecondaryResponseReceived
之前被调用,无论主要响应还是次要响应先出现。
我测试了这个:
Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();
IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();
_requestsSubject.OnNext(new RequestSent() { Id = 42 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });
_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });
情况
我有一个系统,一个请求产生两个响应。请求和响应有相应的观察值:
IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;
保证 RequestSent
事件早于 MainResponseReceived
和 SecondaryResponseReceived
发生,但响应的顺序是随机的。
我有什么
最初我想要处理两个响应的处理程序,所以我压缩了 observables:
_requests
.SelectMany(async request =>
{
var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);
var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
Request = request,
Main = m,
Secondary = s
});
return await zippedResponse.FirstAsync(); ;
})
.Subscribe(OnMainAndSecondaryResponseReceived);
我需要什么
现在我还需要处理 MainResponseReceived
而无需等待 SecondaryResponseRecieved 并且必须保证 OnMainResponseRecieved 在调用 OnMainAndSecondaryResponseReceived
之前完成
请问这两个订阅怎么定义?
测试用例 1:
RequestSent
发生MainResponseReceived
发生 -> 调用 OnMainResponseReceivedSecondaryResponseReceive
d 发生 -> 调用 OnMainAndSecondaryResponseReceived
测试用例 2:
RequestSent
发生SecondaryResponseReceived
发生MainResponseReceived occurs
-> 调用 OnMainResponseReceived -> 调用 OnMainAndSecondaryResponseReceived
我认为您的方向基本正确。我会停止处理所有 Async 的东西——那只会让事情变得复杂。
试试这个查询:
var query =
_requests
.SelectMany(request =>
_mainResponses.Where(m => m.Id == request.Id).Take(1)
.Do(m => OnMainResponseReceived(m))
.Zip(
_secondaryResponses.Where(s => s.Id == request.Id).Take(1),
(m, s) => new MainAndSecondaryResponseReceived()
{
Request = request,
Main = m,
Secondary = s
}));
var subscription =
query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));
.Do(...)
是您的代码中重要的缺失部分。它确保 OnMainResponseReceived
在 OnMainAndSecondaryResponseReceived
之前被调用,无论主要响应还是次要响应先出现。
我测试了这个:
Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();
IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();
_requestsSubject.OnNext(new RequestSent() { Id = 42 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });
_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });