Why does Azure Eventhub python library throw KeyError: 'all-partitions' when it reaches the maximum size?
Why does Azure Eventhub python library throw KeyError: 'all-partitions' when it reaches the maximum size?
我们正在将一些使用 Azure Event Hub Python 库的脚本升级到最新版本 (5.0)。我主要参考文档中标题为 Publish events to an Event Hub
的示例。第一次阅读这段代码时我觉得很奇怪,因为它依赖于触发 ValueError 异常来判断批次已满,这似乎不是最好的设计,但我还是照做了。为了减少读者切换标签页,我把示例代码贴在这里:
# THIS IS THE EXAMPLE CODE FROM MICROSOFT
# NOTE(review): indentation reconstructed from context (the paste lost it).
# Pattern: keep adding events to one batch until add() raises ValueError
# (batch reached max_size_in_bytes), then send whatever fit.
event_data_batch = client.create_batch()
can_add = True
while can_add:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        can_add = False  # EventDataBatch object reaches max_size.
# The client is used as a context manager; it is closed when the block exits.
with client:
    client.send_batch(event_data_batch)
所以,我们查询不同的 api,然后将该数据发送到 Eventhub,所以我已经有了一个 For 循环,遍历事件并一次发送 1 个。我们希望批处理能让它更快更高效。以下是我如何将示例集成到我们的 for
循环中:
# THIS IS OUR CUSTOM SCRIPT
# NOTE(review): indentation reconstructed from context (the paste lost it).
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)
if "eventhub" in self.output_config.keys():
    if self.output_config['eventhub'] is True:
        if events:
            i = 0
            event_data_batch = self.output_client.create_batch()
            for event in events:
                try:
                    event_data_batch.add(EventData(json.dumps(event)))
                except ValueError:  # EventDataBatch object reaches max_size.
                    # Ship events
                    # NOTE(review): "with" CLOSES output_client when this block
                    # exits, so every later send_batch() call runs against a
                    # closed client -- this is what surfaces as the
                    # KeyError: 'all-partitions' in the traceback below.
                    # NOTE(review): the event whose add() raised ValueError is
                    # never re-added to the fresh batch, so it is silently dropped.
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    # Set up the next batch
                    event_data_batch = self.output_client.create_batch()
                except Exception as e:
                    self.output_error = True
                    self.logger.error("Error shipping event to EventHub: {}".format(e))
                i += 1
            if not self.output_error:
                if events:
                    # Second "with" on the same client object -- see note above.
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    self.logger.info("Sent %d events" % (len(events)))
            else:
                self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
请注意我们如何在 if not self.output_error
块中发送事件,因为有时我们可能不会达到示例所具有的最大 ValueError 大小。无论如何,在测试这个时,如果我们没有达到限制,一切正常,但如果我们达到最大尺寸,我们会得到这个错误(我们还没有解决):
2020-03-02 12:59:43,697 - DEBUG - o365-dev - Period is 30
2020-03-02 12:59:43,699 - DEBUG - o365-dev - Output handling 1952 events.
Traceback (most recent call last):
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 230, in output
event_data_batch.add(EventData(json.dumps(event)))
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_common.py", line 364, in add
self.max_size_in_bytes
ValueError: EventDataBatch has reached its size limit: 1046528
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 216, in send_batch
cast(EventHubProducer, self._producers[partition_id]).send(
KeyError: 'all-partitions'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "cloud-connector.py", line 175, in <module>
main()
File "cloud-connector.py", line 171, in main
cloud.setup_connections()
File "cloud-connector.py", line 135, in setup_connections
self.connections[conn['name']] = self.modules[conn['module']].Module(conn['name'], self.config['output'], loglevel=self.logger.getEffectiveLevel())
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 89, in __init__
self.run()
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 173, in run
self.output(events)
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 234, in output
self.output_client.send_batch(event_data_batch)
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 220, in send_batch
self._start_producer(partition_id, send_timeout)
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 126, in _start_producer
not self._producers[partition_id]
KeyError: 'all-partitions'
@jthack,"with self.output_client:" 会在代码块执行完后关闭 output_client。您使用了两次,因此第二次尝试发送时客户端已经被关闭,处于不可用状态。
我建议你把全部代码放在同一个 with 语句块中。
# THIS IS OUR CUSTOM SCRIPT
# NOTE(review): indentation reconstructed from context (the paste lost it).
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)
# Keep the client open for the whole send sequence: one enclosing "with"
# means send_batch() is never invoked on an already-closed client.
with self.output_client:
    if "eventhub" in self.output_config.keys():
        if self.output_config['eventhub'] is True:
            if events:
                i = 0
                event_data_batch = self.output_client.create_batch()
                for event in events:
                    try:
                        event_data_batch.add(EventData(json.dumps(event)))
                    except ValueError:  # EventDataBatch object reaches max_size.
                        # Ship events
                        # NOTE(review): the event whose add() raised ValueError
                        # is never re-added to the fresh batch, so it is dropped.
                        self.output_client.send_batch(event_data_batch)
                        # Set up the next batch
                        event_data_batch = self.output_client.create_batch()
                    except Exception as e:
                        self.output_error = True
                        self.logger.error("Error shipping event to EventHub: {}".format(e))
                    i += 1
                if not self.output_error:
                    if events:
                        # Flush the final, partially-filled batch.
                        self.output_client.send_batch(event_data_batch)
                        self.logger.info("Sent %d events" % (len(events)))
                else:
                    self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
我们正在将一些使用 Azure Event Hub Python 库的脚本升级到最新版本 (5.0)。我主要参考文档中标题为 Publish events to an Event Hub
的示例。第一次阅读这段代码时我觉得很奇怪,因为它依赖于触发 ValueError 异常来判断批次已满,这似乎不是最好的设计,但我还是照做了。为了减少读者切换标签页,我把示例代码贴在这里:
# THIS IS THE EXAMPLE CODE FROM MICROSOFT
# NOTE(review): indentation reconstructed from context (the paste lost it).
# Pattern: keep adding events to one batch until add() raises ValueError
# (batch reached max_size_in_bytes), then send whatever fit.
event_data_batch = client.create_batch()
can_add = True
while can_add:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        can_add = False  # EventDataBatch object reaches max_size.
# The client is used as a context manager; it is closed when the block exits.
with client:
    client.send_batch(event_data_batch)
所以,我们查询不同的 api,然后将该数据发送到 Eventhub,所以我已经有了一个 For 循环,遍历事件并一次发送 1 个。我们希望批处理能让它更快更高效。以下是我如何将示例集成到我们的 for
循环中:
# THIS IS OUR CUSTOM SCRIPT
# NOTE(review): indentation reconstructed from context (the paste lost it).
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)
if "eventhub" in self.output_config.keys():
    if self.output_config['eventhub'] is True:
        if events:
            i = 0
            event_data_batch = self.output_client.create_batch()
            for event in events:
                try:
                    event_data_batch.add(EventData(json.dumps(event)))
                except ValueError:  # EventDataBatch object reaches max_size.
                    # Ship events
                    # NOTE(review): "with" CLOSES output_client when this block
                    # exits, so every later send_batch() call runs against a
                    # closed client -- this is what surfaces as the
                    # KeyError: 'all-partitions' in the traceback below.
                    # NOTE(review): the event whose add() raised ValueError is
                    # never re-added to the fresh batch, so it is silently dropped.
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    # Set up the next batch
                    event_data_batch = self.output_client.create_batch()
                except Exception as e:
                    self.output_error = True
                    self.logger.error("Error shipping event to EventHub: {}".format(e))
                i += 1
            if not self.output_error:
                if events:
                    # Second "with" on the same client object -- see note above.
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    self.logger.info("Sent %d events" % (len(events)))
            else:
                self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
请注意我们如何在 if not self.output_error
块中发送事件,因为有时我们可能不会达到示例所具有的最大 ValueError 大小。无论如何,在测试这个时,如果我们没有达到限制,一切正常,但如果我们达到最大尺寸,我们会得到这个错误(我们还没有解决):
2020-03-02 12:59:43,697 - DEBUG - o365-dev - Period is 30
2020-03-02 12:59:43,699 - DEBUG - o365-dev - Output handling 1952 events.
Traceback (most recent call last):
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 230, in output
event_data_batch.add(EventData(json.dumps(event)))
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_common.py", line 364, in add
self.max_size_in_bytes
ValueError: EventDataBatch has reached its size limit: 1046528
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 216, in send_batch
cast(EventHubProducer, self._producers[partition_id]).send(
KeyError: 'all-partitions'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "cloud-connector.py", line 175, in <module>
main()
File "cloud-connector.py", line 171, in main
cloud.setup_connections()
File "cloud-connector.py", line 135, in setup_connections
self.connections[conn['name']] = self.modules[conn['module']].Module(conn['name'], self.config['output'], loglevel=self.logger.getEffectiveLevel())
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 89, in __init__
self.run()
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 173, in run
self.output(events)
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 234, in output
self.output_client.send_batch(event_data_batch)
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 220, in send_batch
self._start_producer(partition_id, send_timeout)
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 126, in _start_producer
not self._producers[partition_id]
KeyError: 'all-partitions'
@jthack,"with self.output_client:" 会在代码块执行完后关闭 output_client。您使用了两次,因此第二次尝试发送时客户端已经被关闭,处于不可用状态。 我建议你把全部代码放在同一个 with 语句块中。
# THIS IS OUR CUSTOM SCRIPT
# NOTE(review): indentation reconstructed from context (the paste lost it).
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)
# Keep the client open for the whole send sequence: one enclosing "with"
# means send_batch() is never invoked on an already-closed client.
with self.output_client:
    if "eventhub" in self.output_config.keys():
        if self.output_config['eventhub'] is True:
            if events:
                i = 0
                event_data_batch = self.output_client.create_batch()
                for event in events:
                    try:
                        event_data_batch.add(EventData(json.dumps(event)))
                    except ValueError:  # EventDataBatch object reaches max_size.
                        # Ship events
                        # NOTE(review): the event whose add() raised ValueError
                        # is never re-added to the fresh batch, so it is dropped.
                        self.output_client.send_batch(event_data_batch)
                        # Set up the next batch
                        event_data_batch = self.output_client.create_batch()
                    except Exception as e:
                        self.output_error = True
                        self.logger.error("Error shipping event to EventHub: {}".format(e))
                    i += 1
                if not self.output_error:
                    if events:
                        # Flush the final, partially-filled batch.
                        self.output_client.send_batch(event_data_batch)
                        self.logger.info("Sent %d events" % (len(events)))
                else:
                    self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))