Reactive Extensions - De-coupling 每个 onNext 调用在 observable 中

Reactive Extensions - De-coupling each onNext call within an observable

请放轻松,我也在学习这个主题,并且真的很享受。所以这里...

我从流中创建一个可观察的读数,当有足够的数据进入并表示我需要触发的 "IMyEvent" 类型时,我将构建该类型并调用 observer.OnNext。

此流是来自服务器的响应流,命令从我自己或外部发送到此服务器,因此我可以读取此流并据此做出反应。

对于我发送的每个命令,我都订阅了从这个流中生成的可观察对象。我可以查看我的命令是否已成功完成。我也在外部订阅了这个流,并对可能发生的其他事件做出反应。

让我们举个例子,假设有人从外部加入这个服务器的 conference,服务器将沿着这个流发送数据,然后我的 Observable 捕获这个并调用 onNext观察者。我想据此做出反应,锁定会议在此处编辑... 我所说的锁定环境的意思是我向服务器发送一个锁定命令,服务器知道 "lock itself" 不允许任何其他人加入 会议.

我有一个始终订阅 "SomeoneJoinsEvent" 的客户端(他们订阅了可观察流),同时在此订阅中执行此 onNext 我还触发了一个命令来锁定 会议。然后,此命令还临时(我使用超时运算符)订阅相同的可观察流,但我注意到我在这里 blocked/locked。

我可以看到,当我执行 onNext 时,它不会继续读取流以查看更多 IMyEvents。

所以只有我的 ideas/thoughts/brainfarts 关注... 有没有一种特殊的方法我可以以某种方式解耦这个 Observable 流,所以它不断地读取并触发 OnNext 到它的所有订阅者并且不等待他们完成?或者在中间暴露一些与主 Observable 流安全分离的东西。

  1. 我想我可以在内部订阅这个 Observable 流并创建 2 个新的 observable,但我认为当我订阅新的 observable 时我仍然会进一步传播这个问题。也许我在这里错了,请参阅 brainfart 2 和 3。
  2. 阅读 this 之后,我还想也许我应该在一个单独的线程上订阅这个 Observable 流,并将每个 IMyEvent 添加到 2 个队列中,也许这样就可以解耦它并允许我分别订阅每个队列,一个用于等待命令完成的内部调用的队列订阅,以及一个用于我的外部调用的订阅,不知何故这仍然感觉不对,如果我有很多外部订阅者到这个 Observable 流并且其中一个被阻塞怎么办?
  3. 我想我在这里有点困惑,现在我在想我的主要 Observable 流是 .Publish.RefCount,我需要做的就是每次我想接收 IMyEvent 时创建一个新的 Observable (这与创建一个队列相同,如 brain fart 2),订阅了主要的可连接可观察对象?所以内部命令订阅 1 个 observable 等待正确的事件,1 个 observable 暴露在外部,我还没有想过或尝试过这个。

有人可以帮我吗? 我希望这能理解我正在尝试做的事情。

这是我的代码示例...

//This is the observable that I create from the stream of events
public IObservable<IMyEvent> MyEvents()
{
    if (_myEventObservable != null)
    {
        return _myEventObservable;
    }

    // Here is one observable that reads data as it comes in off the asynchronous stream.
    _myEventObservable = Observable.Create<IMyEvent>(async observer =>
    {
        var myServerEventStream = await GetStreamFromMyServer(_myAuthenticationConfiguration, _token, _httpClientWrapper);
        var streamReaderAsyncState = _streamReaderAsyncStateFactory.CreateStreamReadAsyncState(myServerEventStream, 255);

        var currentStateDisposable = new SerialDisposable();
        var iterator = new Action<IStreamReaderAsyncState, Action<IStreamReaderAsyncState>>((state, self) =>
        {
            try
            {
                currentStateDisposable.Disposable = state.StreamObservable().Subscribe(
                    //OnNext
                    bytesRead =>
                    {
                        if (bytesRead == 0)
                        {
                            observer.OnCompleted();
                        }
                        else
                        {
                            //In here it does all the magic to put all the data together and only call OnNext on the observer when there is a complete IMyEvent.
                            //It is just a plain observer.OnNext(IMyEvent whatever event we have build up)
                            _messageParser.Parse(new DataChunk(state.Buffer, bytesRead), _token, observer);
                            self(state);
                        }
                    });
            }
            catch (Exception e)
            {
                observer.OnError(e);
            }
        });

        var schedulerDisposable = TaskPoolScheduler.Default.Schedule(streamReaderAsyncState, iterator);
        return new CompositeDisposable(myServerEventStream, schedulerDisposable, currentStateDisposable);
    }).Publish().RefCount();

    return _myEventObservable;
}

//Just for fuller visibility on the stream reader async state, here is the class that is returned from the "_streamReaderAsyncStateFactory.CreateStreamReadAsyncState(myServerEventStream, 255)" call   
internal class StreamReaderAsyncState : IStreamReaderAsyncState
{
    private readonly IObservable<int> _readAsyncObservable;

    public StreamReaderAsyncState(Stream stream, int bufferSize)
    {
        Buffer = new byte[bufferSize];
        _readAsyncObservable = Observable.FromAsync(() => stream.ReadAsync(Buffer, 0, bufferSize));
    }

    public byte[] Buffer { get; private set; }

    public IObservable<int> StreamObservable()
    {
        return _readAsyncObservable;
    }
}


//Externally I subscribe to this like so...
MyEvents.OfType<SomoneJoinsEvent>
.Subscribe(
    //I read somewhere that I shouldn't be making this async and using a selectmany with the async, but I am unsure.
    async myEvent => {
        await LockEnvironment()
    }
)

//The LockEnvironment call
//The myCommandPost is the LockEnvironment Command that is passed in.
private async Task<CommandResponse<TCommandResponseDto>> PostCommandAndWaitForEvent<TEventToWaitFor, TCommandResponseDto>(IMyCommandPost myCommandPost)
    where TEventToWaitFor : IMyEvent
{
    //So my intention here is to zip together the result of the post command with the TEventToWaitFor and return the first one. Otherwise if it takes too long it will return the result of the Timeout operator.
    return await MyEvents()
                    .OfType<TEventToWaitFor>()
                    //This myCommandPost.PostCommand... returns a Task<CommandResponse<TCommandResponseDto>>
                    .Zip(myCommandPost.PostCommand<TCommandResponseDto>().ToObservable(), (myEvent, myCommandResponse) => myCommandResponse)
                    .Timeout(new TimeSpan(0, 0, _myAuthenticationConfiguration.TimeoutToWaitForCommands), TimedOutLookingForMyEvent<TCommandResponseDto>())
                    .FirstAsync();
}

//The timeout observable
private IObservable<CommandResponse<TCommandResponseDto>> TimedOutLookingForMyEvent<TCommandResponseDto>()
{
    return Observable.Create<CommandResponse<TCommandResponseDto>>(observable =>
    {
        observable.OnNext(new CommandResponse<TCommandResponseDto>());
        return Disposable.Empty;
    });
}

也在此处编辑,添加我为事件解析器所做的...

internal class MyEventParser : IMessageParser
{
    private readonly ILogService _logService;
    private readonly IMyEventFactory _MyEventFactory;
    private readonly StringBuilder _data = new StringBuilder();
    private const string EventDelimiter = "\n\n";
    private readonly Regex _eventDelimiterRegex = new Regex("\n{3,}");

    public MyEventParser(ILogService logService, IMyEventFactory myEventFactory)
    {
        _logService = logService;
        _myEventFactory = myEventFactory;
    }

    public void Parse(DataChunk dataChunk, string token, IObserver<IMyEvent> myEventObserver)
    {
        _data.Append(dataChunk);
        CleanUpEventDelimiterInData();
        var numberOfSubstrings = CountNumberOfSubstrings(EventDelimiter, _data.ToString());
        if (numberOfSubstrings == 0)
        {
            return;
        }

        var events = _data.ToString().Split(new[]{EventDelimiter}, StringSplitOptions.RemoveEmptyEntries);

        events.Take(numberOfSubstrings).Foreach(x =>
        {
            _logService.InfoFormat("MyEventParser - {0} - OnNext: \n\n{1}\n\n", token.Substring(token.Length -10), x);
            myEventObserver.OnNext(_myEventFactory.Create(x));
        });

        //Clean up data of what has already been fired.
        if (events.Count() == numberOfSubstrings)
        {
            _data.Clear();
        }
        else
        {
            _data.Clear();
            _data.Append(events.Last());
        }
    }

    private void CleanUpEventDelimiterInData()
    {
        var eventDelimitersFixed = _eventDelimiterRegex.Replace(_data.ToString(), EventDelimiter);
        _data.Clear();
        _data.Append(eventDelimitersFixed);
    }

    private int CountNumberOfSubstrings(string subString, string source)
    {
        var i = 0;
        var count = 0;
        while ((i = source.IndexOf(subString, i, StringComparison.InvariantCulture)) != -1)
        {
            i += subString.Length;
            count++;
        }

        return count;
    }
}

提前感谢您的帮助:-)

首先,欢迎使用 Rx 和响应式编程。 一开始可能会让人感到困惑,但是掌握了基本的正确方法会使事情变得容易得多。

首先,我想快速浏览一下您的代码

public IObservable<IMyEvent> MyEvents()
{
    if (_myEventObservable != null)
    {
        return _myEventObservable;
    }

这看起来应该是 属性 和 private readonly 支持字段。 可观察序列是惰性评估的,因此如果没有人订阅 none 代码将 运行.

what if I have many external subscribers to this Observable stream and one of them blocks?

那么你有一个行为不端的观察者。 Rx Observable 序列的订阅者应该尽快处理他们的回调。 这意味着没有锁定,没有 IO,没有 CPU 密集处理。 如果您需要这样做,那么您可能需要将一条消息排队并完成这项工作 somewhere/sometime else.

似乎 StreamReaderAsyncState 做得还不够,而 MyEvents 做得太多了。 也许如果你有这样的东西(修改自 https://github.com/LeeCampbell/RxCookbook/tree/master/IO/Disk

public static class ObservableStreamExtensions
{
    public static IObservable<byte[]> ToObservable(this Stream source, int bufferSize)
    {
        return Observable.Create<byte[]>(async (o, cts) =>
        {
            var buffer = new byte[bufferSize];
            var bytesRead = 0;
            do 
            {
                try
                {
                    bytesRead = await source.ReadAsync(buffer, 0, bufferSize);
                    if (bytesRead > 0)
                    {
                        var output = new byte[bytesRead];
                        Array.Copy(buffer, output, bytesRead);
                        o.OnNext(output);   
                    }
                }
                catch (Exception e)
                {
                    o.OnError(e);
                }
            }
            while (!cts.IsCancellationRequested && bytesRead > 0);

            if (!cts.IsCancellationRequested && bytesRead == 0)
            {
                o.OnCompleted();
            }
        });
    }
}

那么你的代码就缩减为

private readonly IObservable<IMyEvent> _myEvents;
public ctor()
{
    _myEvents = return Observable.Create<IMyEvent>(async observer =>
    {
        var bufferSize = 255;
        var myServerEventStream = await GetStreamFromMyServer(_pexipAuthenticationConfiguration, _token, _httpClientWrapper);

        var subscription = myServerEventStream
            .ToObservable(bufferSize)
            .Select(buffer=>new DataChunk(buffer, _token))
            .Subscribe(observer);

        return new CompositeDisposable(myServerEventStream, subscription);
    })
    .Publish()
    .RefCount();
}

//This is the observable that I create from the stream of events
public IObservable<IMyEvent> MyData{ get { return _myEvents; } }

请注意,new DataChunk 现在不采用 IObserver,这对我来说看起来像代码味道。 另请注意,您在评论 shudder 中使用了术语 "magic"。 None这个应该是神奇的。 只是订阅者和转换的简单组合。

接下来我们看看问题的根源是什么:锁定。 我不知道"locking the environment"是什么意思,你似乎没有解释或证明它。

也许你想做的是有一个异步门的概念。 在这里您只需设置一个标志来指定您所处的状态。 理想情况下,这个标志也是可观察的。 然后,您可以使用该标志的状态组合其他传入的 events/commands/messages,而不是锁定系统。

在这里观看 Matt 谈论异步门的视频(转到时间 00:18:00):https://yow.eventer.com/yow-2014-1222/event-driven-user-interfaces-by-matt-barrett-and-lee-campbell-1686

回到我认为是你的实际问题,我认为你已经造成了死锁。

MyEvents.OfType<SomoneJoinsEvent>
.Subscribe(
    //I read somewhere that I shouldn't be making this async and using a selectmany with the async, but I am unsure.
    async myEvent => {
        await LockEnvironment()
    }
)

此代码正在主动阻止生产者回调。 Rx 是一个自由线程模型,所以实际上(理论上)它在幕后所做的事情非常简单。 当调用 OnNext 时,它只是循环遍历每个订阅者并调用其回调。 这里你阻塞了,所以生产者不仅不能调用下一个订阅者,也不能处理下一个消息。

所以你正在收听 MyEvents 序列,你会收到 SomoneJoinsEvent 作为第 100 条消息。 然后您尝试推送将在 MyEvents 中产生事件的命令。 收到此事件后,您将继续。 但是,您正在阻止接收该消息。 这样你就陷入了僵局。

那么现在问题又回到你身上,你希望这个 LockEnvironment 实际实现什么?

编辑:

查看您的代码,它似乎 过于复杂。 您似乎经常使用 StringBuilder,添加、查询变异和替换其内容。

我认为你可以使用更通用的解决方案来解决这个问题。

这里是一个示例,说明如何处理被读入缓冲区然后 projected/translated 到记录的流。

var testScheduler = new TestScheduler();

//A series of bytes/chars to be treated as buffer read from a stream (10 at a time).
//  a \n\n represents a record delimiter.
var source = testScheduler.CreateColdObservable<char[]>(
    ReactiveTest.OnNext(0100, new char[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', '\n', '\n', 'h' }),
    ReactiveTest.OnNext(0200, new char[] { 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', '\n', '\n' }),
    ReactiveTest.OnNext(0300, new char[] { 'q', 'r', 's', '\n', '\n', 't', 'u', 'v', '\n', '\n' }),
    ReactiveTest.OnNext(0400, new char[] { 'w', 'x', '\n', 'y', 'z', '\n', '\n' })
);

var delimiter = '\n';
var observer = testScheduler.CreateObserver<string>();
var shared = source.SelectMany(buffer=>buffer).Publish().RefCount();
var subscription = shared
    //Where we see two '\n' values then emit the buffered values
    .Buffer(() => shared.Scan(Tuple.Create(' ',' '), (acc, cur)=>Tuple.Create(acc.Item2, cur)).Where(t => t.Item1 == delimiter && t.Item2==delimiter))
    //Remove trailing delimiters
    .Select(chunk =>
        {
            var len = chunk.Count;
            while(chunk[chunk.Count-1]==delimiter)
            {
                chunk.RemoveAt(chunk.Count-1);
            }
            return chunk;
        })
    //Filter out empty buffers
    .Where(chunk=>chunk.Any())
    //Translate the record to the desired output type
    .Select(chunk=>new string(chunk.ToArray()))
    .Subscribe(observer);

testScheduler.Start();
observer.Messages.AssertEqual(
    ReactiveTest.OnNext(0100, "abcdefg"),
    ReactiveTest.OnNext(0200, "hijklmnop"),
    ReactiveTest.OnNext(0300, "qrs"),
    ReactiveTest.OnNext(0300, "tuv"),
    ReactiveTest.OnNext(0400, "wx\nyz")
);