如何将数据发送到设置 PartitionId 而不是 PartitionKey 的 EventHub (Python)
How to send data to an EventHub setting the PartitionId NOT the PartitionKey (Python)
我在 Microsoft Docs 上看到有一种方法可以通过设置 PartitionId 而不是 PartitionKey(使用 C#)将数据发送到我想要的分区。
CreatePartitionSender(String) Create a PartitionSender which can
publish EventData's directly to a specific EventHub partition.
但是,我在 Python 中找不到相同的内容。
有什么办法吗?
我不是很确定,但是使用 python ,这里是打开连接的方法
def open(self):
"""
Open the Sender using the supplied conneciton.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.
:param connection: The underlying client shared connection.
:type: connection: ~uamqp.connection.Connection
"""
self.running = True
if self.redirected:
self.target = self.redirected.address
self._handler = SendClient(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
这是 Init
def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
"""
Instantiate an EventHub event Sender handler.
:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient.
:param target: The URI of the EventHub to send to.
:type target: str
:param partition: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
:type partition: str
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is None, i.e. no keep alive pings.
:type keep_alive: int
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
"""
self.running = False
self.client = client
self.target = target
self.partition = partition
self.timeout = send_timeout
self.redirected = None
self.error = None
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.name = "EHSender-{}".format(uuid.uuid4())
if partition:
self.target += "/Partitions/" + partition
self.name += "-partition{}".format(partition)
self._handler = SendClient(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self._outcome = None
self._condition = None
我相信,下面这行函数只会创建一个分区发件人
if partition:
self.target += "/Partitions/" + partition
self.name += "-partition{}".format(partition)
参考
https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/sender.py
希望对您有所帮助。
向Azure Event Hubs发送数据有两种方式,HTTP REST API和AMQP 1.0协议。
对于使用 HTTP REST API 或 Azure EventHub Python Client Library,只有 partitionId
参数支持将新事件发送到事件中心中的指定分区,如下所示。
REST API Send partition event
需要端点 https://{servicebusNamespace}.servicebus.windows.net/{eventHubPath}/partitions/{partitionId}/messages
中的 partitionId
参数,并且它是唯一的一个 REST API支持发送分区功能。
Sender.py
源码注释对partition
参数的解释如下
:param partition: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
:type partition: str
所以实际上,您不能使用 partitionKey
值将事件发送到指定的 EventHub 分区,除非在 Python 中使用 AMQP 1.0。使用AMQP 1.0请看官方文档AMQP 1.0 in Azure Service Bus and Event Hubs protocol guide
,在页面搜索partition-key
,结果如下
我在 Microsoft Docs 上看到有一种方法可以通过设置 PartitionId 而不是 PartitionKey(使用 C#)将数据发送到我想要的分区。
CreatePartitionSender(String) Create a PartitionSender which can publish EventData's directly to a specific EventHub partition.
但是,我在 Python 中找不到相同的内容。
有什么办法吗?
我不是很确定,但是使用 python ,这里是打开连接的方法
def open(self):
"""
Open the Sender using the supplied conneciton.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.
:param connection: The underlying client shared connection.
:type: connection: ~uamqp.connection.Connection
"""
self.running = True
if self.redirected:
self.target = self.redirected.address
self._handler = SendClient(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
这是 Init
def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
"""
Instantiate an EventHub event Sender handler.
:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient.
:param target: The URI of the EventHub to send to.
:type target: str
:param partition: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
:type partition: str
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is None, i.e. no keep alive pings.
:type keep_alive: int
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
"""
self.running = False
self.client = client
self.target = target
self.partition = partition
self.timeout = send_timeout
self.redirected = None
self.error = None
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.name = "EHSender-{}".format(uuid.uuid4())
if partition:
self.target += "/Partitions/" + partition
self.name += "-partition{}".format(partition)
self._handler = SendClient(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self._outcome = None
self._condition = None
我相信,下面这行函数只会创建一个分区发件人
if partition:
self.target += "/Partitions/" + partition
self.name += "-partition{}".format(partition)
参考
https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/sender.py
希望对您有所帮助。
向Azure Event Hubs发送数据有两种方式,HTTP REST API和AMQP 1.0协议。
对于使用 HTTP REST API 或 Azure EventHub Python Client Library,只有 partitionId
参数支持将新事件发送到事件中心中的指定分区,如下所示。
REST API
Send partition event
需要端点https://{servicebusNamespace}.servicebus.windows.net/{eventHubPath}/partitions/{partitionId}/messages
中的partitionId
参数,并且它是唯一的一个 REST API支持发送分区功能。Sender.py
源码注释对partition
参数的解释如下:param partition: The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin. :type partition: str
所以实际上,您不能使用 partitionKey
值将事件发送到指定的 EventHub 分区,除非在 Python 中使用 AMQP 1.0。使用AMQP 1.0请看官方文档AMQP 1.0 in Azure Service Bus and Event Hubs protocol guide
,在页面搜索partition-key
,结果如下