Rx "pipeline" 具有解耦的源和目标

Rx "pipeline" with decoupled source and destination

我目前正在尝试使用RX来组织一个相对简单的数据流。我选择 RX 是因为它看起来很合身(纠正我,如果我错了)我希望它能让我以后更容易扩展 "pipeline"。

RX 对我来说还是很新,我了解基本概念并且已经关注它很长时间了,但现在是第一次评估它用于一些实际工作。

我的一般流程是:DataSource -> Pipeline -> DataDestination

DataSource returns 从不同来源收集数据的 IObservable

public class DataSource
{
    IObservable<Data> Run();
}

DataDestination负责将数据发送到另一台服务器。

public interface ILogDestination
{
    void SendData(IList<Data> dataList);
}

Pipeline 订阅 DataSource 并负责缓冲、批处理数据并将数据传递到 DataDestination。 如果 DataDestination 由于某种原因(服务器不可用,超时)失败,它还将处理 reties

public class Pipeline : IPipeline
{
    private ILogSource _source;
    private ILogDestination _destination;

    public void Start()
    {
        _destination.Initialize();
        _source.Initialize();

        _source.Run()
            .Buffer(TimeSpan.FromSeconds(1), 100) // Create batches
            .Do(_destination.SendData) // Send the data               
            .Retry(5) // Retry in case of timeouts
            .Subscribe();

            //Todo: think about error handling / retry mechanism
            //Todo: batching/buffering of data for rate limiting max (i.e. 100 per minute)
    }
}

我的问题是,如果 5 次重试失败,源生成的数据在任何情况下都不应该丢失。

捕获异常需要我提供替换数据(?),在这种情况下这对我来说毫无意义。吞下异常会丢失我的数据 - 我也想避免这种情况。

我考虑过在管道内实现一个 "database" 层,它将所有消息排队,以便源和目标分离,但这感觉像是一种非常非反应性的方式。

http://www.introtorx.com/ 上的小例子已经帮了大忙,但我在这里错过了大图。

Rx 是完成这项工作的正确工具还是我强加于此?

感谢您的宝贵时间。

您不应在主流程中使用 .Do() 运算符,它是为了副作用(如调试日志记录)而设计的。

Rx 适合你想要实现的目标,因为你想要在功能上组合未来价值流。尝试这样的事情:

  // make your SendData return a Observable which can succeed (emit Unit value[=void] or something more usefull) or throw exception if sending data fails
  function IOBservable<Unit> SendData(IList<string> logs){}

  _source.Run()
   .Buffer(TimeSpan.FromSeconds(1), 100) // Create batches
   .SelectMany(bufferedLogs => SendData(bufferedLogs)
     .Retry(5)
     .Catch(/* choose what to do with original logs after 5 retries*/)
    )
   .Subscribe();

还有一些决定需要做出:

  • 5次重试后怎么办,存盘?
  • Retry() 将在原始 SendData() 失败后立即重试。也许您想实施退避机制?

Rx 非常适合用作管道的导管。 Rx 可以有效地用于批处理和执行退休,例如您的查询正在做的事情。

但是如果你需要处理数据,我认为 Rx 没有那么强大。即,如果收到事件有效负载时,您需要执行一些计算密集型或一些 I/O。执行其中任何一项都可能花费非常多的*时间。这意味着您很可能在到达并等待处理时缓冲**其他值,或者更糟的是,您正在阻止生产者。

为什么这是个问题?

在缓冲(隐式队列)的情况下,您有生产者认为他们已经发布但尚未处理的数据。如果订阅被处置或管道中发生错误,那么您的数据将丢失。

在生产者被阻止的情况下,你正在打破 Rx 范式,你可能应该使用 IEnumerable<T>/Pull。在这种情况下,显式 队列可能是一个很好的解决方案。

回到你的问题。

首先我们不知道您的数据源是热序列还是冷序列。即如果我们停止收听,我们会丢弃消息吗?或者如果我们继续重新订阅(例如使用 Retry())它会继续重复我们的数据吗?

热点数据?

如果您的数据源很热,那么我认为如果您希望 Destination 获得所有数据的机会最大,则必须将数据持久保存在管道中。在这种情况下,我认为最好只订阅并将所有可以转储到队列中。然而,这是一个痛点,因为队列应该由磁盘支持,因此涉及 I/O。据我了解,大多数日志记录框架都实现了非阻塞写入,它只是缓冲条目。专用线程将缓冲区排空到磁盘。是的,可以删除一些条目,但它要么在写入时阻塞,要么有损。

在这个过程中,我们尝试优化每个部分以尽可能快。即尽快将消息转储到磁盘,尽可能避免框架、库、序列化的开销。然后在另一个 process/thread 中,我们获取这些值并执行将其发送到另一台服务器的缓慢过程。

冷数据?

如果您的数据源是冷的,那么我认为您将需要一种方法来指定要从中恢复的序列中的一个点。例如时间戳、检查点、版本或序列号。然后您的目的地将需要能够公开其当前检查点。源需要能够允许您从该检查点订阅。 https://geteventstore.com/ and https://github.com/damianh/SqlStreamStore 之类的东西支持这一点。

在这种情况下,您所依赖的事件流技术有望减轻发送到可能导致备份的目标服务器的成本。如果那是为您的查询提供专用流,那么您可能不是 "blocking" 生产者。

Observable.Defer(()=> _source.GetFrom(_destination.CurrentCheckpoint()))
    //Create batches
    .Buffer(TimeSpan.FromSeconds(1), 100) 
    //Custom Operator for you to write of find on the internet
    .BackoffRetry(
        initialDelay:TimeSpan.FromSeconds(1), 
        backoff: delay=>delay*2, 
        maxRetries :5)
    //Send the data. 
    // This should be considered a cheap operation. i.e. no I/O or computations.
    .Subscribe(
      _destination.SendData,
      ex=>/*We know this can error, so need to cater for the 6th failure*/);

*是的,我知道这是双重否定。我的观点是说 50 毫秒可能看起来并不重要 "significant",但是当您可能期望 100 毫秒/秒时,它就不能被认为是微不足道的。

**这里的缓冲就是我所说的"Implicit queueing"。虽然队列无处不在,但我更喜欢显式队列而不是隐式队列。