为什么我在使用 eventhub 时在 Azure 上收到此错误?

Why am I receiving this error on Azure when using eventhubs?

我最近开始使用 Azure,这是一次压倒性的体验。我开始尝试 eventhubs and I'm basically following the official tutorials 如何使用 nodejs 从事件中心发送和接收消息。

一切都很完美,所以我构建了一个小型网络应用程序(静态前端应用程序),并将其连接到节点后端,与事件中心进行通信。所以基本上我的应用程序是这样构建的:

frontend <----> node server <-----> eventhubs

如您所见,非常简单。节点服务器正在从 eventhub 获取数据并将其转发到显示值的前端。这是一次很棒的体验,在出现此错误之前我一直在享受 MS Azure:

 azure.eventhub.common.EventHubError: ErrorCodes.ResourceLimitExceeded: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5. List of connected receivers - nil, nil, nil, nil, nil.

这个错误真是令人费解。我使用默认的消费者组并且只有一个应用程序。我从未尝试从另一个应用程序访问这个消费者群体。它说限制是 5,我只使用一个应用程序所以它应该没问题还是我遗漏了什么?我不检查这里发生了什么。

我浪费了太多时间在谷歌上搜索和研究这个,但我没有明白。最后,我想也许每次我在 azure 上部署应用程序(我的前端和我的节点服务器)时,这都算作一个消费者,因为我部署了该应用程序超过 5 次,所以这个错误就会出现。我是对的还是胡说八道?

编辑

我使用 websockets 作为我的应用程序(前端)和我的节点服务器(后端)之间的通信协议。节点服务器使用的是默认消费者组(我什么都没改),我只是跟着this official example from Microsoft。我基本上使用的是 MS 文档中的代码,这就是为什么我没有 post 我的节点服务器中的任何代码片段,并且由于错误发生在后端而不是前端,所以如果我 posted 任何前端代码。

最后,我正在使用 websocket 连接前端和后端。它完美地工作了一两天,然后这个错误开始发生。有时我会打开多个客户端(例如浏览器客户端和智能手机客户端)。

我觉得我不明白这个消费群体的概念。就像每个客户都是消费者一样吗?所以如果我在浏览器的 5 个不同选项卡中打开我的应用程序(同一个应用程序),那么我有 5 个消费者吗?


我不太明白下面的答案以及“池化客户端”的含义,因此,我将尝试在此处 post 代码示例向您展示我正在尝试做的事情。

代码片段

这是我在服务器端使用的函数,用于与 eventhub 和 receive/consume 消息通信

async function receiveEventhubMessage(socket, eventHubName, connectionString) {

 
  const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);


  const subscription = consumerClient.subscribe({
      processEvents: async (events, context) => {

        for (const event of events) {
          console.log("[    consumer    ] Message received : " + event.body);

            io.emit('msg-received', event.body);
        }
      },

      processError: async (err, context) => {
        console.log(`Error : ${err}`);
      }
    }
  );

如果您注意到,我将 eventhub 和连接字符串作为参数提供,以便能够更改它。现在在前端,我有一个包含多个主题的列表,每个主题都有自己的 eventhubname 但它们具有相同的 eventhub 命名空间。

这是我拥有的两个 eventhubname 的示例:

{
"EventHubName": "eh-test-command"
"EventHubName": "eh-test-telemetry"
}

如果用户选择发送命令(从前端,我只有一个按钮列表,用户可以单击这些按钮以通过 websockets 触发事件)然后 CommandEventHubName 将从前端发送到节点服务器。服务器将收到该 eventhubname 并在我上面 post 编辑的函数中切换 consumerClient。

这是我调用它的代码:

// io is a socket.io object
io.on('connection', socket => {
   socket.on('onUserChoice', choice => {
   // choice is an object sent from the frontend based on what the user choosed. e.g if the user choosed command then choice = {"EventhubName": "eh-test-command", "payload": "whatever"}

   receiveEventhubMessage(socket, choice.EventHubName, choice.EventHubNameSpace)
              .catch(err => console.log(`[    consumerClient    ] Error while receiving eventhub messages: ${err}`));
      }
}

我正在构建的应用程序将来会扩展到汽车领域的真实用例,这就是为什么这对我很重要。因此,我想弄清楚如何在每次 eventhubname 更改时在不创建新的 consumerClient 的情况下在 eventhub 之间切换?

我必须说我不理解“池客户端”的示例。我正在寻求更多的阐述,或者理想情况下,一个最小的例子只是为了让我上路。

根据问题中的对话,问题的根本原因似乎是您的后端正在为来自前端的每个请求创建一个新的 EventHubConsumerClient。因为每个客户端都将打开到该服务的专用连接,所以如果您对使用同一使用者组的同一事件中心实例的请求超过 5 个,就会超出配额。

要解决此问题,您需要考虑合并 EventHubConsumerClient 实例,这样一开始每个事件中心实例一个。您可以安全地使用池客户端通过调用 subscribe 来处理前端请求。这将允许您在多个前端请求之间共享连接。

关键思想是您的 consumerClient 不是为每个请求创建的,而是在请求之间共享一个实例。使用您的代码片段来说明最简单的方法,您最终会将您的客户端创建提升到要接收的功能之外。它可能看起来像:

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

async function receiveEventhubMessage(socket, eventHubName, connectionString) {
  const subscription = consumerClient.subscribe({
      processEvents: async (events, context) => {

        for (const event of events) {
          console.log("[    consumer    ] Message received : " + event.body);
          io.emit('msg-received', event.body);
        }
      },

      processError: async (err, context) => {
        console.log(`Error : ${err}`);
      }
    }
  );

也就是说,根据应用程序的体系结构,以上内容可能不适合您的环境。如果为每个请求动态创建托管 receiveEventHubMessage 的内容,则不会发生任何变化。在这种情况下,您需要考虑使用单例或依赖注入之类的方法来帮助延长生命周期。

如果您最终在扩展以满足您的请求时遇到问题,您可以考虑增加每个事件中心的客户端数量 and/or 将请求分散到不同的使用者组。