检测到 "Busy" WebSocket
Detecting a "Busy" WebSocket
我最近 运行 在尝试使用 SendAsync 方法广播到打开的 WebSocket 时遇到了一个问题,收到了带有消息 "A send operation is already in progress"
的 InvalidOperationException
深入挖掘 WebSocket 的源代码 class,在内部跟踪忙碌状态:
internal ChannelState _sendState;
// Represents the state of a single channel; send and receive have their own states
internal enum ChannelState {
Ready, // this channel is available for transmitting new frames
Busy, // the channel is already busy transmitting frames
Closed // this channel has been closed
}
理想情况下,如果我要向 WebSocket 连接广播更新,我想提前知道它正忙并执行一些其他处理(例如将消息排队)。
这个状态被标记为内部状态似乎很奇怪 - 如果它是 public 属性 我可以简单地检查
context.WebSocket.SendState == ChannelState.Ready
WebSocket 上 SendAsync 的正确模式是什么,可以防止抛出此异常?
我不愿意通过反射破解 属性 的访问权限。
编辑澄清:
WebSocket.State 属性 这种情况无济于事。 属性 使用此枚举:
public enum WebSocketState
{
None,
Connecting,
Open,
CloseSent,
CloseReceived,
Closed,
Aborted
}
一旦套接字连接打开,这个语句的计算结果将是"true",不管它是否正忙于发送:
context.WebSocket.State == WebSocketState.Open
我已经实施了一个似乎可以满足我的需求的解决方案。
基本问题是当两个线程试图在同一个 WebSocket 上发送时。
我的解决方案有几个部分,并且取决于这是 运行AspNetWebSocketContect 下的事实,因此我可以使用“Items”字典来存储有关当前连接的属性。
- 一个“发送”属性用于跟踪WebSocket是否忙。
- “队列”属性 是要发送的 ArraySegment 列表。
- 同步锁定 WebSocket 对象和 运行 Send 方法。
- 发送完成后处理队列中的任何项目。
- 在方法开始时屈服以防止阻塞。
这是我目前在开发环境中使用的代码 -- 我会监控它的扩展性:
/// <summary>
/// Send a message to a specific client.
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
private static async Task SendMessage(AspNetWebSocketContext context, ArraySegment<byte> buffer)
{
// Return control to the calling method immediately.
await Task.Yield();
// Make sure we have data.
if (buffer.Count == 0)
return;
// The state of the connection is contained in the context Items dictionary.
bool sending;
lock (context)
{
// Are we already in the middle of a send?
bool.TryParse(context.Items["sending"]?.ToString(), out sending);
// If not, we are now.
if (!sending)
context.Items["sending"] = true;
}
if (!sending)
{
// Lock with a timeout, just in case.
if (!Monitor.TryEnter(context.WebSocket, 1000))
{
// If we couldn't obtain exclusive access to the socket in one second, something is wrong.
await context.WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None);
return;
}
try
{
// Send the message synchronously.
var t = context.WebSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
t.Wait();
}
finally
{
Monitor.Exit(context.WebSocket);
}
// Note that we've finished sending.
lock (context)
{
context.Items["sending"] = false;
}
// Handle any queued messages.
await HandleQueue(context);
}
else
{
// Add the message to the queue.
lock (context)
{
var queue = context.Items["queue"] as List<ArraySegment<byte>>;
if (queue == null)
context.Items["queue"] = queue = new List<ArraySegment<byte>>();
queue.Add(buffer);
}
}
}
/// <summary>
/// If there was a message in the queue for this particular web socket connection, send it.
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
private static async Task HandleQueue(AspNetWebSocketContext context)
{
var buffer = new ArraySegment<byte>();
lock (context)
{
// Check for an item in the queue.
var queue = context.Items["queue"] as List<ArraySegment<byte>>;
if (queue != null && queue.Count > 0)
{
// Pull it off the top.
buffer = queue[0];
queue.RemoveAt(0);
}
}
// Send that message.
if (buffer.Count > 0)
await SendMessage(context, buffer);
}
我对这种方法的几点考虑:
- 我认为锁定一个简单的对象比锁定一个更复杂的对象更好,就像我在上面所做的那样。我不确定使用“上下文”和“context.WebSocket”对象进行锁定会有什么后果。
- 理论上我不需要锁定 WebSocket,因为我已经在测试“发送”属性。然而,测试导致 WebSocket 在几秒钟的重负载下变得无响应。一旦我实施了锁定模式,这就消失了。
- 我通过同一个 WebSocket 连接对 10 个并发线程(每个线程发送 1000 条消息)进行了测试。每条消息都有编号,这样我就可以在客户端记录它们,并且每条消息都通过了。所以队列系统似乎可以工作。
我最近 运行 在尝试使用 SendAsync 方法广播到打开的 WebSocket 时遇到了一个问题,收到了带有消息 "A send operation is already in progress"
的 InvalidOperationException深入挖掘 WebSocket 的源代码 class,在内部跟踪忙碌状态:
internal ChannelState _sendState;
// Represents the state of a single channel; send and receive have their own states
internal enum ChannelState {
Ready, // this channel is available for transmitting new frames
Busy, // the channel is already busy transmitting frames
Closed // this channel has been closed
}
理想情况下,如果我要向 WebSocket 连接广播更新,我想提前知道它正忙并执行一些其他处理(例如将消息排队)。
这个状态被标记为内部状态似乎很奇怪 - 如果它是 public 属性 我可以简单地检查
context.WebSocket.SendState == ChannelState.Ready
WebSocket 上 SendAsync 的正确模式是什么,可以防止抛出此异常?
我不愿意通过反射破解 属性 的访问权限。
编辑澄清:
WebSocket.State 属性 这种情况无济于事。 属性 使用此枚举:
public enum WebSocketState
{
None,
Connecting,
Open,
CloseSent,
CloseReceived,
Closed,
Aborted
}
一旦套接字连接打开,这个语句的计算结果将是"true",不管它是否正忙于发送:
context.WebSocket.State == WebSocketState.Open
我已经实施了一个似乎可以满足我的需求的解决方案。
基本问题是当两个线程试图在同一个 WebSocket 上发送时。
我的解决方案有几个部分,并且取决于这是 运行AspNetWebSocketContect 下的事实,因此我可以使用“Items”字典来存储有关当前连接的属性。
- 一个“发送”属性用于跟踪WebSocket是否忙。
- “队列”属性 是要发送的 ArraySegment 列表。
- 同步锁定 WebSocket 对象和 运行 Send 方法。
- 发送完成后处理队列中的任何项目。
- 在方法开始时屈服以防止阻塞。
这是我目前在开发环境中使用的代码 -- 我会监控它的扩展性:
/// <summary>
/// Send a message to a specific client.
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
private static async Task SendMessage(AspNetWebSocketContext context, ArraySegment<byte> buffer)
{
// Return control to the calling method immediately.
await Task.Yield();
// Make sure we have data.
if (buffer.Count == 0)
return;
// The state of the connection is contained in the context Items dictionary.
bool sending;
lock (context)
{
// Are we already in the middle of a send?
bool.TryParse(context.Items["sending"]?.ToString(), out sending);
// If not, we are now.
if (!sending)
context.Items["sending"] = true;
}
if (!sending)
{
// Lock with a timeout, just in case.
if (!Monitor.TryEnter(context.WebSocket, 1000))
{
// If we couldn't obtain exclusive access to the socket in one second, something is wrong.
await context.WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None);
return;
}
try
{
// Send the message synchronously.
var t = context.WebSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
t.Wait();
}
finally
{
Monitor.Exit(context.WebSocket);
}
// Note that we've finished sending.
lock (context)
{
context.Items["sending"] = false;
}
// Handle any queued messages.
await HandleQueue(context);
}
else
{
// Add the message to the queue.
lock (context)
{
var queue = context.Items["queue"] as List<ArraySegment<byte>>;
if (queue == null)
context.Items["queue"] = queue = new List<ArraySegment<byte>>();
queue.Add(buffer);
}
}
}
/// <summary>
/// If there was a message in the queue for this particular web socket connection, send it.
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
private static async Task HandleQueue(AspNetWebSocketContext context)
{
var buffer = new ArraySegment<byte>();
lock (context)
{
// Check for an item in the queue.
var queue = context.Items["queue"] as List<ArraySegment<byte>>;
if (queue != null && queue.Count > 0)
{
// Pull it off the top.
buffer = queue[0];
queue.RemoveAt(0);
}
}
// Send that message.
if (buffer.Count > 0)
await SendMessage(context, buffer);
}
我对这种方法的几点考虑:
- 我认为锁定一个简单的对象比锁定一个更复杂的对象更好,就像我在上面所做的那样。我不确定使用“上下文”和“context.WebSocket”对象进行锁定会有什么后果。
- 理论上我不需要锁定 WebSocket,因为我已经在测试“发送”属性。然而,测试导致 WebSocket 在几秒钟的重负载下变得无响应。一旦我实施了锁定模式,这就消失了。
- 我通过同一个 WebSocket 连接对 10 个并发线程(每个线程发送 1000 条消息)进行了测试。每条消息都有编号,这样我就可以在客户端记录它们,并且每条消息都通过了。所以队列系统似乎可以工作。