Consume events from Azure Event Hub in Python
I am receiving JSON data in an Event Hub.
Once a day, I want to read this data from the Event Hub and store it in a database. To read the data from the Event Hub I followed this documentation: https://docs.microsoft.com/en-us/python/api/overview/azure/eventhub-readme?view=azure-python
I am able to print all the events in the Event Hub, but I don't know how to get hold of those events and return a pandas DataFrame outside of that callback function.
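What I have working so far is essentially the receive sample from that page (simplified; the connection details are placeholders):

from azure.eventhub import EventHubConsumerClient

connection_str = '<YOUR CONNECTION STRING>'
consumer_group = '<YOUR CONSUMER GROUP>'
eventhub_name = '<YOUR EVENT HUB>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

def on_event(partition_context, event):
    # print the raw JSON payload of each event
    print(event.body_as_str(encoding='UTF-8'))
    partition_context.update_checkpoint(event)

with client:
    # blocks until the client is closed (e.g. with Ctrl+C)
    client.receive(on_event=on_event, starting_position="-1")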
To get a DataFrame out of it, I tried this:
def on_event_batch(partition_context, events):
    final_dataframe = pd.DataFrame()
    print("Received event from partition {}".format(partition_context.partition_id))
    for event in events:
        body = json.loads(next(event.body).decode('UTF-8'))
        event_df = pd.DataFrame(body, index=[0])
        final_dataframe = pd.concat([final_dataframe, event_df], ignore_index=True)
    partition_context.update_checkpoint()
    client.close()
    print(final_dataframe)
    return final_dataframe

with client:
    final_dataframe = client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from a specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')
But it doesn't work.
client.receive_batch(on_event_batch=on_event_batch, partition_id='0') has a return type of None, so I'm not sure you can get the data out by returning it from the callback function.
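The usual way to get data out of a callback like this is to have it fill a variable in an enclosing scope rather than return it; a minimal sketch of the idea (nothing Event Hub specific, the names here are made up):

import pandas as pd

def collect():
    frames = []  # the callback below appends into this list

    def on_batch(items):
        # whatever on_batch returns is discarded by the caller,
        # so accumulate results in the enclosing scope instead
        frames.append(pd.DataFrame(items))

    on_batch([{"a": 1}])  # simulate the client invoking the callback twice
    on_batch([{"a": 2}])
    return pd.concat(frames, ignore_index=True)

df = collect()

The code below does the same thing, but it rebinds a single DataFrame instead of appending to a list, which is why it needs nonlocal.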
Using that pattern with the Event Hub consumer, I think an easier way to do it looks like this:
from azure.eventhub import EventHubConsumerClient
import pandas as pd
import json

def get_messages():
    connection_str = '<YOUR CONNECTION STRING>'
    consumer_group = '<YOUR CONSUMER GROUP>'
    eventhub_name = '<YOUR EVENT HUB>'
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

    final_df = pd.DataFrame()

    def on_event_batch(partition_context, events):
        nonlocal final_df
        print("Received event from partition {}".format(partition_context.partition_id))
        print(len(events))
        # Check whether any events were returned, since max_wait_time is set
        if len(events) == 0:
            # close the client if no event arrived within max_wait_time
            client.close()
        else:
            for event in events:
                # decode the JSON payload of the event
                body = json.loads(event.body_as_str(encoding='UTF-8'))
                event_df = pd.DataFrame(body, index=[0])
                final_df = pd.concat([final_df, event_df], ignore_index=True)
            partition_context.update_checkpoint()

    with client:
        client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
            max_wait_time=5,         # if there is no activity for this long, the callback is called with no events
            max_batch_size=2,
        )
    return final_df

df = get_messages()
df.head()
The code above will actually have populated the DataFrame df once the client has exited gracefully.
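Since your end goal is to store this data in a database once a day, you can then write the returned DataFrame out with pandas; a minimal sketch, assuming SQLAlchemy is available and using a placeholder database URL and table name:

from sqlalchemy import create_engine

# placeholder database URL and table name - swap in your own database
engine = create_engine("sqlite:///events.db")

df = get_messages()
if not df.empty:
    # append the day's events to the table (created on first run)
    df.to_sql("eventhub_events", engine, if_exists="append", index=False)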