如何使用 TakeUntil 提前终止可观察链?
How do I prematurely terminate an observable chain using TakeUntil?
我目前有:
var disconnect = Observable
.FromEvent<ExceptionListener, Exception>(
(handler) => connection.ExceptionListener += handler,
(handler) => connection.ExceptionListener -= handler
)
.Do((exception) =>
{
// note: This line does get executed when I trigger the error scenario,
// so I know it is in fact happening.
//
Console.WriteLine(exception.Message);
});
var messages = Observable
.Using(
() => connection.CreateSession(),
(session) => Observable
.Using(
() => session.GetQueue(address),
(queue) => Observable
.Using(
() => session.CreateConsumer(queue),
(consumer) => Observable
.FromEvent<MessageListener, IMessage>(
(handler) => consumer.Listener += handler,
(handler) => consumer.Listener -= handler
)
.TakeUntil(disconnect)
)))
每当我在 messages
上构建或以其他方式订阅时,似乎 TakeUntil
没有被兑现并且链没有中止,即使 disconnect
实际上确实如此似乎在发射。
理想情况下,我希望这个 IObservable
(或任何基于它的结果)到 complete/terminate(我不确定这里的正确词是什么)一旦disconnect
observable 发射。
为了完整起见,我的消费代码实际上是:
await messages.ForEachAsync(async (message) =>
{
// note: Do things with `message`
}, cancellationToken);
// note: During normal operation, this never gets run. But I want the await
// above to complete when my `TakeUntil` above emits.
Console.WriteLine("Chain completed/terminated.");
问题可能在于 observable 的消费方式:
await messages.ForEachAsync(async (message) =>
{
// note: Do things with `message`
});
ForEachAsync
operator does not accept an asynchronous delegate (Func<Task>
), so the lambda passed is async void
(也称为火灾和崩溃)。如果要为可观察序列的每个元素调用异步 lambda,则必须使用投影创建 IObservable<Task<T>>
,然后使用三个展平运算符之一(Concat
、Merge
或 Switch
) 以取回包含异步调用结果的 IObservable<TResult>
。如果这些没有结果,您可以 return 类似 Unit.Default
.
的东西
await messages.Select(async message =>
{
// note: Do things with `message`
return Unit.Default;
}).Merge().DefaultIfEmpty();
那里的 DefaultIfEmpty
只是为了防止 InvalidOperationException
以防序列以零消息完成。
具有可取消 lambda 的示例:
await messages.Select(message => Observable.FromAsync(async cancellationToken =>
{
// note: Do things with `message`, while observing the cancellationToken
return Unit.Default;
})).Merge().DefaultIfEmpty();
您的代码目前运行良好。这是我的测试方法。
首先,我将您的原始代码转换为可编译且 运行 可用的状态。
public delegate void ExceptionListener();
public delegate void MessageListener();
public interface IMessage
{
}
public static class connection
{
public static event ExceptionListener ExceptionListener;
public static Session CreateSession() => new Session();
}
public class Session : IDisposable
{
public Queue GetQueue(string address) => new Queue();
public Consumer CreateConsumer(Queue queue) => new Consumer();
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}
// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
disposedValue = true;
}
}
// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Session() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
public class Queue : IDisposable
{
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}
// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
disposedValue = true;
}
}
// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Queue() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
public class Consumer : IDisposable
{
public event MessageListener Listener;
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}
// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
disposedValue = true;
}
}
// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Consumer() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
现在我可以运行这个代码:
var address = "";
var disconnect =
Observable
.FromEvent<ExceptionListener, Exception>(
handler => connection.ExceptionListener += handler,
handler => connection.ExceptionListener -= handler)
.Do(exception => Console.WriteLine(exception.Message));
var messages =
Observable.Using(
() => connection.CreateSession(),
session => Observable.Using(
() => session.GetQueue(address),
queue => Observable.Using(
() => session.CreateConsumer(queue),
consumer =>
Observable
.FromEvent<MessageListener, IMessage>(
handler => consumer.Listener += handler,
handler => consumer.Listener -= handler)
.TakeUntil(disconnect))));
然后我重构以避免事件并使用几个 Subject<T>
实例来模拟事件:
var address = "";
var disconnectSubject = new Subject<Exception>();
var disconnect =
disconnectSubject
.Do(exception => Console.WriteLine(exception.Message));
var messageSubject = new Subject<IMessage>();
var messages =
Observable.Using(
() => connection.CreateSession(),
session => Observable.Using(
() => session.GetQueue(address),
queue => Observable.Using(
() => session.CreateConsumer(queue),
consumer => messageSubject.TakeUntil(disconnect))));
现在我可以运行这个代码:
messages.Subscribe(m => Console.WriteLine("Message"));
messageSubject.OnNext(null);
messageSubject.OnNext(null);
messageSubject.OnNext(null);
disconnectSubject.OnNext(new Exception());
messageSubject.OnNext(null);
我得到的输出是:
Message
Message
Message
Exception of type 'System.Exception' was thrown.
当 disconnect
主题触发时,messages
观察确实结束了。
然后我将简单的 messages.Subscribe
代码替换为:
var cancellationToken = new CancellationToken();
var task =
messages
.ObserveOn(Scheduler.Default)
.ForEachAsync(async (message) =>
{
await Task.Delay(TimeSpan.FromSeconds(1.0));
Console.WriteLine("Message");
}, cancellationToken);
我现在得到:
Exception of type 'System.Exception' was thrown.
Message
Message
Message
除了 await Task.Delay(TimeSpan.FromSeconds(1.0));
产生的输出顺序的变化,这里没有任何变化。
messages
observable 在这两种情况下都如预期的那样结束。
您的代码,如此处所示,工作正常。
我目前有:
var disconnect = Observable
.FromEvent<ExceptionListener, Exception>(
(handler) => connection.ExceptionListener += handler,
(handler) => connection.ExceptionListener -= handler
)
.Do((exception) =>
{
// note: This line does get executed when I trigger the error scenario,
// so I know it is in fact happening.
//
Console.WriteLine(exception.Message);
});
var messages = Observable
.Using(
() => connection.CreateSession(),
(session) => Observable
.Using(
() => session.GetQueue(address),
(queue) => Observable
.Using(
() => session.CreateConsumer(queue),
(consumer) => Observable
.FromEvent<MessageListener, IMessage>(
(handler) => consumer.Listener += handler,
(handler) => consumer.Listener -= handler
)
.TakeUntil(disconnect)
)))
每当我在 messages
上构建或以其他方式订阅时,似乎 TakeUntil
没有被兑现并且链没有中止,即使 disconnect
实际上确实如此似乎在发射。
理想情况下,我希望这个 IObservable
(或任何基于它的结果)到 complete/terminate(我不确定这里的正确词是什么)一旦disconnect
observable 发射。
为了完整起见,我的消费代码实际上是:
await messages.ForEachAsync(async (message) =>
{
// note: Do things with `message`
}, cancellationToken);
// note: During normal operation, this never gets run. But I want the await
// above to complete when my `TakeUntil` above emits.
Console.WriteLine("Chain completed/terminated.");
问题可能在于 observable 的消费方式:
await messages.ForEachAsync(async (message) =>
{
// note: Do things with `message`
});
ForEachAsync
operator does not accept an asynchronous delegate (Func<Task>
), so the lambda passed is async void
(也称为火灾和崩溃)。如果要为可观察序列的每个元素调用异步 lambda,则必须使用投影创建 IObservable<Task<T>>
,然后使用三个展平运算符之一(Concat
、Merge
或 Switch
) 以取回包含异步调用结果的 IObservable<TResult>
。如果这些没有结果,您可以 return 类似 Unit.Default
.
await messages.Select(async message =>
{
// note: Do things with `message`
return Unit.Default;
}).Merge().DefaultIfEmpty();
那里的 DefaultIfEmpty
只是为了防止 InvalidOperationException
以防序列以零消息完成。
具有可取消 lambda 的示例:
await messages.Select(message => Observable.FromAsync(async cancellationToken =>
{
// note: Do things with `message`, while observing the cancellationToken
return Unit.Default;
})).Merge().DefaultIfEmpty();
您的代码目前运行良好。这是我的测试方法。
首先,我将您的原始代码转换为可编译且 运行 可用的状态。
public delegate void ExceptionListener();
public delegate void MessageListener();
public interface IMessage
{
}
public static class connection
{
public static event ExceptionListener ExceptionListener;
public static Session CreateSession() => new Session();
}
public class Session : IDisposable
{
public Queue GetQueue(string address) => new Queue();
public Consumer CreateConsumer(Queue queue) => new Consumer();
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}
// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
disposedValue = true;
}
}
// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Session() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
public class Queue : IDisposable
{
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}
// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
disposedValue = true;
}
}
// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Queue() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
public class Consumer : IDisposable
{
public event MessageListener Listener;
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}
// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.
disposedValue = true;
}
}
// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Consumer() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
现在我可以运行这个代码:
var address = "";
var disconnect =
Observable
.FromEvent<ExceptionListener, Exception>(
handler => connection.ExceptionListener += handler,
handler => connection.ExceptionListener -= handler)
.Do(exception => Console.WriteLine(exception.Message));
var messages =
Observable.Using(
() => connection.CreateSession(),
session => Observable.Using(
() => session.GetQueue(address),
queue => Observable.Using(
() => session.CreateConsumer(queue),
consumer =>
Observable
.FromEvent<MessageListener, IMessage>(
handler => consumer.Listener += handler,
handler => consumer.Listener -= handler)
.TakeUntil(disconnect))));
然后我重构以避免事件并使用几个 Subject<T>
实例来模拟事件:
var address = "";
var disconnectSubject = new Subject<Exception>();
var disconnect =
disconnectSubject
.Do(exception => Console.WriteLine(exception.Message));
var messageSubject = new Subject<IMessage>();
var messages =
Observable.Using(
() => connection.CreateSession(),
session => Observable.Using(
() => session.GetQueue(address),
queue => Observable.Using(
() => session.CreateConsumer(queue),
consumer => messageSubject.TakeUntil(disconnect))));
现在我可以运行这个代码:
messages.Subscribe(m => Console.WriteLine("Message"));
messageSubject.OnNext(null);
messageSubject.OnNext(null);
messageSubject.OnNext(null);
disconnectSubject.OnNext(new Exception());
messageSubject.OnNext(null);
我得到的输出是:
Message
Message
Message
Exception of type 'System.Exception' was thrown.
当 disconnect
主题触发时,messages
观察确实结束了。
然后我将简单的 messages.Subscribe
代码替换为:
var cancellationToken = new CancellationToken();
var task =
messages
.ObserveOn(Scheduler.Default)
.ForEachAsync(async (message) =>
{
await Task.Delay(TimeSpan.FromSeconds(1.0));
Console.WriteLine("Message");
}, cancellationToken);
我现在得到:
Exception of type 'System.Exception' was thrown.
Message
Message
Message
除了 await Task.Delay(TimeSpan.FromSeconds(1.0));
产生的输出顺序的变化,这里没有任何变化。
messages
observable 在这两种情况下都如预期的那样结束。
您的代码,如此处所示,工作正常。