Why does Azure Eventhub python library throw KeyError: 'all-partitions' when it reaches the maximum size?

We are upgrading some scripts that use the Python libraries for Azure Event Hubs to the latest version (5.0). I am mainly working from the example in the documentation titled Publish events to an Event Hub. When I first read the code, I found it curious that it relies on hitting a ValueError exception; that doesn't seem like the best design, but I went along with it anyway. I'll paste the sample code here to save readers some tab-switching:

# THIS IS THE EXAMPLE CODE FROM MICROSOFT
event_data_batch = client.create_batch()
can_add = True
while can_add:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        can_add = False  # EventDataBatch object reaches max_size.

with client:
    client.send_batch(event_data_batch)
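The pattern the sample relies on, filling a batch until `add` raises ValueError and then starting a fresh batch, can be sketched with a toy batch class. `FakeBatch` is hypothetical and not part of azure-eventhub; it just mimics the size-limit behavior so the control flow can be tested without a connection:

```python
class FakeBatch:
    """Toy stand-in for EventDataBatch: raises ValueError when full."""
    def __init__(self, max_size=3):
        self.max_size = max_size
        self.items = []

    def add(self, item):
        if len(self.items) >= self.max_size:
            raise ValueError("batch has reached its size limit")
        self.items.append(item)


def chunk_into_batches(events, max_size=3):
    """Split events into batches using the ValueError-driven pattern."""
    batches = [FakeBatch(max_size)]
    for event in events:
        try:
            batches[-1].add(event)
        except ValueError:
            # Current batch is full: start a new one and re-add the
            # event that did not fit.
            batches.append(FakeBatch(max_size))
            batches[-1].add(event)
    return batches


batches = chunk_into_batches(range(7), max_size=3)
print([b.items for b in batches])  # [[0, 1, 2], [3, 4, 5], [6]]
```

Note that the event which triggered the ValueError must be re-added to the new batch, otherwise it is silently dropped.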

So, we query various APIs and then send that data to Event Hub, which means I already have a for loop that iterates over the events and sends them one at a time. We're hoping batching will make this faster and more efficient. Here is how I integrated the example into our for loop:

# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)


if "eventhub" in self.output_config.keys():
    if self.output_config['eventhub'] is True:
        if events:
            i = 0
            event_data_batch = self.output_client.create_batch()
            for event in events:
                try:
                    event_data_batch.add(EventData(json.dumps(event)))
                except ValueError:  # EventDataBatch object reaches max_size.
                    # Ship events
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    # Set up the next batch
                    event_data_batch = self.output_client.create_batch()
                except Exception as e:
                    self.output_error = True
                    self.logger.error("Error shipping event to EventHub: {}".format(e))
                    i += 1

        if not self.output_error:
            if events:
                with self.output_client:
                    self.output_client.send_batch(event_data_batch)
            self.logger.info("Sent %d events" % (len(events)))
        else:
            self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))

Note how we send the events in the if not self.output_error block, because sometimes we may not hit the max-size ValueError that the example relies on. Anyway, when testing this, everything works fine if we don't reach the limit, but if we do hit the maximum size, we get this error (which we haven't been able to solve):

2020-03-02 12:59:43,697 - DEBUG - o365-dev - Period is 30
2020-03-02 12:59:43,699 - DEBUG - o365-dev - Output handling 1952 events.

Traceback (most recent call last):
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 230, in output
    event_data_batch.add(EventData(json.dumps(event)))
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_common.py", line 364, in add
    self.max_size_in_bytes
ValueError: EventDataBatch has reached its size limit: 1046528

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 216, in send_batch
    cast(EventHubProducer, self._producers[partition_id]).send(
KeyError: 'all-partitions'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "cloud-connector.py", line 175, in <module>
    main()
  File "cloud-connector.py", line 171, in main
    cloud.setup_connections()
  File "cloud-connector.py", line 135, in setup_connections
    self.connections[conn['name']] = self.modules[conn['module']].Module(conn['name'], self.config['output'], loglevel=self.logger.getEffectiveLevel())
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 89, in __init__
    self.run()
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 173, in run
    self.output(events)
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 234, in output
    self.output_client.send_batch(event_data_batch)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 220, in send_batch
    self._start_producer(partition_id, send_timeout)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 126, in _start_producer
    not self._producers[partition_id]
KeyError: 'all-partitions'

@jthack, "with self.output_client:" closes the output_client once the block completes. You use it twice, so on the second attempt you are using a closed client, which is in a bad state. I suggest you put all of the code inside a single with statement.

# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)

with self.output_client:
    if "eventhub" in self.output_config.keys():
        if self.output_config['eventhub'] is True:
            if events:
                i = 0
                event_data_batch = self.output_client.create_batch()
                for event in events:
                    try:
                        event_data_batch.add(EventData(json.dumps(event)))
                    except ValueError:  # EventDataBatch object reaches max_size.
                        # Ship the full batch
                        self.output_client.send_batch(event_data_batch)
                        # Set up the next batch and re-add the event that
                        # didn't fit, so it isn't dropped
                        event_data_batch = self.output_client.create_batch()
                        event_data_batch.add(EventData(json.dumps(event)))
                    except Exception as e:
                        self.output_error = True
                        self.logger.error("Error shipping event to EventHub: {}".format(e))
                        i += 1

            if not self.output_error:
                if events:
                    self.output_client.send_batch(event_data_batch)
                self.logger.info("Sent %d events" % (len(events)))
            else:
                self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
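To see why the original code failed, here is a toy context manager that closes its resource on exit, the same way the SDK client does. `DemoClient` is hypothetical, not the azure-eventhub type; the real client raises from its internal producer bookkeeping (the KeyError: 'all-partitions' you saw) rather than a clean "closed" error, but the cause is the same:

```python
class DemoClient:
    """Toy client: usable until its with-block exits, then closed."""
    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # Exiting the with-block closes the client, like the SDK does.
        self.closed = True

    def send_batch(self, batch):
        if self.closed:
            raise RuntimeError("client is closed")


client = DemoClient()
with client:
    client.send_batch(["first"])   # fine: client is still open

try:
    with client:                   # second with-block on a closed client
        client.send_batch(["second"])
except RuntimeError as e:
    print(e)                       # client is closed
```

This is why wrapping everything in one with statement fixes it: all send_batch calls happen before __exit__ runs.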