How do I prematurely terminate an observable chain using TakeUntil?

I currently have:

var disconnect = Observable
    .FromEvent<ExceptionListener, Exception>(
        (handler) => connection.ExceptionListener += handler,
        (handler) => connection.ExceptionListener -= handler
    )
    .Do((exception) =>
    {
        // note: This line does get executed when I trigger the error scenario,
        //       so I know it is in fact happening.
        //
        Console.WriteLine(exception.Message);
    });

var messages = Observable
    .Using(
        () => connection.CreateSession(),
        (session) => Observable
    .Using(
        () => session.GetQueue(address),
        (queue) => Observable
    .Using(
        () => session.CreateConsumer(queue),
        (consumer) => Observable
            .FromEvent<MessageListener, IMessage>(
                (handler) => consumer.Listener += handler,
                (handler) => consumer.Listener -= handler
            )
            .TakeUntil(disconnect)
    )));

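Whenever I build on messages or otherwise subscribe to it, the TakeUntil does not seem to be honored and the chain is not aborted, even though disconnect does appear to be emitting.

Ideally, I would like this IObservable (or anything built on top of it) to complete/terminate (I'm not sure which word is correct here) as soon as the disconnect observable emits.

For completeness, my consuming code is essentially: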

await messages.ForEachAsync(async (message) =>
{
    // note: Do things with `message`
}, cancellationToken);

// note: During normal operation, this never gets run. But I want the await 
//       above to complete when my `TakeUntil` above emits.
Console.WriteLine("Chain completed/terminated.");

The problem may lie in the way the observable is consumed:

await messages.ForEachAsync(async (message) =>
{
    // note: Do things with `message`
});

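The ForEachAsync operator does not accept an asynchronous delegate (such as a Func<T, Task>), so the lambda you pass is async void (also known as fire-and-crash). ForEachAsync does not wait for such a lambda to finish and cannot observe its exceptions. If you want to invoke an asynchronous lambda for each element of the observable sequence, you must use a projection to create an IObservable<Task<T>>, and then use one of the three flattening operators (Concat, Merge, Switch) to get back an IObservable<TResult> that contains the results of the asynchronous invocations. If the invocations produce no result, you can return something like Unit.Default.
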
await messages.Select(async message =>
{
    // note: Do things with `message`
    return Unit.Default;
}).Merge().DefaultIfEmpty();

The DefaultIfEmpty is there only to prevent an InvalidOperationException in case the sequence completes with zero messages.
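
To illustrate why the DefaultIfEmpty matters, here is a minimal sketch, assuming the System.Reactive package is referenced. The class name DefaultIfEmptyDemo is made up for the illustration. Awaiting an observable yields its last element, so an empty sequence has nothing to return.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DefaultIfEmptyDemo
{
    public static async Task Main()
    {
        // Awaiting an observable returns its last element. An empty
        // sequence has no last element, so the await throws.
        try
        {
            await Observable.Empty<Unit>();
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine($"Empty sequence: {ex.GetType().Name}");
        }

        // DefaultIfEmpty substitutes default(Unit) when the source is empty,
        // so there is always one element and the await completes normally.
        await Observable.Empty<Unit>().DefaultIfEmpty();
        Console.WriteLine("With DefaultIfEmpty: completed");
    }
}
```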


An example with a cancellable lambda:

await messages.Select(message => Observable.FromAsync(async cancellationToken =>
{
    // note: Do things with `message`, while observing the cancellationToken
    return Unit.Default;
})).Merge().DefaultIfEmpty();
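
Merge runs the asynchronous lambdas concurrently. If the messages should instead be handled one at a time, in order, Concat can be swapped in. The following is only a sketch under assumptions: Observable.Range stands in for the real messages observable, the class name ConcatDemo is made up, and the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ConcatDemo
{
    public static async Task Main()
    {
        // Hypothetical stand-in for the real `messages` observable.
        IObservable<int> messages = Observable.Range(1, 3);

        await messages
            .Select(message => Observable.FromAsync(async cancellationToken =>
            {
                // FromAsync is deferred: this lambda only starts when Concat
                // subscribes, which happens after the previous one has completed.
                await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
                Console.WriteLine($"Handled {message}");
                return Unit.Default;
            }))
            .Concat()
            .DefaultIfEmpty();

        // Reached only after every lambda has finished.
        Console.WriteLine("Done");
    }
}
```

With Concat the handlers never overlap, at the cost of throughput. Merge is the better fit when the order of processing does not matter.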

Your code, as it stands, works fine. Here is how I tested it.

First, I turned your original code into something that compiles and runs.

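// FromEvent<TDelegate, TEventArgs> needs delegates that take the event argument.
public delegate void ExceptionListener(Exception exception);
public delegate void MessageListener(IMessage message);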

public interface IMessage
{
    
}

public static class connection
{
    public static event ExceptionListener ExceptionListener;
    public static Session CreateSession() => new Session();
}

public class Session : IDisposable
{
    public Queue GetQueue(string address) => new Queue();
    public Consumer CreateConsumer(Queue queue) => new Consumer();
    
    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                // TODO: dispose managed state (managed objects).
            }

            // TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
            // TODO: set large fields to null.

            disposedValue = true;
        }
    }

    // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
    // ~Session() {
    //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
    //   Dispose(false);
    // }

    // This code added to correctly implement the disposable pattern.
    public void Dispose()
    {
        // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        Dispose(true);
        // TODO: uncomment the following line if the finalizer is overridden above.
        // GC.SuppressFinalize(this);
    }
    #endregion

}

public class Queue : IDisposable
{

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                // TODO: dispose managed state (managed objects).
            }

            // TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
            // TODO: set large fields to null.

            disposedValue = true;
        }
    }

    // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
    // ~Queue() {
    //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
    //   Dispose(false);
    // }

    // This code added to correctly implement the disposable pattern.
    public void Dispose()
    {
        // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        Dispose(true);
        // TODO: uncomment the following line if the finalizer is overridden above.
        // GC.SuppressFinalize(this);
    }
    #endregion

}

public class Consumer : IDisposable
{
    public event MessageListener Listener;

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                // TODO: dispose managed state (managed objects).
            }

            // TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
            // TODO: set large fields to null.

            disposedValue = true;
        }
    }

    // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
    // ~Consumer() {
    //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
    //   Dispose(false);
    // }

    // This code added to correctly implement the disposable pattern.
    public void Dispose()
    {
        // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        Dispose(true);
        // TODO: uncomment the following line if the finalizer is overridden above.
        // GC.SuppressFinalize(this);
    }
    #endregion
}

Now I can run this code:

var address = "";

var disconnect =
    Observable
        .FromEvent<ExceptionListener, Exception>(
            handler => connection.ExceptionListener += handler,
            handler => connection.ExceptionListener -= handler)
        .Do(exception => Console.WriteLine(exception.Message));

var messages =
    Observable.Using(
        () => connection.CreateSession(),
        session => Observable.Using(
            () => session.GetQueue(address),
            queue => Observable.Using(
                () => session.CreateConsumer(queue),
                consumer =>
                    Observable
                        .FromEvent<MessageListener, IMessage>(
                            handler => consumer.Listener += handler,
                            handler => consumer.Listener -= handler)
                        .TakeUntil(disconnect))));

I then refactored it to avoid the events, using a couple of Subject<T> instances to simulate them:

var address = "";

var disconnectSubject = new Subject<Exception>();

var disconnect =
    disconnectSubject
        .Do(exception => Console.WriteLine(exception.Message));

var messageSubject = new Subject<IMessage>();

var messages =
    Observable.Using(
        () => connection.CreateSession(),
        session => Observable.Using(
            () => session.GetQueue(address),
            queue => Observable.Using(
                () => session.CreateConsumer(queue),
                consumer => messageSubject.TakeUntil(disconnect))));

Now I can run this code:

messages.Subscribe(m => Console.WriteLine("Message"));

messageSubject.OnNext(null);
messageSubject.OnNext(null);
messageSubject.OnNext(null);
disconnectSubject.OnNext(new Exception());
messageSubject.OnNext(null);

The output I get is:

Message
Message
Message
Exception of type 'System.Exception' was thrown.

When the disconnect subject fires, the messages observable does end.

I then replaced the simple messages.Subscribe code with:

var cancellationToken = new CancellationToken();
var task =
    messages
        .ObserveOn(Scheduler.Default)
        .ForEachAsync(async (message) =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1.0));
            Console.WriteLine("Message");
        }, cancellationToken);

I now get:

Exception of type 'System.Exception' was thrown.
Message
Message
Message

Apart from the change in output order caused by await Task.Delay(TimeSpan.FromSeconds(1.0));, nothing is different here.

The messages observable ends as expected in both cases.

Your code, as shown here, works fine.
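
To see the awaited side of this as well, here is a small sketch showing that an await on a TakeUntil chain finishes once the disconnect source emits. It assumes the System.Reactive package is referenced. The class name TakeUntilAwaitDemo and the string payloads are made up, and the subjects are stand-ins for the real events.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TakeUntilAwaitDemo
{
    public static async Task Main()
    {
        // Hypothetical stand-ins for the message and exception events.
        var messageSubject = new Subject<string>();
        var disconnectSubject = new Subject<Exception>();

        IObservable<string> messages = messageSubject.TakeUntil(disconnectSubject);

        // ToTask subscribes immediately, so nothing pushed below is missed.
        Task<IList<string>> received = messages.ToList().ToTask();

        messageSubject.OnNext("first");
        messageSubject.OnNext("second");
        disconnectSubject.OnNext(new Exception("connection lost"));
        messageSubject.OnNext("ignored"); // arrives after TakeUntil has completed the sequence

        // The task completes because TakeUntil signalled completion.
        IList<string> list = await received;
        Console.WriteLine($"Completed with {list.Count} message(s).");
    }
}
```

This prints "Completed with 2 message(s).", since the third message is pushed after the disconnect has already ended the sequence.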