Azure EventHub: checkpoint best practices in C# EventProcessorClient
According to the EventProcessorClient documentation, it is the recommended way to consume an event hub:
EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput.
EventProcessorClient only lets you update the consumer's offset (the checkpoint) from inside the event handler, i.e. once per message, something like:
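var eventProcessorClient = new EventProcessorClient(...);
eventProcessorClient.ProcessEventAsync += ProcessEventHandler;
// StartProcessingAsync throws InvalidOperationException if no ProcessErrorAsync handler is set.
eventProcessorClient.ProcessErrorAsync += ProcessErrorHandler;
await eventProcessorClient.StartProcessingAsync(stoppingToken);
...
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    Console.WriteLine("Received event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    // One write to the checkpoint store for every single event.
    await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs args)
{
    Console.Error.WriteLine($"Error in partition {args.PartitionId}: {args.Exception}");
    return Task.CompletedTask;
}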
Meanwhile, the Event Hubs documentation says:
Updating after each successfully processed event can have performance and cost implications as it triggers a write operation to the underlying checkpoint store. Also, checkpointing every single event is indicative of a queued messaging pattern for which a Service Bus queue might be a better option than an event hub. The idea behind Event Hubs is that you get "at least once" delivery at great scale.
How can I update partition checkpoints efficiently? Should I use a Kafka client instead of Azure.Messaging.EventHubs?
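Checkpointing after every message is not a good idea. I suggest checkpointing after each batch of processed messages, for example every 50 messages (I think the right number depends on how many messages you are processing, etc.).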
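A minimal sketch of the count-based approach, assuming a threshold per partition. CheckpointCounter is a hypothetical helper, not part of Azure.Messaging.EventHubs. Counts are kept per partition because each partition has its own checkpoint; the processor calls the handler for a given partition one event at a time, while different partitions can run concurrently, hence the ConcurrentDictionary.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper (not part of the Event Hubs SDK): counts processed events
// per partition and reports when a checkpoint is due.
public sealed class CheckpointCounter
{
    private readonly int threshold;
    private readonly ConcurrentDictionary<string, int> sinceLastCheckpoint = new();

    public CheckpointCounter(int threshold) => this.threshold = threshold;

    // Call once per successfully processed event. Returns true when the caller
    // should write a checkpoint for this partition; the count then restarts at 0.
    public bool RecordAndCheck(string partitionId)
    {
        int count = this.sinceLastCheckpoint.AddOrUpdate(partitionId, 1, (_, current) => current + 1);
        if (count < this.threshold)
        {
            return false;
        }

        this.sinceLastCheckpoint[partitionId] = 0;
        return true;
    }
}
```

In the event handler this would be used roughly as: after the business logic succeeds, if counter.RecordAndCheck(args.Partition.PartitionId) returns true, call await args.UpdateCheckpointAsync(args.CancellationToken). With a threshold of 50, a partition receiving 1,000 events per second would cause about 20 checkpoint writes per second instead of 1,000.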
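Next, you should think about how to handle messages that are processed twice. For example, your EventProcessorClient might crash, which means it will resume reading from the Event Hub at the last checkpoint. In that case you will read some messages you may already have processed, so you need a mechanism that can handle this.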
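One common mechanism is to make processing idempotent by remembering, per partition, the highest sequence number already handled (EventData.SequenceNumber increases within a partition, so replayed events have numbers at or below it). The sketch below is hypothetical and keeps that state in memory only for illustration; to survive a crash or a partition moving to another instance, the state would have to live in a durable store, ideally written together with the business result.

```csharp
using System.Collections.Concurrent;

// Hypothetical sketch: remembers the highest sequence number processed per
// partition and flags anything at or below it as a replay.
// NOTE: in-memory only; a real implementation would persist this durably.
public sealed class ReplayFilter
{
    private readonly ConcurrentDictionary<string, long> lastProcessed = new();

    // True if this partition already processed an event with this or a later sequence number.
    public bool IsDuplicate(string partitionId, long sequenceNumber) =>
        this.lastProcessed.TryGetValue(partitionId, out long last) && sequenceNumber <= last;

    // Call after the business logic for the event has completed successfully.
    public void MarkProcessed(string partitionId, long sequenceNumber) =>
        this.lastProcessed[partitionId] = sequenceNumber;
}
```

In the handler, the event would be skipped when IsDuplicate(args.Partition.PartitionId, args.Data.SequenceNumber) is true, and MarkProcessed would be called once the business logic has committed.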
Maybe a timer that checks for changes is better than a count.
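With an event count alone, if no new events arrive for a long time, you never update the checkpoint, so the events processed since the last checkpoint (fewer than the threshold) are read again after a restart.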
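Count and time can also be combined. The sketch below assumes a hypothetical CheckpointPolicy that reports a checkpoint as due after maxEvents events or once maxAge has passed since the partition's last checkpoint. To get calls while a partition is idle, EventProcessorClientOptions.MaximumWaitTime can be set; the processor then invokes the handler with args.HasEvent == false when no event arrived within that window. Such an empty invocation carries no event to checkpoint, so the caller would keep the last ProcessEventArgs per partition and checkpoint that one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical policy: checkpoint a partition after maxEvents events or after
// maxAge since its last checkpoint, whichever comes first. The current time is
// passed in so the policy can be tested without a real clock.
public sealed class CheckpointPolicy
{
    private sealed class State
    {
        public int Pending;
        public DateTimeOffset LastCheckpoint;
    }

    private readonly int maxEvents;
    private readonly TimeSpan maxAge;
    private readonly Dictionary<string, State> partitions = new();
    private readonly object gate = new();

    public CheckpointPolicy(int maxEvents, TimeSpan maxAge)
    {
        this.maxEvents = maxEvents;
        this.maxAge = maxAge;
    }

    // Call on every handler invocation; hasEvent is false for the empty
    // invocations produced when MaximumWaitTime elapses without new events.
    public bool ShouldCheckpoint(string partitionId, bool hasEvent, DateTimeOffset now)
    {
        lock (this.gate)
        {
            if (!this.partitions.TryGetValue(partitionId, out var state))
            {
                state = new State { LastCheckpoint = now };
                this.partitions[partitionId] = state;
            }

            if (hasEvent)
            {
                state.Pending++;
            }

            // Nothing pending means nothing new to checkpoint, however old the last one is.
            bool due = state.Pending > 0
                && (state.Pending >= this.maxEvents || now - state.LastCheckpoint >= this.maxAge);
            if (due)
            {
                state.Pending = 0;
                state.LastCheckpoint = now;
            }

            return due;
        }
    }
}
```

With this design an idle partition's trailing events are checkpointed within roughly maxAge plus MaximumWaitTime, and a partition that has seen no new events since its last checkpoint causes no writes at all.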
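using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

public class EventListener : IDisposable
{
    // System.Threading.Timer: the callback runs on a thread-pool thread every 7 seconds.
    private readonly Timer updateCheckpointTimer;

    // Latest processed event per partition; one shared field would only checkpoint the last-written partition.
    private readonly ConcurrentDictionary<string, ProcessEventArgs> lastEventArgs = new();

    public EventListener(...)
    {
        ...
        this.updateCheckpointTimer = new Timer(this.UpdateCheckpoint, null, TimeSpan.Zero, TimeSpan.FromSeconds(7));
    }

    public async Task ProcessEventAsync(ProcessEventArgs args)
    {
        // business logic
        ...
        this.lastEventArgs[args.Partition.PartitionId] = args;
    }

    // async void only because Timer needs a void callback; exceptions must not escape it.
    private async void UpdateCheckpoint(object? state)
    {
        foreach (var partitionId in this.lastEventArgs.Keys)
        {
            // Only partitions with new events since the last tick are still in the map.
            if (!this.lastEventArgs.TryRemove(partitionId, out var args)) continue;
            try
            {
                await args.UpdateCheckpointAsync(args.CancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Processing is stopping for this partition; drop the pending checkpoint.
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"Checkpoint for partition {partitionId} failed: {ex.Message}");
                this.lastEventArgs.TryAdd(partitionId, args); // retry next tick unless a newer event arrived
            }
        }
    }

    public void Dispose() => this.updateCheckpointTimer.Dispose();
}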
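With a 7-second period, a slow write to the checkpoint store can still be running when the timer fires again, and an older checkpoint finishing after a newer one would move the checkpoint backwards. One way to skip overlapping ticks is an Interlocked flag. NonOverlappingRunner below is a hypothetical helper, not part of the Event Hubs SDK; the timer callback would call it instead of doing the checkpoint loop directly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs an async action from a timer callback and skips the
// tick if the previous run is still in progress, so runs never overlap.
public sealed class NonOverlappingRunner
{
    private readonly Func<Task> action;
    private int running; // 0 = idle, 1 = a run is in progress

    public NonOverlappingRunner(Func<Task> action) => this.action = action;

    // Returns true if the action ran, false if this tick was skipped.
    public async Task<bool> TryRunAsync()
    {
        if (Interlocked.CompareExchange(ref this.running, 1, 0) != 0)
        {
            return false;
        }

        try
        {
            await this.action();
            return true;
        }
        finally
        {
            // Release the flag even if the action threw.
            Volatile.Write(ref this.running, 0);
        }
    }
}
```

A skipped tick is harmless here because the next one picks up the latest stored event for each partition. Since the timer callback would typically discard the returned task, the wrapped action should catch its own exceptions, as the checkpoint loop in the listener already does.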