Azure Eventhub Python SDK:来自所有分区的所有内容的批处理 window

Azure Eventhub Python SDK : batch of everything in time window from all partitions

我目前正在尝试从使用 32 个分区创建的 Azure EventHub 执行批量读取。更准确地说,我正在尝试读取包含过去 60 分钟内收到的所有事件的批处理。

问题是,在使用 consumer.receive_batch() 方法时,它会触发对同一分区内批处理的 on_event() 回调方法的调用。例如,分区 6 中的 6 个事件触发一个调用,而分区 9 中的 7 个事件触发另一个调用。 我希望每个分区的所有事件都触发一次 on_event() 回调方法 调用。我目前正在使用类似的东西将 EventHubConsumerClient 设置为收听模式:


self.consumer = EventHubConsumerClient.from_connection_string(
            conn_str=conn_str,
            consumer_group=consumer_group,
            eventhub_name=eventhub_name
        )

with self.consumer as consumer:
     consumer.receive_batch(
         on_event_batch=on_event_callback_method,
         starting_position=timestamp_60_minutes_ago,
         starting_position_inclusive=True,
         max_batch_size=999999999999,
         max_wait_time=60 # to let time to the receiver to gather all the messages
         )

我目前正在考虑 N 线程(此处 N 为 32)从 1-32 读取分区,然后在一个统一的事件列表中减少,但不确定如何继续,或者即使我不深入研究兔子洞。很高兴听到您对此的看法!我们当前的实现依赖于 Databricks 对 EventHub 的支持(这似乎能够做到这一点),但我们想与其保持一定距离并使用官方 SDK。

您的建议并没有落空,但如果您愿意进行您描述的略有不同的那种聚合,则可能会为您自己做比需要更多的工作。

与其产生 32 个消费者,您可能需要为它们管理故障模式、生命周期、线程管理和所有这些开销,为什么不按照您描述的方式将其简化为一个列表,而是像现在一样通过单个事件中心消费者? import queue (here) 中的 queue.Queue() 应该为您提供所需的所有同步逻辑 out-of-box 让您有一个工作人员处理从该漏斗中读取的聚合。

我会预先声明,这与仅在 per-partition 回调中执行您的逻辑并没有提供巨大的语义差异,因为它们将以大致相同的顺序馈送到您的聚合队列中,但是,如果您的目标是处理更大的连续 windows,或者看到 cross-partition 在一次逻辑读取中传播,则以上内容可以让您实现这一目标。 (作为记录,您的提议也是如此,但它需要更多的线程+客户端管理,而不是只有一个生产者和一个消费者)。

如果这不能解决您的问题或需要更清楚的说明,请随时告诉我;完全公开我是 event-hubs python SDK 的维护者之一,您也可以随时通过向我们抛出问题来随时与我们联系 our github

为了后代(感谢@Kibrantn 的帮助),我最终使用了 Threading/Queue 模式,如下所示。这将为每个可用分区启动一个线程,并发接收,在 thread-safe 队列中聚合,在接收方 close() 之前执行 N 秒,最后,将所有内容聚合在一个列表中:

from queue import Queue
import threading

from azure.eventhub import EventHubConsumerClient


class ReceiverClass:

    def _callback_process_data(self, partition_context, events):
        """
        Generic method used as a processing callback for all the events batches
        captured. This follows the Transform => ML => Post downstream workflow.
        """
        # Aggregate data into the aggregation Queue
        self.events_aggregation_queue.put(events)

    def receive_data(self):

        # Initialize the consumer
        self.consumer = EventHubConsumerClient.from_connection_string(
                    conn_str=self.config.connection_string,
                    consumer_group=self.config.consumer_group,
                    eventhub_name=self.config.eventhub_name
                )

        # Initialize the aggregation queue to gather all the EventData together (the callback take care of that)
        self.events_aggregation_queue = Queue()

        # Create a reception thread for each partition
        for partition_id in self.consumer.get_partition_ids():
            worker = threading.Thread(target=self.consumer.receive_batch,
                kwargs={"on_event_batch": self._callback_process_data,
                "starting_position": self.config.data_window.receiving_from_time,
                "starting_position_inclusive": True,
                "partition_id": partition_id})
            worker.start()

        # Aggregating for N seconds before stopping
        time.sleep(self.config.aggregation_wait_time_in_seconds)

        # Aggregate the data from the async Queue
        events_data_nested = [self.events_aggregation_queue.get() for _ in range(self.events_aggregation_queue.qsize())]
        events_data = [event_data for sublist in events_data_nested for event_data in sublist]
        logger.info(f"Received a list of {len(events_data)} EventData...")

        return events_data