Cassandra - 有没有办法限制异步查询的数量?
Cassandra - Is there a way to limit number of async queries?
我想知道是否有办法限制 cassandra java 驱动程序同时执行的查询数量?
目前,我执行了很多如下查询:
...
PreparedStatement stmt = session.prepare("SELECT * FROM users WHERE id = ?");
BoundStatement boundStatement = new BoundStatement(stmt);
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(list.length);
for(String id : list ) {
futures.add(session.executeAsync(boundStatement.bind(id)));
}
for (ListenableFuture<ResultSet> future : futures) {
ResultSet rs = future.get();
... // do some stuff
}
不幸的是,这可能会导致 NoHostAvailableException。
谢谢。
您可以使用信号量来限制并发查询的数量:
final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries);
...
semaphore.acquire();
try {
ResultSetFuture future = session.executeAsync("...");
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
semaphore.release();
}
@Override
public void onFailure(Throwable t) {
semaphore.release();
}
});
} catch (Exception e) {
semaphore.release();
}
但归根结底,情况并没有太大不同:当您超出容量时,信号量不会得到 NoHostAvailableException
,而是会阻塞(如果您使用定时版本的 acquire,则将抛出)。因此,您可能还想对触发这些查询的组件应用背压。
您可能还想调整连接池以调整容量,请参阅 our docs(适用于 2.1,如果您使用的是 2.0,请使用页面顶部的下拉菜单)。
我想知道是否有办法限制 cassandra java 驱动程序同时执行的查询数量?
目前,我执行了很多如下查询:
...
PreparedStatement stmt = session.prepare("SELECT * FROM users WHERE id = ?");
BoundStatement boundStatement = new BoundStatement(stmt);
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(list.length);
for(String id : list ) {
futures.add(session.executeAsync(boundStatement.bind(id)));
}
for (ListenableFuture<ResultSet> future : futures) {
ResultSet rs = future.get();
... // do some stuff
}
不幸的是,这可能会导致 NoHostAvailableException。
谢谢。
您可以使用信号量来限制并发查询的数量:
final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries);
...
semaphore.acquire();
try {
ResultSetFuture future = session.executeAsync("...");
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
semaphore.release();
}
@Override
public void onFailure(Throwable t) {
semaphore.release();
}
});
} catch (Exception e) {
semaphore.release();
}
但归根结底,情况并没有太大不同:当您超出容量时,信号量不会得到 NoHostAvailableException
,而是会阻塞(如果您使用定时版本的 acquire,则将抛出)。因此,您可能还想对触发这些查询的组件应用背压。
您可能还想调整连接池以调整容量,请参阅 our docs(适用于 2.1,如果您使用的是 2.0,请使用页面顶部的下拉菜单)。