如果一个 Stream 被馈送到 Slick 批插入,那么它是否实现了整个流?
If a Stream is fed to the Slick batch insert then does it materialize the whole stream?
我尝试在 Slick 的帮助下快速将大约 100M 的记录插入 MySQL table。
我天真地期望如果我提供一个测试数据集作为 Stream
那么 Slick 将不会贪婪地使用它:
val testData = Stream.continually(
UUIDRecord(uuid = UUID.randomUUID().toString, value = (Math.random()*100).toLong)
).take(100000000)
val batchInsert:DBIO[Option[Int]] = records ++= testData
val insertResult = db.run(batchInsert)
但我认为我计算错误了,Slick 在将流传递给 MySQL 之前无论如何都会实现流,因为我在 运行ning:
时收到此错误
#
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000b9700000, 281542656, 0) failed; error='Cannot allocate memory' (errno=12)
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 281542656 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /ssd2/projects/StreamingDB/hs_err_pid28154.log
Process finished with exit code 1
你能给点建议吗?我知道 Slick 可以 运行 以流模式查询(即它是一个反应流发布者),但是有没有办法以 "streaming" 方式插入大量记录?
首先你可能对此感兴趣GitHub issue。简而言之,批处理模式需要 JDBC 驱动程序的支持。
即使假设为您启用了批处理模式,它仍然很可能不会像您预期的那样工作。不幸的是,您没有为您的 OOM 提供实际的堆栈跟踪,但我敢打赌它在 MultiInsertAction.run
内部,更具体地说在 st.addBatch()
调用内部,其中 st
是 java.sql.PreparedStatement
的子类。而且问题是即使是batch模式,也必须先累加batch。换句话说,客户端应该累积所有将作为 INSERT
语句的一部分传递的数据,这需要以某种形式实际实现它。所以重点是,即使 Slick 没有具体化流,JDBC 也会。
我能想到的唯一解决方法是将您的数据流显式拆分为一些批次并插入这些较小的批次。你可以考虑这样的事情:
val testData = Stream.continually(
UUIDRecord(uuid = UUID.randomUUID().toString, value = (Math.random()*100).toLong)
).take(100000000)
val BATCH_SIZE = 1000
val futures = testData.grouped(BATCH_SIZE).map(batch => {
val batchInsert: DBIO[Option[Int]] = records ++= batch
db.run(batchInsert)
})
val all: Future[Int] = Future.sequence(futures).map(it => it.map(_.getOrElse(0)).sum)
我尝试在 Slick 的帮助下快速将大约 100M 的记录插入 MySQL table。
我天真地期望如果我提供一个测试数据集作为 Stream
那么 Slick 将不会贪婪地使用它:
val testData = Stream.continually(
UUIDRecord(uuid = UUID.randomUUID().toString, value = (Math.random()*100).toLong)
).take(100000000)
val batchInsert:DBIO[Option[Int]] = records ++= testData
val insertResult = db.run(batchInsert)
但我认为我计算错误了,Slick 在将流传递给 MySQL 之前无论如何都会实现流,因为我在 运行ning:
时收到此错误#
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000b9700000, 281542656, 0) failed; error='Cannot allocate memory' (errno=12)
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 281542656 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /ssd2/projects/StreamingDB/hs_err_pid28154.log
Process finished with exit code 1
你能给点建议吗?我知道 Slick 可以 运行 以流模式查询(即它是一个反应流发布者),但是有没有办法以 "streaming" 方式插入大量记录?
首先你可能对此感兴趣GitHub issue。简而言之,批处理模式需要 JDBC 驱动程序的支持。
即使假设为您启用了批处理模式,它仍然很可能不会像您预期的那样工作。不幸的是,您没有为您的 OOM 提供实际的堆栈跟踪,但我敢打赌它在 MultiInsertAction.run
内部,更具体地说在 st.addBatch()
调用内部,其中 st
是 java.sql.PreparedStatement
的子类。而且问题是即使是batch模式,也必须先累加batch。换句话说,客户端应该累积所有将作为 INSERT
语句的一部分传递的数据,这需要以某种形式实际实现它。所以重点是,即使 Slick 没有具体化流,JDBC 也会。
我能想到的唯一解决方法是将您的数据流显式拆分为一些批次并插入这些较小的批次。你可以考虑这样的事情:
val testData = Stream.continually(
UUIDRecord(uuid = UUID.randomUUID().toString, value = (Math.random()*100).toLong)
).take(100000000)
val BATCH_SIZE = 1000
val futures = testData.grouped(BATCH_SIZE).map(batch => {
val batchInsert: DBIO[Option[Int]] = records ++= batch
db.run(batchInsert)
})
val all: Future[Int] = Future.sequence(futures).map(it => it.map(_.getOrElse(0)).sum)