用于扩展 azure 服务总线的 Websocket

Websocket to extend a azure service bus

我正在通过 .net 中的 rest 和 web 套接字编写一个高度可用的 api。我正在寻找以下策略:

  1. 跟踪每个打开的网络套接字到我的 api 应用程序
  2. 如何在 asp.net webapi 应用程序
  3. 中使用来自 azure 服务总线的队列消息或主题
  4. 我应该研究哪些模式或策略

此时,我的关键字搜索并没有为我提供很多关于如何实现这一点的见解,但如果有人有任何想法,我将不胜感激!

TL;DR: 是否有人在 webapi 项目中创建了队列服务来将消息推送到 websocket?

anyone created a queue service in a webapi project to push messages down a websocket?

根据您的要求,您可以创建一个后台任务来接收来自服务总线队列的消息,如下所示:

Global.asax.cs

public class WebApiApplication : System.Web.HttpApplication
{
    private static ManualResetEvent _maunalResetEvent = new ManualResetEvent(false);
    protected void Application_Start()
    {
        AreaRegistration.RegisterAllAreas();
        GlobalConfiguration.Configure(WebApiConfig.Register);
        FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
        RouteConfig.RegisterRoutes(RouteTable.Routes);
        BundleConfig.RegisterBundles(BundleTable.Bundles);

        ThreadPool.QueueUserWorkItem(_ =>
        {
            var connectionString = "";
            var queueName = "samplequeue";
            var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
            client.OnMessage(message =>
            {
                var messageBody = message.GetBody<String>();
                System.Diagnostics.Trace.TraceInformation(String.Format("Message body: {0}", messageBody));

               //TODO: send message to the specific client (user)
            });
            _maunalResetEvent.WaitOne();
        });
    }

    protected void Application_End()
    {
        _maunalResetEvent.Set();
    }
}

注意:关于从队列接收消息的更多细节,可以参考官方tutorial.

为了向您的客户端推送消息,如果您基于 webapi-and-websockets,我假设您需要选择 WebSocketCollection 实例,然后选择特定的 WebSocketHandler 实例_username 从收到的队列消息中匹配 属性 或将消息广播给所有客户端,如下所示:

当收到队列消息时,使用以下代码向所有客户端广播:

ChatWebSocketHandler.ChatClients.Broadcast("{queue-message}");

此外,您可以利用 SignalR and map your client (user) to signalR connections, then when you receiving the queue message, you could send it to your client (user). For more details, you could refer to Mapping SignalR Users to Connections. Also, you could follow SignalR Hubs API Guide and this git sample

我的要求有点绕。我能够通过主题和订阅模型解决问题。

我发布到队列中的主题,并通过根据 websocket 请求生成的过滤订阅广播到所有 websocket 连接。很漂亮!

 private async Task ProcessWebsocketSession(AspNetWebSocketContext context)
    {
        var details = ((APIIdentity)User.Identity).Details;
        //SubscriptionHandler creates the subscription for the given system, and disposes of it safely when leaving scope
        using (var ws = context.WebSocket)
        using (var subHander = new SubscriptionHandler(details))
        {
            var client = subHander.Client;
            client.OnMessage((brokerMsg) => 
            {
                var payload = brokerMsg.GetBody<string>();
                var dataToSend = JsonConvert.DeserializeObject<Data>(brokerMsg.GetBody<string>());
                //refreshing from db, to check if data has been updated in process
                dataToSend = Repo.GetData(BusinessID, dataToSend .ID).FirstOrDefault();
                if (dataToSend != null && !dataToSend Send.IsConfirmed.Value)
                {
                    byte[] binaryData = Encoding.ASCII.GetBytes(payload);
                    var segment = new ArraySegment<byte>(binaryData);
                    var task = ws.SendAsync(segment, WebSocketMessageType.Text,
                        true, CancellationToken.None);
                    task.Wait();
                }
                brokerMsg.Complete();
            });
            var receivedBytes = new ArraySegment<byte>(new byte[1024]);
            while (ws.State == WebSocketState.Open)
            {
                // ping/pong response to verify socket is connected without changing data
                var msg = await ws.ReceiveAsync(receivedBytes, CancellationToken.None);
                if (msg.MessageType == WebSocketMessageType.Text)
                {
                    var text = Encoding.ASCII.GetString(receivedBytes.ToArray());
                    if (text.ToLower() == "ping")
                    {
                        await ws.SendAsync(new ArraySegment<byte>(Encoding.ASCII.GetBytes(text.ToCharArray())), WebSocketMessageType.Text, true, CancellationToken.None);
                    }
                }

            } 

        }