Spring 使用 JPA 2.2 resultStream 的数据到 Kotlin 的流
Spring Data with JPA 2.2 resultStream to Kotlin's Flow
在 JPA 2.2 之前,如果我想将 ScrollableResults
发送到 Kotlin 的 Flow
,我必须这样做:
override fun findSomeUsers(batch: Int): Flow<User> {
return flow {
(em.delegate as Session).sessionFactory.openSession().use { session ->
val query = session.createQuery("select u from User u where ...")
query.fetchSize = batch
query.isReadOnly = true
query.scroll(ScrollMode.FORWARD_ONLY).use { results ->
while (results.next()) {
val u = results.get(0) as User
emit(u)
}
}
}
}
}
我必须将 EntityManager
向下转换为 Hibernate 的 Session
。
但是由于 JPA 2.2 的 Query
支持 getResultStream
,应该有一种更简洁的方法来实现这一点:
@ExperimentalCoroutinesApi
override fun findSomeUsers(batchSize: Int): Flow<User> {
return channelFlow {
em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.asSequence()
.map { it as User }
.forEach { u ->
runBlocking {
send(u)
}
}
}
}
嗯,效果很好,但有些可疑。
首先,为什么我不能直接编码 resultStream.asSequence.map {it as User}.asFlow()
? (client端无反应结束)
其次,runBlocking
块很丑。 runBlocking
只应在测试中使用。但是我在代码中没有找到规避的方法。
有什么办法可以解决吗?
第三,与问题无关。好像Spring-Data-JPA还是不支持这样的查询方式:
@Query("select u from User u where ...")
@MaybeSomeQueryHint(batchSize=:batchSize)
fun findSomeUsers(@Param("name="batchSize") batchSize: Int): Flow<User>
它加载所有用户,然后抱怨重复的行...
客户端(测试)端代码就这么简单:
@ExperimentalCoroutinesApi
@Test
@Transactional
open fun testUsers() {
runBlocking {
userDao.findSomeUsers(100).collectIndexed { index, u: User ->
logger.info("[{}] {}", index , u)
}
}
}
@Marko,Stream
版本运行良好:
override fun findSomeUserStream(batchSize: Int): Stream<User> {
return em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.map { it as User }
}
@Transactional // without this annotation , "Operation not allowed after ResultSet closed" will be thrown
@Test
open fun testUserStream() {
runBlocking {
userDao.findSomeUserStream(100).forEach { u ->
logger.info("{}" , u)
}
}
}
// it works !!
@Transactional
@Test
open fun testUserStream2() {
runBlocking {
userDao.findSomeUserStream(100).asSequence().asFlow().collect { u ->
logger.info("{}" , u)
}
}
}
不是修补 Stream.toSequence()
的结果,而是定义 Stream
到 Flow
的这种转换:
fun <T> Stream<T>.asFlow() = flow {
for (t in iterator()) {
emit(t)
}
}
如果您将它与此代码示例一起使用:
suspend fun main() {
Stream.of("a", "b")
.asFlow()
.collect { println(it) }
}
它将打印
a
b
您的函数应如下所示:
override fun findSomeUsers(batchSize: Int): Flow<User> {
return em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.asFlow()
.map { it as User }
}
在 JPA 2.2 之前,如果我想将 ScrollableResults
发送到 Kotlin 的 Flow
,我必须这样做:
override fun findSomeUsers(batch: Int): Flow<User> {
return flow {
(em.delegate as Session).sessionFactory.openSession().use { session ->
val query = session.createQuery("select u from User u where ...")
query.fetchSize = batch
query.isReadOnly = true
query.scroll(ScrollMode.FORWARD_ONLY).use { results ->
while (results.next()) {
val u = results.get(0) as User
emit(u)
}
}
}
}
}
我必须将 EntityManager
向下转换为 Hibernate 的 Session
。
但是由于 JPA 2.2 的 Query
支持 getResultStream
,应该有一种更简洁的方法来实现这一点:
@ExperimentalCoroutinesApi
override fun findSomeUsers(batchSize: Int): Flow<User> {
return channelFlow {
em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.asSequence()
.map { it as User }
.forEach { u ->
runBlocking {
send(u)
}
}
}
}
嗯,效果很好,但有些可疑。
首先,为什么我不能直接编码 resultStream.asSequence.map {it as User}.asFlow()
? (client端无反应结束)
其次,runBlocking
块很丑。 runBlocking
只应在测试中使用。但是我在代码中没有找到规避的方法。
有什么办法可以解决吗?
第三,与问题无关。好像Spring-Data-JPA还是不支持这样的查询方式:
@Query("select u from User u where ...")
@MaybeSomeQueryHint(batchSize=:batchSize)
fun findSomeUsers(@Param("name="batchSize") batchSize: Int): Flow<User>
它加载所有用户,然后抱怨重复的行...
客户端(测试)端代码就这么简单:
@ExperimentalCoroutinesApi
@Test
@Transactional
open fun testUsers() {
runBlocking {
userDao.findSomeUsers(100).collectIndexed { index, u: User ->
logger.info("[{}] {}", index , u)
}
}
}
@Marko,Stream
版本运行良好:
override fun findSomeUserStream(batchSize: Int): Stream<User> {
return em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.map { it as User }
}
@Transactional // without this annotation , "Operation not allowed after ResultSet closed" will be thrown
@Test
open fun testUserStream() {
runBlocking {
userDao.findSomeUserStream(100).forEach { u ->
logger.info("{}" , u)
}
}
}
// it works !!
@Transactional
@Test
open fun testUserStream2() {
runBlocking {
userDao.findSomeUserStream(100).asSequence().asFlow().collect { u ->
logger.info("{}" , u)
}
}
}
不是修补 Stream.toSequence()
的结果,而是定义 Stream
到 Flow
的这种转换:
fun <T> Stream<T>.asFlow() = flow {
for (t in iterator()) {
emit(t)
}
}
如果您将它与此代码示例一起使用:
suspend fun main() {
Stream.of("a", "b")
.asFlow()
.collect { println(it) }
}
它将打印
a
b
您的函数应如下所示:
override fun findSomeUsers(batchSize: Int): Flow<User> {
return em.createQuery("select u from User u where ...")
.setHint(HINT_FETCH_SIZE, batchSize) // "org.hibernate.fetchSize"
.unwrap(javax.persistence.Query::class.java)
.resultStream
.asFlow()
.map { it as User }
}