辅助角色和 Web 角色之间的 Azure 通信

Azure communication between Worker Role and Web Role

您好,我正在构建一个云服务,它(目前)具有一个网络和一个工作者角色。我想要的工作流程是:浏览器调用 Web 角色上的 webApi 控制器,该控制器将消息发送到队列(或服务总线),然后由辅助角色处理。到目前为止,一切都很好。现在,当工作者角色完成处理消息时,我想调用 Web 角色上的一个方法,然后谁将向浏览器发出处理已完成的信号(通过 SignalR)。如果这不适合提问,请原谅,因为这更像是一个“最佳实践”问题,而不是一个真正的问题。到目前为止,我考虑过两种方法:

  1. 工作者角色更新 table 行(在 table 存储中),其中包含任务的进度和完成情况。没有向 web 角色发送信号。浏览器直接读取轮询 table 存储(通过 REST api),因此知道任务何时完成。这很有效(我已经测试过了),尽管我不喜欢持续轮询的方法,而且我希望有一个“基于事件”的解决方案。此外,一旦客户端获得进程已完成的信息,它必须执行对 Web api 方法的额外调用,以向其他客户端(通过 SignalR)广播操作已完成。

  2. Interrole communication 与 SignalR 一起使用(参见下面的代码示例)也有效(也已经过测试)

代码示例:

var protocol = "http";      
var ipAddress = RoleEnvironment.Roles["XXX.YYY.Web"]
        .Instances[0]
        .InstanceEndpoints.ToArray()
        .Where(ep => ep.Value.Protocol == protocol)
        .First()
        .Value.IPEndpoint.ToString();

var stringEndpoint = string.Format("{0}://{1}", protocol, ipAddress.ToString());                
Trace.WriteLine("Retrieved endpoint address: " + stringEndpoint, "Information");            
HubConnection connection = new HubConnection(stringEndpoint);
IHubProxy proxy = connection.CreateHubProxy("MyWebHub");
connection.Start().Wait();

//later...

proxy.Invoke("ProgressUpdated", clientId, progress);

我的问题是:是否有其他(更好的)方法可以在 Worker role -> Web role 方向进行通信?也就是说,当工作者角色完成处理时,在网络角色上触发一个方法? Web 角色上的方法然后将通过 SignalR 向所有客户端广播更新。我还查看了 Event Hubs,但据我了解,事件消费者仍将 运行 作为工作者角色。

我认为您已经具备足够的知识和实施经验来实现它。使用 SignalR 是绝妙的方法,我从你那里学到了。

另一种略有不同的方法,我使用 Azure Scheduler 进行持续轮询,以便将 GET 消息发送到 WebRole。

IMO,作为 webapi 服务器的本质,轮询是遵循 web 服务器设计的最合适和可靠的方法。

好的,经过一些额外的尝试和一些研究,我想出了一个可能的解决方案......我正在使用这个代码

AzureServiceBus.QueueClient.OnMessage((message) =>
            {
                try
                {
                    // Process message from queue
                    Trace.WriteLine("Message received: " + ((BrokeredMessage)message).Label);

                    var breezeController = new BreezeController();
                    breezeController.TestSignal();

                    // Remove message from queue
                    message.Complete();
                }
                catch (Exception)
                {
                    // Indicates a problem, unlock message in queue
                    message.Abandon();
                }
            });

在 Web 角色(不是 Worker 角色)的 OnStart 方法中,以便我可以在 Web 角色(在本例中为 TestSignal())中引用我的方法,但事实证明,调用时 IHubContext 始终为 null在此 OnMessage 事件处理程序中,因为它(很可能)属于不同的 AppDomain,因此甚至不共享 signalR 的静态集线器。因此,我已将相同的整个代码移动到 Global.asax.cs 中,以便它共享相同的 AppDomain,现在它可以工作了。我想我会继续使用这种方法,因为我比连续轮询更喜欢它。

这个问题有点晚了 :) 但我们已经实现的是指定一个响应队列和会话 ID,Web 角色在将消息发送到辅助角色后等待它们。您可以调整它以避免在 Web 角色等待工作者角色回复时阻塞响应(在我们的场景中我们想要专门等待)

网络角色

string sessionId = Guid.NewGuid().ToString(); 
[...]
// put message in sync queue
var message = new BrokeredMessage(request)
{
   ReplyToSessionId = sessionId
};
await ServiceBusConnector.Instance.SyncClient.SendAsync(message);

// now listen to reply on session response queue (only accepts message on same session id)
MessageSession session = await ServiceBusConnector.Instance.SyncResponseClient.AcceptMessageSessionAsync(sessionId);

BrokeredMessage responseMessage = await session.ReceiveAsync(TimeSpan.FromMinutes(5));
await responseMessage.CompleteAsync();
await session.CloseAsync();

Response response = responseMessage.GetBody<Response>();
// process Worker Role's response

工人角色

    // if a ReplyToSessionId has been specified, it means that the sender is
    // actively waiting for a response
    if (!String.IsNullOrEmpty(receivedMessage.ReplyToSessionId))
    {
        // now respond on sync response queue
        var responseMessage = new BrokeredMessage(response)
        {
            SessionId = receivedMessage.ReplyToSessionId
        };

        // consider allowing client to specify a ReplyTo response queue (not needed for now)
        await ServiceBusConnector.Instance.SyncResponseClient.SendAsync(responseMessage);
    }

另请查看 Jessie's approach 此处以通过 HttpClient 直接与应用程序通信。

public class Functions
{
    public static async Task ProcessQueueMessage([QueueTrigger("jobqueue")] Guid jobId, TextWriter log)
    {
        for (int i = 10; i <= 100; i+=10)
        {
            Thread.Sleep(400);

            await CommunicateProgress(jobId, i);
        }
    }

    private static async Task CommunicateProgress(Guid jobId, int percentage)
    {
        var httpClient = new HttpClient();

        var queryString = String.Format("?jobId={0}&progress={1}", jobId, percentage);
        var request = ConfigurationManager.AppSettings["ProgressNotificationEndpoint"] + queryString;

        await httpClient.GetAsync(request);
    }
}