在 Reactive Extensions 中订阅结束时关闭非托管资源
Close unmanaged resources when Subscription end in Reactive Extensions
我正在从 Rx 向网络写入数据。当订阅结束时,我自然会使用 Finally
来关闭我的流。这在 OnError()
和 OnComplete()
上都能正常工作。 Rx 会依次 运行 OnNext()
... OnNext()
, OnComplete()
, Finally()
.
然而,有时我想提前终止序列,为此我使用 Dispose()
。 Finally()
现在 运行 与上次 OnNext()
调用并行,导致在 OnNext()
中仍在写入流时出现异常,以及不完整的写入。
我的订阅大概是这样的:
NetworkStream stm = client.GetStream();
IDisposable disp = obs
.Finally(() => {
client.Close();
})
.Subscribe(d => {
client.GetStream().Write(d.a, 0, d.a.Lenght);
client.GetStream().Write(d.b, 0, d.b.Lenght);
} () => {
client.GetStream().Write(something(), 0, 1);
});
Thread.sleep(1000);
disp.Dispose();
我也试过另一种方法,CancellationToken
。
如何正确取消订阅?我不介意它是否跳过 OnComplete()
,只要 Finally()
仍然是 运行。但是,运行宁Finally()
并行是有问题的。
我也觉得应该有更好的方法来管理资源,将声明移到序列中,这将是一个更好的解决方案。
编辑:下面的代码更清楚地显示了问题。我希望它总是打印 true,相反,它经常给出 false,表明 Dispose 在最后一个 OnNext
之前结束。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Try finally");
for (int i = 0; i < 10; i++)
{
Finally();
}
Console.WriteLine("Try using");
for (int i = 0; i < 10; i++)
{
Using();
}
Console.WriteLine("Try using2");
for (int i = 0; i < 10; i++)
{
Using2();
}
Console.ReadKey();
}
private static void Using2()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Create<Unit>(obs=>
Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; })))
.Subscribe();
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Using()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
)).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Finally()
{
bool b = true, c = true, d;
IDisposable obDis = Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
_ => DateTime.Now.AddMilliseconds(1)
)
.Finally(() => c = b)
.Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
}
}
Finally
很可能不是您想要的。当您取消订阅时,它不会处置您的资源。相反,它的行为类似于 C# 中的普通 finally
块,也就是说,无论其对应的 try
块中的代码是否抛出异常,它都会保证执行某些代码。此外,给定 this question on MSDN,您在 Finally
中的代码甚至可能不会在所有情况下都执行,因为您的订阅没有指定错误处理程序。
你可能想要的是 Using
:
IDisposable disp = Observable
.Using(
() => Disposable.Create(() => client.Close),
_ => obs)
.Subscribe(....);
Using
负责在 observable 终止或订阅被取消时正确处理资源。
假设client
是一个TcpClient,那就更简单了:
IDisposable disp = Observable
.Using(
() => client),
_ => obs)
.Subscribe(....);
我希望对 OnNext
的调用不会与关闭客户端重叠,即使提前取消订阅也是如此,但我还没有对此进行测试。
最后一件事:谨防关闭示例中的 stm
等外部变量。始终与当地人合作更安全。我尝试的完整重写是这样的:
IDisposable disp = Observable.Using(
() => client,
_ => Observable.Using(
() => client.GetStream(),
stream => Observable.Create<Unit>(observer => obs
.Subscribe(
d => {
stream.Write(d.a, 0, d.a.Lenght);
stream.Write(d.b, 0, d.b.Lenght);
},
() => {
stream.Write(something(), 0, 1);
}))))
.Subscribe();
我认为您只是对 NetworkStream
的运作方式做出了错误的假设。
NetworkStream.Write
和TcpClient.Close
不一定要等客户端读取数据。 (此外,NetworkStream.Flush
什么都不做)。
当您调用 Close
时,您可能会在客户端读取所有内容之前关闭套接字。
看看这个相关问题:NetworkStream doesn't always send data unless I Thread.Sleep() before closing the NetworkStream. What am I doing wrong?
该页面不同地提到了使用接受超时的 Close
的重载,或指定 LingerOption
- 但最好是发送 Shutdown
或具有更高的消息传递抽象级别,其中客户端通过自己的回复确认您的消息。
我正在从 Rx 向网络写入数据。当订阅结束时,我自然会使用 Finally
来关闭我的流。这在 OnError()
和 OnComplete()
上都能正常工作。 Rx 会依次 运行 OnNext()
... OnNext()
, OnComplete()
, Finally()
.
然而,有时我想提前终止序列,为此我使用 Dispose()
。 Finally()
现在 运行 与上次 OnNext()
调用并行,导致在 OnNext()
中仍在写入流时出现异常,以及不完整的写入。
我的订阅大概是这样的:
NetworkStream stm = client.GetStream();
IDisposable disp = obs
.Finally(() => {
client.Close();
})
.Subscribe(d => {
client.GetStream().Write(d.a, 0, d.a.Lenght);
client.GetStream().Write(d.b, 0, d.b.Lenght);
} () => {
client.GetStream().Write(something(), 0, 1);
});
Thread.sleep(1000);
disp.Dispose();
我也试过另一种方法,CancellationToken
。
如何正确取消订阅?我不介意它是否跳过 OnComplete()
,只要 Finally()
仍然是 运行。但是,运行宁Finally()
并行是有问题的。
我也觉得应该有更好的方法来管理资源,将声明移到序列中,这将是一个更好的解决方案。
编辑:下面的代码更清楚地显示了问题。我希望它总是打印 true,相反,它经常给出 false,表明 Dispose 在最后一个 OnNext
之前结束。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Try finally");
for (int i = 0; i < 10; i++)
{
Finally();
}
Console.WriteLine("Try using");
for (int i = 0; i < 10; i++)
{
Using();
}
Console.WriteLine("Try using2");
for (int i = 0; i < 10; i++)
{
Using2();
}
Console.ReadKey();
}
private static void Using2()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Create<Unit>(obs=>
Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; })))
.Subscribe();
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Using()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
)).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Finally()
{
bool b = true, c = true, d;
IDisposable obDis = Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
_ => DateTime.Now.AddMilliseconds(1)
)
.Finally(() => c = b)
.Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
}
}
Finally
很可能不是您想要的。当您取消订阅时,它不会处置您的资源。相反,它的行为类似于 C# 中的普通 finally
块,也就是说,无论其对应的 try
块中的代码是否抛出异常,它都会保证执行某些代码。此外,给定 this question on MSDN,您在 Finally
中的代码甚至可能不会在所有情况下都执行,因为您的订阅没有指定错误处理程序。
你可能想要的是 Using
:
IDisposable disp = Observable
.Using(
() => Disposable.Create(() => client.Close),
_ => obs)
.Subscribe(....);
Using
负责在 observable 终止或订阅被取消时正确处理资源。
假设client
是一个TcpClient,那就更简单了:
IDisposable disp = Observable
.Using(
() => client),
_ => obs)
.Subscribe(....);
我希望对 OnNext
的调用不会与关闭客户端重叠,即使提前取消订阅也是如此,但我还没有对此进行测试。
最后一件事:谨防关闭示例中的 stm
等外部变量。始终与当地人合作更安全。我尝试的完整重写是这样的:
IDisposable disp = Observable.Using(
() => client,
_ => Observable.Using(
() => client.GetStream(),
stream => Observable.Create<Unit>(observer => obs
.Subscribe(
d => {
stream.Write(d.a, 0, d.a.Lenght);
stream.Write(d.b, 0, d.b.Lenght);
},
() => {
stream.Write(something(), 0, 1);
}))))
.Subscribe();
我认为您只是对 NetworkStream
的运作方式做出了错误的假设。
NetworkStream.Write
和TcpClient.Close
不一定要等客户端读取数据。 (此外,NetworkStream.Flush
什么都不做)。
当您调用 Close
时,您可能会在客户端读取所有内容之前关闭套接字。
看看这个相关问题:NetworkStream doesn't always send data unless I Thread.Sleep() before closing the NetworkStream. What am I doing wrong?
该页面不同地提到了使用接受超时的 Close
的重载,或指定 LingerOption
- 但最好是发送 Shutdown
或具有更高的消息传递抽象级别,其中客户端通过自己的回复确认您的消息。