使用 Axon Framework 的超低延迟流程
Ultra low latency processes using Axon Framework
所以,我正在为使用 axon 和 Spring 引导框架的低延迟交易引擎开发 PoC。单个流程是否有可能实现低至 10 - 50 毫秒的延迟?该过程将包括验证、订单和风险管理。我已经在一个简单的应用程序上完成了一些初始测试,以更新订单状态并执行它,我的延迟时间超过 300 毫秒。这让我好奇我可以用 Axon 优化多少?
编辑:
延迟问题与 Axon 无关。使用 InMemoryEventStorageEngine
和 DisruptorCommandBus
.
设法将其降低到每个流程约 5 毫秒
消息的流程是这样的。 NewOrderCommand(从客户端发布)-> OrderCreated(从聚合发布)-> ExecuteOrder(从聚合发布) saga) -> OrderExecutionRequested -> ConfirmOrderExecution(从 saga 发布)-> OrderExecuted(从聚合发布)
编辑 2:
最后切换到 Axon 服务器,但正如预期的那样,平均延迟上升到 ~150 毫秒。 Axon 服务器是使用 Docker 安装的。如何使用 AxonServer 优化应用程序以实现亚毫秒级延迟?感谢任何指点。
编辑 3:
@Steven,根据你的建议,我已经设法将延迟降低到平均 10 毫秒,这是一个好的开始!但是,是否有可能进一步降低它?因为我现在正在测试的只是最终执行订单之前要完成的一系列流程中的一个小流程,如验证、风险管理和头寸跟踪。所有这些都应该在 5ms 或更短的时间内完成。容忍的最坏情况是 10 毫秒(这些是更新的时间预算)。此外,请注意下面的配置,新读数基于 InMemorySagaStore
,由 WeakReferenceCache
支持。非常感谢您的帮助!
订单汇总:
@Aggregate
internal class OrderAggregate {
@AggregateIdentifier(routingKey = "orderId")
private lateinit var clientOrderId: String
private var orderId: String = UUID.randomUUID().toString()
private lateinit var state: OrderState
private lateinit var createdAtSource: LocalTime
private val log by Logger()
constructor() {}
@CommandHandler
constructor(command: NewOrderCommand) {
log.info("received new order command")
val (orderId, created) = command
apply(
OrderCreatedEvent(
clientOrderId = orderId,
created = created
)
)
}
@CommandHandler
fun handle(command: ConfirmOrderExecutionCommand) {
apply(OrderExecutedEvent(orderId = command.orderId, accountId = accountId))
}
@CommandHandler
fun execute(command: ExecuteOrderCommand) {
log.info("execute order event received")
apply(
OrderExecutionRequestedEvent(
clientOrderId = clientOrderId
)
)
}
@EventSourcingHandler
fun on(event: OrderCreatedEvent) {
log.info("order created event received")
clientOrderId = event.clientOrderId
createdAtSource = event.created
setState(Confirmed)
}
@EventSourcingHandler
fun on(event: OrderExecutedEvent) {
val now = LocalTime.now()
log.info(
"elapse to execute: ${
createdAtSource.until(
now,
MILLIS
)
}ms. created at source: $createdAtSource, now: $now"
)
setState(Executed)
}
private fun setState(state: OrderState) {
this.state = state
}
}
OrderManagerSaga:
@Profile("rabbit-executor")
@Saga(sagaStore = "sagaStore")
class OrderManagerSaga {
@Autowired
private lateinit var commandGateway: CommandGateway
@Autowired
private lateinit var executor: RabbitMarketOrderExecutor
private val log by Logger()
@StartSaga
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderCreatedEvent) {
log.info("saga received order created event")
commandGateway.send<Any>(ExecuteOrderCommand(orderId = event.clientOrderId, accountId = event.accountId))
}
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderExecutionRequestedEvent) {
log.info("saga received order execution requested event")
try {
//execute order
commandGateway.send<Any>(ConfirmOrderExecutionCommand(orderId = event.clientOrderId))
} catch (e: Exception) {
log.error("failed to send order: $e")
commandGateway.send<Any>(
RejectOrderCommand(
orderId = event.clientOrderId
)
)
}
}
}
豆类:
@Bean
fun eventSerializer(mapper: ObjectMapper): JacksonSerializer{
return JacksonSerializer.Builder()
.objectMapper(mapper)
.build()
}
@Bean
fun commandBusCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun associationsCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaStore(sagaCache: Cache, associationsCache: Cache): CachingSagaStore<Any>{
val sagaStore = InMemorySagaStore()
return CachingSagaStore.Builder<Any>()
.delegateSagaStore(sagaStore)
.associationsCache(associationsCache)
.sagaCache(sagaCache)
.build()
}
@Bean
fun commandBus(
commandBusCache: Cache,
orderAggregateFactory: SpringPrototypeAggregateFactory<Order>,
eventStore: EventStore,
txManager: TransactionManager,
axonConfiguration: AxonConfiguration,
snapshotter: SpringAggregateSnapshotter
): DisruptorCommandBus {
val commandBus = DisruptorCommandBus.builder()
.waitStrategy(BusySpinWaitStrategy())
.executor(Executors.newFixedThreadPool(8))
.publisherThreadCount(1)
.invokerThreadCount(1)
.transactionManager(txManager)
.cache(commandBusCache)
.messageMonitor(axonConfiguration.messageMonitor(DisruptorCommandBus::class.java, "commandBus"))
.build()
commandBus.registerHandlerInterceptor(CorrelationDataInterceptor(axonConfiguration.correlationDataProviders()))
return commandBus
}
Application.yml:
axon:
server:
enabled: true
eventhandling:
processors:
name:
mode: tracking
source: eventBus
serializer:
general : jackson
events : jackson
messages : jackson
原始回复
您的设置描述很详尽,但我认为我仍然可以推荐一些选项。这涉及到框架内的许多位置,因此,如果他们在 Axon 中的立场或目标对建议有任何不清楚的地方,请随时添加评论,以便我可以更新我的回复。
现在,让我们列出我的想法:
- 为聚合设置快照如果加载时间过长。可配置
AggregateLoadTimeSnapshotTriggerDefinition
.
- 为您的聚合引入缓存。顺便说一句,我将从尝试
WeakReferenceCache
. If this doesn't suffice, it would be worth investigating the EhCache and JCache adapters. Or, construct your own. Here's 关于聚合缓存的部分开始。
- 为您的 saga 引入缓存。顺便说一下,我将从尝试
WeakReferenceCache
. If this doesn't suffice, it would be worth investigating the EhCache and JCache adapters. Or, construct your own. Here's 关于 Saga 缓存的部分开始。
- 在这个设置中你真的需要 Saga 吗?这个过程看起来很简单,它可以 运行 在一个常规的事件处理组件中。如果是这样的话,not 在 Saga 流程中移动也可能会加速。
- 您是否尝试过优化
DisruptorCommandBus
?尝试使用 WaitStrategy
、发布者线程数、调用者线程数和使用的 Executor
。
- 尝试
PooledStreamingEventProcessor
(简称 PSEP)而不是 TrackingEventProcessor
(简称 TEP)。前者提供了更多的配置选项。顺便说一下,与 TEP 相比,默认值已经提供了更高的吞吐量。增加“批量大小”可以让您一次摄取更多的事件。您还可以更改 Executor
PSEP 用于事件检索工作(由 协调器 完成)和事件处理(worker 执行器负责这个)。
- 您还可以在 Axon 服务器上配置一些可能会增加吞吐量的东西。试试
event.events-per-segment-prefetch
、event.read-buffer-size
或 command-thread
。可能还有其他可行的选项,因此可能值得查看整个选项列表 here.
- 虽然很难推断这是否会产生立竿见影的好处,但您可以为 Axon 服务器提供 运行启用更多内存/CPU。至少 2Gb 堆和 4 个内核。玩转这些数字可能也有帮助。
可能还有更多内容要分享,但这些是我最关心的事情。希望这对您有所帮助大卫!
第二次回应
为了进一步推断我们可以在哪些方面实现更高的性能,我认为了解您的应用程序在哪个进程上花费的时间最长是很重要的。如果我们可以改进它,这将使我们能够推断出应该改进的地方。
您是否尝试过制作线程转储来推断哪个部分占用的时间最多?如果您可以将其分享为您问题的更新,我们可以开始考虑以下步骤。
所以,我正在为使用 axon 和 Spring 引导框架的低延迟交易引擎开发 PoC。单个流程是否有可能实现低至 10 - 50 毫秒的延迟?该过程将包括验证、订单和风险管理。我已经在一个简单的应用程序上完成了一些初始测试,以更新订单状态并执行它,我的延迟时间超过 300 毫秒。这让我好奇我可以用 Axon 优化多少?
编辑:
延迟问题与 Axon 无关。使用 InMemoryEventStorageEngine
和 DisruptorCommandBus
.
消息的流程是这样的。 NewOrderCommand(从客户端发布)-> OrderCreated(从聚合发布)-> ExecuteOrder(从聚合发布) saga) -> OrderExecutionRequested -> ConfirmOrderExecution(从 saga 发布)-> OrderExecuted(从聚合发布)
编辑 2: 最后切换到 Axon 服务器,但正如预期的那样,平均延迟上升到 ~150 毫秒。 Axon 服务器是使用 Docker 安装的。如何使用 AxonServer 优化应用程序以实现亚毫秒级延迟?感谢任何指点。
编辑 3:
@Steven,根据你的建议,我已经设法将延迟降低到平均 10 毫秒,这是一个好的开始!但是,是否有可能进一步降低它?因为我现在正在测试的只是最终执行订单之前要完成的一系列流程中的一个小流程,如验证、风险管理和头寸跟踪。所有这些都应该在 5ms 或更短的时间内完成。容忍的最坏情况是 10 毫秒(这些是更新的时间预算)。此外,请注意下面的配置,新读数基于 InMemorySagaStore
,由 WeakReferenceCache
支持。非常感谢您的帮助!
订单汇总:
@Aggregate
internal class OrderAggregate {
@AggregateIdentifier(routingKey = "orderId")
private lateinit var clientOrderId: String
private var orderId: String = UUID.randomUUID().toString()
private lateinit var state: OrderState
private lateinit var createdAtSource: LocalTime
private val log by Logger()
constructor() {}
@CommandHandler
constructor(command: NewOrderCommand) {
log.info("received new order command")
val (orderId, created) = command
apply(
OrderCreatedEvent(
clientOrderId = orderId,
created = created
)
)
}
@CommandHandler
fun handle(command: ConfirmOrderExecutionCommand) {
apply(OrderExecutedEvent(orderId = command.orderId, accountId = accountId))
}
@CommandHandler
fun execute(command: ExecuteOrderCommand) {
log.info("execute order event received")
apply(
OrderExecutionRequestedEvent(
clientOrderId = clientOrderId
)
)
}
@EventSourcingHandler
fun on(event: OrderCreatedEvent) {
log.info("order created event received")
clientOrderId = event.clientOrderId
createdAtSource = event.created
setState(Confirmed)
}
@EventSourcingHandler
fun on(event: OrderExecutedEvent) {
val now = LocalTime.now()
log.info(
"elapse to execute: ${
createdAtSource.until(
now,
MILLIS
)
}ms. created at source: $createdAtSource, now: $now"
)
setState(Executed)
}
private fun setState(state: OrderState) {
this.state = state
}
}
OrderManagerSaga:
@Profile("rabbit-executor")
@Saga(sagaStore = "sagaStore")
class OrderManagerSaga {
@Autowired
private lateinit var commandGateway: CommandGateway
@Autowired
private lateinit var executor: RabbitMarketOrderExecutor
private val log by Logger()
@StartSaga
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderCreatedEvent) {
log.info("saga received order created event")
commandGateway.send<Any>(ExecuteOrderCommand(orderId = event.clientOrderId, accountId = event.accountId))
}
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderExecutionRequestedEvent) {
log.info("saga received order execution requested event")
try {
//execute order
commandGateway.send<Any>(ConfirmOrderExecutionCommand(orderId = event.clientOrderId))
} catch (e: Exception) {
log.error("failed to send order: $e")
commandGateway.send<Any>(
RejectOrderCommand(
orderId = event.clientOrderId
)
)
}
}
}
豆类:
@Bean
fun eventSerializer(mapper: ObjectMapper): JacksonSerializer{
return JacksonSerializer.Builder()
.objectMapper(mapper)
.build()
}
@Bean
fun commandBusCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun associationsCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaStore(sagaCache: Cache, associationsCache: Cache): CachingSagaStore<Any>{
val sagaStore = InMemorySagaStore()
return CachingSagaStore.Builder<Any>()
.delegateSagaStore(sagaStore)
.associationsCache(associationsCache)
.sagaCache(sagaCache)
.build()
}
@Bean
fun commandBus(
commandBusCache: Cache,
orderAggregateFactory: SpringPrototypeAggregateFactory<Order>,
eventStore: EventStore,
txManager: TransactionManager,
axonConfiguration: AxonConfiguration,
snapshotter: SpringAggregateSnapshotter
): DisruptorCommandBus {
val commandBus = DisruptorCommandBus.builder()
.waitStrategy(BusySpinWaitStrategy())
.executor(Executors.newFixedThreadPool(8))
.publisherThreadCount(1)
.invokerThreadCount(1)
.transactionManager(txManager)
.cache(commandBusCache)
.messageMonitor(axonConfiguration.messageMonitor(DisruptorCommandBus::class.java, "commandBus"))
.build()
commandBus.registerHandlerInterceptor(CorrelationDataInterceptor(axonConfiguration.correlationDataProviders()))
return commandBus
}
Application.yml:
axon:
server:
enabled: true
eventhandling:
processors:
name:
mode: tracking
source: eventBus
serializer:
general : jackson
events : jackson
messages : jackson
原始回复
您的设置描述很详尽,但我认为我仍然可以推荐一些选项。这涉及到框架内的许多位置,因此,如果他们在 Axon 中的立场或目标对建议有任何不清楚的地方,请随时添加评论,以便我可以更新我的回复。
现在,让我们列出我的想法:
- 为聚合设置快照如果加载时间过长。可配置
AggregateLoadTimeSnapshotTriggerDefinition
. - 为您的聚合引入缓存。顺便说一句,我将从尝试
WeakReferenceCache
. If this doesn't suffice, it would be worth investigating the EhCache and JCache adapters. Or, construct your own. Here's 关于聚合缓存的部分开始。 - 为您的 saga 引入缓存。顺便说一下,我将从尝试
WeakReferenceCache
. If this doesn't suffice, it would be worth investigating the EhCache and JCache adapters. Or, construct your own. Here's 关于 Saga 缓存的部分开始。 - 在这个设置中你真的需要 Saga 吗?这个过程看起来很简单,它可以 运行 在一个常规的事件处理组件中。如果是这样的话,not 在 Saga 流程中移动也可能会加速。
- 您是否尝试过优化
DisruptorCommandBus
?尝试使用WaitStrategy
、发布者线程数、调用者线程数和使用的Executor
。 - 尝试
PooledStreamingEventProcessor
(简称 PSEP)而不是TrackingEventProcessor
(简称 TEP)。前者提供了更多的配置选项。顺便说一下,与 TEP 相比,默认值已经提供了更高的吞吐量。增加“批量大小”可以让您一次摄取更多的事件。您还可以更改Executor
PSEP 用于事件检索工作(由 协调器 完成)和事件处理(worker 执行器负责这个)。 - 您还可以在 Axon 服务器上配置一些可能会增加吞吐量的东西。试试
event.events-per-segment-prefetch
、event.read-buffer-size
或command-thread
。可能还有其他可行的选项,因此可能值得查看整个选项列表 here. - 虽然很难推断这是否会产生立竿见影的好处,但您可以为 Axon 服务器提供 运行启用更多内存/CPU。至少 2Gb 堆和 4 个内核。玩转这些数字可能也有帮助。
可能还有更多内容要分享,但这些是我最关心的事情。希望这对您有所帮助大卫!
第二次回应
为了进一步推断我们可以在哪些方面实现更高的性能,我认为了解您的应用程序在哪个进程上花费的时间最长是很重要的。如果我们可以改进它,这将使我们能够推断出应该改进的地方。
您是否尝试过制作线程转储来推断哪个部分占用的时间最多?如果您可以将其分享为您问题的更新,我们可以开始考虑以下步骤。