Azure Eventhub Python SDK:来自所有分区的所有内容的批处理 window
Azure Eventhub Python SDK : batch of everything in time window from all partitions
我目前正在尝试从使用 32 个分区创建的 Azure EventHub 执行批量读取。更准确地说,我正在尝试读取包含过去 60 分钟内收到的所有事件的批处理。
问题是,在使用 consumer.receive_batch()
方法时,它会触发对同一分区内批处理的 on_event()
回调方法的调用。例如,分区 6 中的 6 个事件触发一个调用,而分区 9 中的 7 个事件触发另一个调用。 我希望每个分区的所有事件都触发一次 on_event()
回调方法 调用。我目前正在使用类似的东西将 EventHubConsumerClient
设置为收听模式:
self.consumer = EventHubConsumerClient.from_connection_string(
conn_str=conn_str,
consumer_group=consumer_group,
eventhub_name=eventhub_name
)
with self.consumer as consumer:
consumer.receive_batch(
on_event_batch=on_event_callback_method,
starting_position=timestamp_60_minutes_ago,
starting_position_inclusive=True,
max_batch_size=999999999999,
max_wait_time=60 # to let time to the receiver to gather all the messages
)
我目前正在考虑 N 线程(此处 N 为 32)从 1-32 读取分区,然后在一个统一的事件列表中减少,但不确定如何继续,或者即使我不深入研究兔子洞。很高兴听到您对此的看法!我们当前的实现依赖于 Databricks 对 EventHub 的支持(这似乎能够做到这一点),但我们想与其保持一定距离并使用官方 SDK。
您的建议并没有落空,但如果您愿意进行您描述的略有不同的那种聚合,则可能会为您自己做比需要更多的工作。
与其产生 32 个消费者,您可能需要为它们管理故障模式、生命周期、线程管理和所有这些开销,为什么不按照您描述的方式将其简化为一个列表,而是像现在一样通过单个事件中心消费者? import queue
(here) 中的 queue.Queue()
应该为您提供所需的所有同步逻辑 out-of-box 让您有一个工作人员处理从该漏斗中读取的聚合。
我会预先声明,这与仅在 per-partition 回调中执行您的逻辑并没有提供巨大的语义差异,因为它们将以大致相同的顺序馈送到您的聚合队列中,但是,如果您的目标是处理更大的连续 windows,或者看到 cross-partition 在一次逻辑读取中传播,则以上内容可以让您实现这一目标。 (作为记录,您的提议也是如此,但它需要更多的线程+客户端管理,而不是只有一个生产者和一个消费者)。
如果这不能解决您的问题或需要更清楚的说明,请随时告诉我;完全公开我是 event-hubs python SDK 的维护者之一,您也可以随时通过向我们抛出问题来随时与我们联系 our github。
为了后代(感谢@Kibrantn 的帮助),我最终使用了 Threading/Queue 模式,如下所示。这将为每个可用分区启动一个线程,并发接收,在 thread-safe 队列中聚合,在接收方 close()
之前执行 N 秒,最后,将所有内容聚合在一个列表中:
from queue import Queue
import threading
from azure.eventhub import EventHubConsumerClient
class ReceiverClass:
def _callback_process_data(self, partition_context, events):
"""
Generic method used as a processing callback for all the events batches
captured. This follows the Transform => ML => Post downstream workflow.
"""
# Aggregate data into the aggregation Queue
self.events_aggregation_queue.put(events)
def receive_data(self):
# Initialize the consumer
self.consumer = EventHubConsumerClient.from_connection_string(
conn_str=self.config.connection_string,
consumer_group=self.config.consumer_group,
eventhub_name=self.config.eventhub_name
)
# Initialize the aggregation queue to gather all the EventData together (the callback take care of that)
self.events_aggregation_queue = Queue()
# Create a reception thread for each partition
for partition_id in self.consumer.get_partition_ids():
worker = threading.Thread(target=self.consumer.receive_batch,
kwargs={"on_event_batch": self._callback_process_data,
"starting_position": self.config.data_window.receiving_from_time,
"starting_position_inclusive": True,
"partition_id": partition_id})
worker.start()
# Aggregating for N seconds before stopping
time.sleep(self.config.aggregation_wait_time_in_seconds)
# Aggregate the data from the async Queue
events_data_nested = [self.events_aggregation_queue.get() for _ in range(self.events_aggregation_queue.qsize())]
events_data = [event_data for sublist in events_data_nested for event_data in sublist]
logger.info(f"Received a list of {len(events_data)} EventData...")
return events_data
我目前正在尝试从使用 32 个分区创建的 Azure EventHub 执行批量读取。更准确地说,我正在尝试读取包含过去 60 分钟内收到的所有事件的批处理。
问题是,在使用 consumer.receive_batch()
方法时,它会触发对同一分区内批处理的 on_event()
回调方法的调用。例如,分区 6 中的 6 个事件触发一个调用,而分区 9 中的 7 个事件触发另一个调用。 我希望每个分区的所有事件都触发一次 on_event()
回调方法 调用。我目前正在使用类似的东西将 EventHubConsumerClient
设置为收听模式:
self.consumer = EventHubConsumerClient.from_connection_string(
conn_str=conn_str,
consumer_group=consumer_group,
eventhub_name=eventhub_name
)
with self.consumer as consumer:
consumer.receive_batch(
on_event_batch=on_event_callback_method,
starting_position=timestamp_60_minutes_ago,
starting_position_inclusive=True,
max_batch_size=999999999999,
max_wait_time=60 # to let time to the receiver to gather all the messages
)
我目前正在考虑 N 线程(此处 N 为 32)从 1-32 读取分区,然后在一个统一的事件列表中减少,但不确定如何继续,或者即使我不深入研究兔子洞。很高兴听到您对此的看法!我们当前的实现依赖于 Databricks 对 EventHub 的支持(这似乎能够做到这一点),但我们想与其保持一定距离并使用官方 SDK。
您的建议并没有落空,但如果您愿意进行您描述的略有不同的那种聚合,则可能会为您自己做比需要更多的工作。
与其产生 32 个消费者,您可能需要为它们管理故障模式、生命周期、线程管理和所有这些开销,为什么不按照您描述的方式将其简化为一个列表,而是像现在一样通过单个事件中心消费者? import queue
(here) 中的 queue.Queue()
应该为您提供所需的所有同步逻辑 out-of-box 让您有一个工作人员处理从该漏斗中读取的聚合。
我会预先声明,这与仅在 per-partition 回调中执行您的逻辑并没有提供巨大的语义差异,因为它们将以大致相同的顺序馈送到您的聚合队列中,但是,如果您的目标是处理更大的连续 windows,或者看到 cross-partition 在一次逻辑读取中传播,则以上内容可以让您实现这一目标。 (作为记录,您的提议也是如此,但它需要更多的线程+客户端管理,而不是只有一个生产者和一个消费者)。
如果这不能解决您的问题或需要更清楚的说明,请随时告诉我;完全公开我是 event-hubs python SDK 的维护者之一,您也可以随时通过向我们抛出问题来随时与我们联系 our github。
为了后代(感谢@Kibrantn 的帮助),我最终使用了 Threading/Queue 模式,如下所示。这将为每个可用分区启动一个线程,并发接收,在 thread-safe 队列中聚合,在接收方 close()
之前执行 N 秒,最后,将所有内容聚合在一个列表中:
from queue import Queue
import threading
from azure.eventhub import EventHubConsumerClient
class ReceiverClass:
def _callback_process_data(self, partition_context, events):
"""
Generic method used as a processing callback for all the events batches
captured. This follows the Transform => ML => Post downstream workflow.
"""
# Aggregate data into the aggregation Queue
self.events_aggregation_queue.put(events)
def receive_data(self):
# Initialize the consumer
self.consumer = EventHubConsumerClient.from_connection_string(
conn_str=self.config.connection_string,
consumer_group=self.config.consumer_group,
eventhub_name=self.config.eventhub_name
)
# Initialize the aggregation queue to gather all the EventData together (the callback take care of that)
self.events_aggregation_queue = Queue()
# Create a reception thread for each partition
for partition_id in self.consumer.get_partition_ids():
worker = threading.Thread(target=self.consumer.receive_batch,
kwargs={"on_event_batch": self._callback_process_data,
"starting_position": self.config.data_window.receiving_from_time,
"starting_position_inclusive": True,
"partition_id": partition_id})
worker.start()
# Aggregating for N seconds before stopping
time.sleep(self.config.aggregation_wait_time_in_seconds)
# Aggregate the data from the async Queue
events_data_nested = [self.events_aggregation_queue.get() for _ in range(self.events_aggregation_queue.qsize())]
events_data = [event_data for sublist in events_data_nested for event_data in sublist]
logger.info(f"Received a list of {len(events_data)} EventData...")
return events_data