检测到 "Busy" WebSocket

Detecting a "Busy" WebSocket

我最近 运行 在尝试使用 SendAsync 方法广播到打开的 WebSocket 时遇到了一个问题,收到了带有消息 "A send operation is already in progress"

的 InvalidOperationException

深入挖掘 WebSocket 的源代码 class,在内部跟踪忙碌状态:

internal ChannelState _sendState;

// Represents the state of a single channel; send and receive have their own states
internal enum ChannelState {
    Ready, // this channel is available for transmitting new frames
    Busy, // the channel is already busy transmitting frames
    Closed // this channel has been closed
}

理想情况下,如果我要向 WebSocket 连接广播更新,我想提前知道它正忙并执行一些其他处理(例如将消息排队)。

这个状态被标记为内部状态似乎很奇怪 - 如果它是 public 属性 我可以简单地检查

context.WebSocket.SendState == ChannelState.Ready

WebSocket 上 SendAsync 的正确模式是什么,可以防止抛出此异常?

我不愿意通过反射破解 属性 的访问权限。

编辑澄清:

WebSocket.State 属性 这种情况无济于事。 属性 使用此枚举:

public enum WebSocketState
{
    None,
    Connecting,
    Open,
    CloseSent,
    CloseReceived,
    Closed,
    Aborted
}

一旦套接字连接打开,这个语句的计算结果将是"true",不管它是否正忙于发送:

context.WebSocket.State == WebSocketState.Open

我已经实施了一个似乎可以满足我的需求的解决方案。

基本问题是当两个线程试图在同一个 WebSocket 上发送时。

我的解决方案有几个部分,并且取决于这是 运行AspNetWebSocketContect 下的事实,因此我可以使用“Items”字典来存储有关当前连接的属性。

  1. 一个“发送”属性用于跟踪WebSocket是否忙。
  2. “队列”属性 是要发送的 ArraySegment 列表。
  3. 同步锁定 WebSocket 对象和 运行 Send 方法。
  4. 发送完成后处理队列中的任何项目。
  5. 在方法开始时屈服以防止阻塞。

这是我目前在开发环境中使用的代码 -- 我会监控它的扩展性:

/// <summary>
/// Send a message to a specific client.
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
private static async Task SendMessage(AspNetWebSocketContext context, ArraySegment<byte> buffer)
{
    // Return control to the calling method immediately.
    await Task.Yield();

    // Make sure we have data.
    if (buffer.Count == 0)
        return;

    // The state of the connection is contained in the context Items dictionary.
    bool sending;
    lock (context)
    {
        // Are we already in the middle of a send?
        bool.TryParse(context.Items["sending"]?.ToString(), out sending);

        // If not, we are now.
        if (!sending)
            context.Items["sending"] = true;
    }

    if (!sending)
    {
        // Lock with a timeout, just in case.
        if (!Monitor.TryEnter(context.WebSocket, 1000))
        {
            // If we couldn't obtain exclusive access to the socket in one second, something is wrong.
            await context.WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None);
            return;
        }

        try
        {
            // Send the message synchronously.
            var t = context.WebSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
            t.Wait();
        }
        finally
        {
            Monitor.Exit(context.WebSocket);
        }

        // Note that we've finished sending.
        lock (context)
        {
            context.Items["sending"] = false;
        }

        // Handle any queued messages.
        await HandleQueue(context);
    }
    else
    {
        // Add the message to the queue.
        lock (context)
        {
            var queue = context.Items["queue"] as List<ArraySegment<byte>>;
            if (queue == null)
                context.Items["queue"] = queue = new List<ArraySegment<byte>>();
            queue.Add(buffer);
        }
    }
}

/// <summary>
/// If there was a message in the queue for this particular web socket connection, send it.
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
private static async Task HandleQueue(AspNetWebSocketContext context)
{
    var buffer = new ArraySegment<byte>();
    lock (context)
    {
        // Check for an item in the queue.
        var queue = context.Items["queue"] as List<ArraySegment<byte>>;
        if (queue != null && queue.Count > 0)
        {
            // Pull it off the top.
            buffer = queue[0];
            queue.RemoveAt(0);
        }
    }

    // Send that message.
    if (buffer.Count > 0)
        await SendMessage(context, buffer);
}

我对这种方法的几点考虑:

  1. 我认为锁定一个简单的对象比锁定一个更复杂的对象更好,就像我在上面所做的那样。我不确定使用“上下文”和“context.WebSocket”对象进行锁定会有什么后果。
  2. 理论上我不需要锁定 WebSocket,因为我已经在测试“发送”属性。然而,测试导致 WebSocket 在几秒钟的重负载下变得无响应。一旦我实施了锁定模式,这就消失了。
  3. 我通过同一个 WebSocket 连接对 10 个并发线程(每个线程发送 1000 条消息)进行了测试。每条消息都有编号,这样我就可以在客户端记录它们,并且每条消息都通过了。所以队列系统似乎可以工作。