Spring 集成 JpaExecutor
Spring Integration JpaExecutor
我想像这样用 JpaExecutor
做更新语句
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(jdbcTemplate,"select * from EPAM_EVENT_STORE_T where EVENT_STATUS = 0");
adapter.setUpdateSql("update EPAM_EVENT_STORE_T set EVENT_STATUS = 1 where event_store_id in (:eventStoreId)");
但是JpaExecutor
没有更新方法。
我在这个 and there 中看到了肮脏的黑客攻击。
但真的,没有肮脏的技巧就可以做到吗?
所以,这就是我解决这个问题的方法。
@MessageEndpoint
@Log
public class MessageFromEvent {
private final EntityManagerFactory entityManagerFactory;
private final JpaTransactionManager transactionManager;
@Autowired
public MessageFromEvent(EntityManagerFactory entityManagerFactory,
JpaTransactionManager transactionManager) {
this.entityManagerFactory = entityManagerFactory;
this.transactionManager = transactionManager;
}
@Bean
@InboundChannelAdapter(channel = "selectEventChannel", poller = @Poller(fixedDelay = "10000"))
public MessageSource<?> selectEvent() {
log.info("STEP 1: Getting data form EventStore table");
return new JpaPollingChannelAdapter(selectEventExecutor());
}
@Bean
public JpaExecutor selectEventExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
executor.setJpaQuery("select event from Event event where event.eventStatus = 0");
executor.setUsePayloadAsParameterSource(true);
executor.setEntityClass(Event.class);
return executor;
}
@Bean
@ServiceActivator(inputChannel = "selectEventChannel")
public MessageHandler updateEventStatus(JpaExecutor updateEventExecutor) {
log.info("STEP 2: Updating event status");
JpaOutboundGateway gateway = new JpaOutboundGateway(updateEventExecutor);
gateway.setGatewayType(OutboundGatewayType.UPDATING);
MatchAlwaysTransactionAttributeSource attributeSource = new MatchAlwaysTransactionAttributeSource();
attributeSource.setTransactionAttribute(new DefaultTransactionAttribute());
TransactionInterceptor interceptor = new TransactionInterceptor(transactionManager, attributeSource);
gateway.setAdviceChain(singletonList(interceptor));
return gateway;
}
@Bean
public JpaExecutor updateEventExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
executor.setJpaQuery("update Event E set E.eventStatus = 1 where E.eventStoreId in (:eventStoreId)");
executor.setJpaParameters(Collections.singletonList(new JpaParameter("Event.eventStoreId", null, "payload")));
executor.setUsePayloadAsParameterSource(true);
executor.setEntityClass(Event.class);
return executor;
}
}
我想像这样用 JpaExecutor
做更新语句
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(jdbcTemplate,"select * from EPAM_EVENT_STORE_T where EVENT_STATUS = 0");
adapter.setUpdateSql("update EPAM_EVENT_STORE_T set EVENT_STATUS = 1 where event_store_id in (:eventStoreId)");
但是JpaExecutor
没有更新方法。
我在这个
所以,这就是我解决这个问题的方法。
@MessageEndpoint
@Log
public class MessageFromEvent {
private final EntityManagerFactory entityManagerFactory;
private final JpaTransactionManager transactionManager;
@Autowired
public MessageFromEvent(EntityManagerFactory entityManagerFactory,
JpaTransactionManager transactionManager) {
this.entityManagerFactory = entityManagerFactory;
this.transactionManager = transactionManager;
}
@Bean
@InboundChannelAdapter(channel = "selectEventChannel", poller = @Poller(fixedDelay = "10000"))
public MessageSource<?> selectEvent() {
log.info("STEP 1: Getting data form EventStore table");
return new JpaPollingChannelAdapter(selectEventExecutor());
}
@Bean
public JpaExecutor selectEventExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
executor.setJpaQuery("select event from Event event where event.eventStatus = 0");
executor.setUsePayloadAsParameterSource(true);
executor.setEntityClass(Event.class);
return executor;
}
@Bean
@ServiceActivator(inputChannel = "selectEventChannel")
public MessageHandler updateEventStatus(JpaExecutor updateEventExecutor) {
log.info("STEP 2: Updating event status");
JpaOutboundGateway gateway = new JpaOutboundGateway(updateEventExecutor);
gateway.setGatewayType(OutboundGatewayType.UPDATING);
MatchAlwaysTransactionAttributeSource attributeSource = new MatchAlwaysTransactionAttributeSource();
attributeSource.setTransactionAttribute(new DefaultTransactionAttribute());
TransactionInterceptor interceptor = new TransactionInterceptor(transactionManager, attributeSource);
gateway.setAdviceChain(singletonList(interceptor));
return gateway;
}
@Bean
public JpaExecutor updateEventExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
executor.setJpaQuery("update Event E set E.eventStatus = 1 where E.eventStoreId in (:eventStoreId)");
executor.setJpaParameters(Collections.singletonList(new JpaParameter("Event.eventStoreId", null, "payload")));
executor.setUsePayloadAsParameterSource(true);
executor.setEntityClass(Event.class);
return executor;
}
}