使用 SignalR 从 Azure ServiceBus 消息队列向 Azure Web App 广播数据

Using SignalR to broadcast data to Azure Web App from Azure ServiceBus Message Queue

我有一个 Azure ServiceBus 消息队列,它从成熟的 Windows 应用程序接收消息。这部分效果很好(每分钟数十条消息)。我知道消息在那里。 ServiceBus 命名空间类似于 dtx-ns,队列名称类似于 dtxrm001.

我还有一个完全开发的 MVC 5 Azure Web 应用程序。我想连接一个新的视图页面,使用 SignalR 从 Azure 服务总线消息队列接收消息。

我只想通过转到网页查看 ServiceBus 队列消息(来自任何 运行 成熟的 Windows 应用程序)。

这基本上是传入的传感器数据,比如来自传感器的温度。

在购买了 5 本 SignalR 书籍并为此花费了 3 周之后,我想我需要一些指导。我是否将 Web 角色添加到我的 Azure MVC 应用程序?我要添加工作者角色吗?我是否使用依赖注入和 SignalRBackplaneMessageBus?

看似简单的任务,但我现在不知所措,至于方法论是什么?当您真正进入示例然后尝试连接来自 ServiceBus 消息队列的消息的实际广播时,似乎没有任何意义。

这是我尝试过的 MVC 网络应用启动代码:

    Dim cn1 As String = "Endpoint=sb://dtx-ns.servicebus.windows.net/;SharedAccessKeyName=myname;SharedAccessKey=mykey"
    Dim config As New ServiceBusScaleoutConfiguration(cn1, "dtx1")
    config.TopicCount = 3
    config.BackoffTime = New TimeSpan(1, 0, 1)
    config.IdleSubscriptionTimeout = New TimeSpan(1, 0, 0)
    config.MaximumMessageSize = 20000
    config.MaxQueueLength = 50

    GlobalHost.DependencyResolver.UseServiceBus(config)
    GlobalHost.Configuration.TransportConnectTimeout = TimeSpan.FromSeconds(10)

    app.UseCors(CorsOptions.AllowAll)
    app.MapSignalR()

上面显示的端点是服务总线消息队列的连接字符串的一部分。如果上面看起来正确,那么我应该如何对集线器进行编程以从该连接发送消息?

我需要网络角色吗?我是否需要在添加到当前 MVC 应用程序的项目中以某种方式实现 'Backplane Hub'?我被难住了。

您正在使用代码 scale out SignalR via ServiceBus(所谓的 ServiceBus 背板 - 示例中令人困惑的 Web 角色实际上是您的 MVC 应用程序,它可以 运行 在 App Service 中或作为WebRole。在一般情况下,您的 SignalR Hub 确实与您的网络应用程序共存)。在 Azure Web 应用程序上 运行ning 时,为了横向扩展 SignalR 本身,这是正确的。但不是你要求的。

你要求的是从 ServiceBus 读取消息并将它们广播到你的 SignalR 连接的客户端。这是完全不同的用例。您可以使用几个选项来实现您的目标:

在这两种情况下,SignalR 有一个 "native" .NET client 这一鲜为人知的事实将对您有所帮助(您可以从任何 .NET 应用程序连接到 SignalR,包括 运行ning 在您的 Windows Phone 或您的桌面)。

我在这里没有提供任何代码示例,但我想我可以消除您的疑虑。


我需要从我的 Azure 服务总线消息队列中读取我的 Azure MVC-5 Web 应用程序,然后通过 SignalR 将数据发送到所有客户端网页,这个问题已经解决。

代码方面,它非常简单和优雅。这是我的解决方案:

首先,我的 Visual Studio 解决方案只有一个项目——主 Web 应用程序项目。在项目的根目录中,我有我的 SignalR 集线器 class,名为 djwHub;

Imports Microsoft.AspNet.SignalR
Imports Microsoft.AspNet.SignalR.Hubs
Imports Microsoft.AspNet.SignalR.Messaging
Imports System.Threading.Tasks
Imports Microsoft.ServiceBus
Imports Microsoft.ServiceBus.Messaging

<HubName("djwHub")>
Public Class djwHub
    Inherits Hub

    Private connectString As String = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=myKeyName;SharedAccessKey=myKey"
    Private queueName As String = "myQueueName"
    Private m_count As Integer

    Public Sub beginReadingMessageQue()
        Dim rf = MessagingFactory.CreateFromConnectionString(connectString)

        Dim taskTimer = Task.Factory.StartNew(Async Function()
                                                  Dim receiver = Await rf.CreateMessageReceiverAsync(queueName, ReceiveMode.PeekLock)

                                                  While True
                                                      Dim timeNow As String = DateTime.Now.ToString()
                                                      Clients.All.sendServerTime(timeNow)

                                                      Try
                                                          Dim message = Await receiver.ReceiveAsync(TimeSpan.FromSeconds(5))

                                                          If message IsNot Nothing Then
                                                              Dim messageBody = message.GetBody(Of [String])()

                                                              Clients.All.sendNewMessage(messageBody)

                                                              Await message.CompleteAsync
                                                          Else
                                                              'no more messages in the queue
                                                              Exit Try
                                                          End If
                                                      Catch e As MessagingException
                                                          If Not e.IsTransient Then
                                                              'Console.WriteLine(e.Message)
                                                              'Throw
                                                          End If
                                                      End Try

                                                      'Delaying by 1/2 second.
                                                      Await Task.Delay(500)
                                                  End While

                                              End Function, TaskCreationOptions.LongRunning)
    End Sub
End Class

现在,我的 MVC-5 网络应用程序在根目录中没有 Startup class。相反,我的启动发生在 IdentityConfig.vb class 中,它位于我的 App_Start 文件夹中。所以这就是我放置 app.MapSignalR() 的地方,如图所示;

Imports Microsoft.AspNet.Identity
Imports Microsoft.Owin
Imports Microsoft.Owin.Security.Cookies
Imports Owin
Imports myApp.Users.Infrastructure
Imports Microsoft.Owin.Security.Google
Imports Microsoft.AspNet.SignalR
Imports Microsoft.Owin.Cors
Imports Microsoft.AspNet.SignalR.ServiceBus

Namespace Users
    Public Class IdentityConfig
        Public Sub Configuration(app As IAppBuilder)
            app.CreatePerOwinContext(Of AppIdentityDbContext)(AddressOf AppIdentityDbContext.Create)
            app.CreatePerOwinContext(Of AppUserManager)(AddressOf AppUserManager.Create)
            app.CreatePerOwinContext(Of AppRoleManager)(AddressOf AppRoleManager.Create)

            app.UseCookieAuthentication(New CookieAuthenticationOptions() With { _
                .AuthenticationType = DefaultAuthenticationTypes.ApplicationCookie, _
                .LoginPath = New PathString("/Account/Login") _
            })

            app.UseExternalSignInCookie(DefaultAuthenticationTypes.ExternalCookie)
            app.MapSignalR()
        End Sub
    End Class
End Namespace

唯一剩下的部分是网页视图页面。请注意,我现在在网页上有 6 个 FusionChart 仪表。但是您应该能够找出与 djwHub 对话的 SignalR 函数调用;

@Code
    Layout = Nothing
End Code

<head>
    <title>Drillers Readout - Job #@ViewBag.JobNumber</title>
    @Scripts.Render("~/bundles/modernizr")
    @Scripts.Render("~/bundles/jquery")
    @Scripts.Render("~/bundles/bootstrap")
    @Scripts.Render("~/bundles/jqueryui")

    <script src="~/Scripts/jquery.signalR-2.2.1.js"></script>
    <script src="~/Scripts/jquery.signalR-2.2.1.min.js"></script>
    <script src="~/signalr/hubs"></script>
    <script type="text/javascript">
        (function () {
            var currentIndex = -1;
            var lastTime;
            var GxDisplay;
            var GyDisplay;
            var GzDisplay;
            var HxDisplay;
            var HyDisplay;
            var HzDisplay;

            var myHub = $.connection.djwHub;
            $.connection.hub.logging = true;

            myHub.client.sendNewMessage = function (message) {
                var myArray = JSON.parse(message);

                currentIndex += 1;
                //var dataId = myArray.dataCount;

                if (currentIndex == 0) {
                    lastTime = new Date(myArray.time1);
                } else {
                    var newTime = new Date(myArray.time1);
                    if (newTime >= lastTime) {
                        var dataId = myArray.dataCount;
                        var Gx = myArray.Gx;
                        var Gy = myArray.Gy;
                        var Gz = myArray.Gz;
                        var Hx = myArray.Hx;
                        var Hy = myArray.Hy;
                        var Hz = myArray.Hz;

                        lastTime = newTime;

                        GxDisplay.feedData("value=" + Gx);
                        GyDisplay.feedData("value=" + Gy);
                        GzDisplay.feedData("value=" + Gz);
                        HxDisplay.feedData("value=" + Hx);
                        HyDisplay.feedData("value=" + Hy);
                        HzDisplay.feedData("value=" + Hz);

                        $("#newMessage").text('#' + dataId + ": " + lastTime + " Gx=" + Gx.toFixed(2) + " Gy=" + Gy.toFixed(2) + " Gz=" + Gz.toFixed(2)
                            + " Hx=" + Hx.toFixed(2) + " Hy=" + Hy.toFixed(2) + " Hz=" + Hz.toFixed(2));
                    }
                }
            };

            $.connection.hub.start().done(function () {
                myHub.server.beginReadingMessageQue();
            });

            myHub.client.sendServerTime = function (serverTime) {
                $("#newTime").text(serverTime);
            };

            FusionCharts.ready(function () {
                GxDisplay = new FusionCharts({
                    type: 'angulargauge',
                    renderAt: 'GxChart',
                    width: '250',
                    height: '175',
                    dataFormat: 'json',
                    dataSource: {
                        "chart": {
                            "caption": "Gx",
                            "subcaption": "",
                            "lowerLimit": "-2000",
                            "upperLimit": "2000",
                            "lowerLimitDisplay": "",
                            "upperLimitDisplay": "",
                            "showValue": "1",
                            "valueBelowPivot": "1",
                            "theme": "fint"
                        },
                        "colorRange": {
                            "color": [{
                                "minValue": "-2000",
                                "maxValue": "2000",
                                "code": "#ADD8E6"
                            }]
                        },
                        "dials": {
                            "dial": [{
                                "id": "fcGx",
                                "value": "0"
                            }]
                        }
                    }
                });

                GyDisplay = new FusionCharts({
                    type: 'angulargauge',
                    renderAt: 'GyChart',
                    width: '250',
                    height: '175',
                    dataFormat: 'json',
                    dataSource: {
                        "chart": {
                            "caption": "Gy",
                            "subcaption": "",
                            "lowerLimit": "-2000",
                            "upperLimit": "2000",
                            "lowerLimitDisplay": "",
                            "upperLimitDisplay": "",
                            "showValue": "1",
                            "valueBelowPivot": "1",
                            "theme": "fint"
                        },
                        "colorRange": {
                            "color": [{
                                "minValue": "-2000",
                                "maxValue": "2000",
                                "code": "#ADD8E6"
                            }]
                        },
                        "dials": {
                            "dial": [{
                                "id": "fcGy",
                                "value": "0"
                            }]
                        }
                    }
                });

                GzDisplay = new FusionCharts({
                    type: 'angulargauge',
                    renderAt: 'GzChart',
                    width: '250',
                    height: '175',
                    dataFormat: 'json',
                    dataSource: {
                        "chart": {
                            "caption": "Gz",
                            "subcaption": "",
                            "lowerLimit": "-2000",
                            "upperLimit": "2000",
                            "lowerLimitDisplay": "",
                            "upperLimitDisplay": "",
                            "showValue": "1",
                            "valueBelowPivot": "1",
                            "theme": "fint"
                        },
                        "colorRange": {
                            "color": [{
                                "minValue": "-2000",
                                "maxValue": "2000",
                                "code": "#ADD8E6"
                            }]
                        },
                        "dials": {
                            "dial": [{
                                "id": "fcGz",
                                "value": "0"
                            }]
                        }
                    }
                });

                HxDisplay = new FusionCharts({
                    type: 'angulargauge',
                    renderAt: 'HxChart',
                    width: '250',
                    height: '175',
                    dataFormat: 'json',
                    dataSource: {
                        "chart": {
                            "caption": "Hx",
                            "subcaption": "",
                            "lowerLimit": "-100000",
                            "upperLimit": "100000",
                            "lowerLimitDisplay": "",
                            "upperLimitDisplay": "",
                            "showValue": "1",
                            "valueBelowPivot": "1",
                            "theme": "fint"
                        },
                        "colorRange": {
                            "color": [{
                                "minValue": "-100000",
                                "maxValue": "100000",
                                "code": "#ff1493"
                            }]
                        },
                        "dials": {
                            "dial": [{
                                "id": "fcHx",
                                "value": "0"
                            }]
                        }
                    }
                });

                HyDisplay = new FusionCharts({
                    type: 'angulargauge',
                    renderAt: 'HyChart',
                    width: '250',
                    height: '175',
                    dataFormat: 'json',
                    dataSource: {
                        "chart": {
                            "caption": "Hy",
                            "subcaption": "",
                            "lowerLimit": "-100000",
                            "upperLimit": "100000",
                            "lowerLimitDisplay": "",
                            "upperLimitDisplay": "",
                            "showValue": "1",
                            "valueBelowPivot": "1",
                            "theme": "fint"
                        },
                        "colorRange": {
                            "color": [{
                                "minValue": "-100000",
                                "maxValue": "100000",
                                "code": "#ff1493"
                            }]
                        },
                        "dials": {
                            "dial": [{
                                "id": "fcHy",
                                "value": "0"
                            }]
                        }
                    }
                });

                HzDisplay = new FusionCharts({
                    type: 'angulargauge',
                    renderAt: 'HzChart',
                    width: '250',
                    height: '175',
                    dataFormat: 'json',
                    dataSource: {
                        "chart": {
                            "caption": "Hz",
                            "subcaption": "",
                            "lowerLimit": "-100000",
                            "upperLimit": "100000",
                            "lowerLimitDisplay": "",
                            "upperLimitDisplay": "",
                            "showValue": "1",
                            "valueBelowPivot": "1",
                            "theme": "fint"
                        },
                        "colorRange": {
                            "color": [{
                                "minValue": "-100000",
                                "maxValue": "100000",
                                "code": "#ff1493"
                            }]
                        },
                        "dials": {
                            "dial": [{
                                "id": "fcHz",
                                "value": "0"
                            }]
                        }
                    }
                });

                GxDisplay.render();
                GyDisplay.render();
                GzDisplay.render();
                HxDisplay.render();
                HyDisplay.render();
                HzDisplay.render();
            });
        }());
    </script> 
</head>
<body>
    <div id="newTime"></div><br />
    <ul id="newMessage"></ul>
    <div id="gCharts">
        <div id="GxChart"></div>
        <div id="GyChart"></div>
        <div id="GzChart"></div>       
    </div>
    <div id="hCharts">
        <div id="HxChart"></div>
        <div id="HyChart"></div>
        <div id="HzChart"></div>
    </div>
</body>

在此特别感谢 astaykov 和 Ashley Medway 以及 Microsoft 使这一切成为可能!