How to stream all records from Cassandra?
I need to stream all records from Cassandra. Currently I am using akka-persistence-cassandra
to pull the data:
val querier =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

val selectDistinctPersistenceIds = new SimpleStatement(
  "SELECT DISTINCT persistence_id, partition_nr FROM messages")
  .setFetchSize(100000)

querier.session.select(selectDistinctPersistenceIds).map { row =>
  row.getString(0)
}
This works fine when there are around 1.5 million records, but once the record count exceeds 1.5 million I get a read timeout error.
I am using:
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"
Edit:
Error log:
com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
at akka.persistence.cassandra.package$ListenableFutureConverter$$anon.$anonfun$run(package.scala:25)
...
The problem is in your driver session settings; adjust them to match your workload.
It may be a gap-timeout issue, or you can increase the retry count and the timeout settings.
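As a rough sketch of what that tuning looks like, the driver-side timeouts and connection retries live under the cassandra-journal section of application.conf. The key names below follow my reading of the akka-persistence-cassandra 0.x reference.conf, so verify them against the version you are running:

cassandra-journal {
  # Attempts (and delay between attempts) when the initial connection
  # to the cluster fails -- key names assumed from the 0.x reference.conf.
  connect-retries = 3
  connect-retry-delay = 1s

  socket {
    # Per-host read timeout. Raise it above the server-side timeouts when
    # large result pages take longer than the 12000 ms default to arrive.
    read-timeout-millis = 30000
  }
}

Lowering the statement's fetch size (e.g. from 100000 to a few thousand) is a complementary option, since each smaller page is more likely to arrive within the timeout.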
I was able to fix this by setting cassandra-journal.socket.read-timeout-millis
to a value higher than the default of 12000 ms:
cassandra-journal {
  ...
  socket {
    # the per-host read timeout in milliseconds. Should be higher than the timeout settings
    # used on the Cassandra side.
    read-timeout-millis = 30000
  }
}