在 Reactive Extensions 中订阅结束时关闭非托管资源

Close unmanaged resources when Subscription end in Reactive Extensions

我正在从 Rx 向网络写入数据。当订阅结束时,我自然会使用 Finally 来关闭我的流。这在 OnError()OnComplete() 上都能正常工作。 Rx 会依次 运行 OnNext() ... OnNext(), OnComplete(), Finally().

然而,有时我想提前终止序列,为此我使用 Dispose()Finally() 现在 运行 与上次 OnNext() 调用并行,导致在 OnNext() 中仍在写入流时出现异常,以及不完整的写入。

我的订阅大概是这样的:

NetworkStream stm = client.GetStream();
IDisposable disp = obs
    .Finally(() => {
        client.Close();
    })
    .Subscribe(d => {
        client.GetStream().Write(d.a, 0, d.a.Lenght);
        client.GetStream().Write(d.b, 0, d.b.Lenght);
    } () => {
        client.GetStream().Write(something(), 0, 1);
    });
Thread.sleep(1000);
disp.Dispose();

我也试过另一种方法,CancellationToken

如何正确取消订阅?我不介意它是否跳过 OnComplete(),只要 Finally() 仍然是 运行。但是,运行宁Finally()并行是有问题的。

我也觉得应该有更好的方法来管理资源,将声明移到序列中,这将是一个更好的解决方案。

编辑:下面的代码更清楚地显示了问题。我希望它总是打印 true,相反,它经常给出 false,表明 Dispose 在最后一个 OnNext 之前结束。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Try finally");
            for (int i = 0; i < 10; i++)
            {
                Finally();
            }
            Console.WriteLine("Try using");
            for (int i = 0; i < 10; i++)
            {
                Using();
            }
            Console.WriteLine("Try using2");
            for (int i = 0; i < 10; i++)
            {
                Using2();
            }
            Console.ReadKey();
        }

        private static void Using2()
        {
            bool b = true, c = true, d;
            var dis = Disposable.Create(() => c = b);
            IDisposable obDis = Observable.Using(
                () => dis,
                _ => Observable.Create<Unit>(obs=>
                    Observable.Generate(0,
                    i => i < 1000,
                    i => i + 1,
                    i => i,
                    i => TimeSpan.FromMilliseconds(1)
                ).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; })))
                .Subscribe();
            Thread.Sleep(15);
            obDis.Dispose();
            d = b;
            Thread.Sleep(101);
            Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
        }

        private static void Using()
        {
            bool b = true, c = true, d;
            var dis = Disposable.Create(() => c = b);
            IDisposable obDis = Observable.Using(
                () => dis,
                _ => Observable.Generate(0,
                    i => i < 1000,
                    i => i + 1,
                    i => i,
                    i => TimeSpan.FromMilliseconds(1)
                )).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
            Thread.Sleep(15);
            obDis.Dispose();
            d = b;
            Thread.Sleep(101);
            Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
        }

        private static void Finally()
        {
            bool b = true, c = true, d;
            IDisposable obDis = Observable.Generate(0,
                i => i < 1000,
                i => i + 1,
                i => i,
                _ => DateTime.Now.AddMilliseconds(1)
                )
                .Finally(() => c = b)
                .Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
            Thread.Sleep(15);
            obDis.Dispose();
            d = b;
            Thread.Sleep(101);
            Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
        }
    }
}

Finally 很可能不是您想要的。当您取消订阅时,它不会处置您的资源。相反,它的行为类似于 C# 中的普通 finally 块,也就是说,无论其对应的 try 块中的代码是否抛出异常,它都会保证执行某些代码。此外,给定 this question on MSDN,您在 Finally 中的代码甚至可能不会在所有情况下都执行,因为您的订阅没有指定错误处理程序。

你可能想要的是 Using:

IDisposable disp = Observable
    .Using(
        () => Disposable.Create(() => client.Close),
        _ => obs)
    .Subscribe(....);

Using 负责在 observable 终止或订阅被取消时正确处理资源。

假设client是一个TcpClient,那就更简单了:

IDisposable disp = Observable
    .Using(
        () => client),
        _ => obs)
    .Subscribe(....);

我希望对 OnNext 的调用不会与关闭客户端重叠,即使提前取消订阅也是如此,但我还没有对此进行测试。

最后一件事:谨防关闭示例中的 stm 等外部变量。始终与当地人合作更安全。我尝试的完整重写是这样的:

IDisposable disp = Observable.Using(
    () => client,
    _ => Observable.Using(
         () => client.GetStream(),
         stream => Observable.Create<Unit>(observer => obs
             .Subscribe(
                 d => {
                     stream.Write(d.a, 0, d.a.Lenght);
                     stream.Write(d.b, 0, d.b.Lenght);
                 },
                 () => {
                     stream.Write(something(), 0, 1);
                 }))))
    .Subscribe();

我认为您只是对 NetworkStream 的运作方式做出了错误的假设。

NetworkStream.WriteTcpClient.Close不一定要等客户端读取数据。 (此外,NetworkStream.Flush 什么都不做)。

当您调用 Close 时,您可能会在客户端读取所有内容之前关闭套接字。

看看这个相关问题:NetworkStream doesn't always send data unless I Thread.Sleep() before closing the NetworkStream. What am I doing wrong?

该页面不同地提到了使用接受超时的 Close 的重载,或指定 LingerOption - 但最好是发送 Shutdown 或具有更高的消息传递抽象级别,其中客户端通过自己的回复确认您的消息。