为什么Rx buffer在buffer不包含items的情况下会继续执行method?
Why does Rx buffer continuously perform method when buffer contains no items?
我有一个充当缓冲区的 Rx Observable。现在它在获得 10 个项目时或 100 毫秒后执行订阅中的方法,以先到者为准。
我注意到我的方法每 100 毫秒连续被调用一次,即使缓冲区中没有任何项目,这让我感到惊讶。如果我的方法没有从缓冲区接收到任何项目,那么立即使我的方法 return 非常简单,但我认为它像那样在后台搅动是很奇怪的。
这是为什么?你建议我如何最好地处理这个问题?我是 Rx 的新手,所以也许我在做一些奇怪的事情。这是我的代码的简化版本:
private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;
public MyBufferClass(IMyServer server, IScheduler scheduler)
{
this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();
this.serverRequests
.Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
.Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}
public void GetSingleItemFromServer(int id, Action<MyData> callback)
{
this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
}
public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
{
if (idsWithCallbacks.IsNullOrEmpty()) return;
this.server.GetMultipleItems(idsWithCallbacks)
}
在我的测试中,如果我调用 GetSingleItemFromServer 5 次然后将我的 TestScheduler 提前 1000 毫秒,我认为 GetMultipleItemsFromServer 只会被调用一次,但它被调用了 10 次。
在这种情况下,一个优雅的解决方案是在 Buffer 之后直接使用 Where 运算符来过滤掉任何空结果。像这样:
stream
.Buffer (...)
.Where (x => x.Any())
.Subscribe (x => {...}, ex => {...});
至于为什么 Buffer 会这样,我想最好是展示一个空集合并允许消费者选择如何处理它,而不是吞下它并拒绝这个机会。
另外,我不会在订阅块中调用您的服务器。我认为最好将任何异步操作作为 Rx 流组合本身的一部分,并将订阅操作限制为处理最终结果的任何轻量级操作,即更新 UI、记录 success/failure 等等。像这样:
(from request in serverRequests
.Buffer (TimeSpan.FromMinutes (1))
.Where (x => x.Any())
from response in Observable.Start(server.GetMultipleItems(...))
select response)
.Subscribe (x => {}, ex => {});
这样做的好处包括:
- 能够在您的服务器调用中使用更多 Rx 运算符,例如 Timeout()、Retry()、Catch() 等。
-能够处理 Subscribe() 重载中的任何管道错误
-管道的独立调度和使用 SubscribeOn()/ObserveOn() 的订阅操作。
也许这样试试:
public MyBufferClass(IMyServer server, IScheduler scheduler)
{
this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();
this.serverRequests
.GroupByUntil(x => 1, x => Observable.Timer(TimeSpan.FromMilliseconds(1000)))
.SelectMany(x => x.ToArray())
.Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}
这不会给你空的结果。
关于 .Buffer(...)
问题的答案 - 这就是它的设计方式。没有比这更复杂的了。
我有一个充当缓冲区的 Rx Observable。现在它在获得 10 个项目时或 100 毫秒后执行订阅中的方法,以先到者为准。
我注意到我的方法每 100 毫秒连续被调用一次,即使缓冲区中没有任何项目,这让我感到惊讶。如果我的方法没有从缓冲区接收到任何项目,那么立即使我的方法 return 非常简单,但我认为它像那样在后台搅动是很奇怪的。
这是为什么?你建议我如何最好地处理这个问题?我是 Rx 的新手,所以也许我在做一些奇怪的事情。这是我的代码的简化版本:
private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;
public MyBufferClass(IMyServer server, IScheduler scheduler)
{
this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();
this.serverRequests
.Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
.Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}
public void GetSingleItemFromServer(int id, Action<MyData> callback)
{
this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
}
public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
{
if (idsWithCallbacks.IsNullOrEmpty()) return;
this.server.GetMultipleItems(idsWithCallbacks)
}
在我的测试中,如果我调用 GetSingleItemFromServer 5 次然后将我的 TestScheduler 提前 1000 毫秒,我认为 GetMultipleItemsFromServer 只会被调用一次,但它被调用了 10 次。
在这种情况下,一个优雅的解决方案是在 Buffer 之后直接使用 Where 运算符来过滤掉任何空结果。像这样:
stream
.Buffer (...)
.Where (x => x.Any())
.Subscribe (x => {...}, ex => {...});
至于为什么 Buffer 会这样,我想最好是展示一个空集合并允许消费者选择如何处理它,而不是吞下它并拒绝这个机会。
另外,我不会在订阅块中调用您的服务器。我认为最好将任何异步操作作为 Rx 流组合本身的一部分,并将订阅操作限制为处理最终结果的任何轻量级操作,即更新 UI、记录 success/failure 等等。像这样:
(from request in serverRequests
.Buffer (TimeSpan.FromMinutes (1))
.Where (x => x.Any())
from response in Observable.Start(server.GetMultipleItems(...))
select response)
.Subscribe (x => {}, ex => {});
这样做的好处包括:
- 能够在您的服务器调用中使用更多 Rx 运算符,例如 Timeout()、Retry()、Catch() 等。
-能够处理 Subscribe() 重载中的任何管道错误
-管道的独立调度和使用 SubscribeOn()/ObserveOn() 的订阅操作。
也许这样试试:
public MyBufferClass(IMyServer server, IScheduler scheduler)
{
this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();
this.serverRequests
.GroupByUntil(x => 1, x => Observable.Timer(TimeSpan.FromMilliseconds(1000)))
.SelectMany(x => x.ToArray())
.Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}
这不会给你空的结果。
关于 .Buffer(...)
问题的答案 - 这就是它的设计方式。没有比这更复杂的了。