带有回调的 Cassandra python 驱动程序 execute_async 未按预期工作

Cassandra python driver execute_async with callback not working as expected

我正在尝试使用 datastax python 驱动程序从 Cassandra 查询数据。我希望这个查询尽可能快。我使用的是同步 session.execute 方法,但想使用异步 session.execute_async 方法,因为它不必等待行处理代码完成可能会更有效。

当我尝试使用这种方法时,根据文档,它不起作用。我在 Ubuntu 14.0.4 使用 Cassandra 2.1.3 和 v2.5 cassandra python 驱动程序。

以下代码足以在我的机器上重现该问题:

def print_row_count(rows, label):
    for i, row in enumerate(rows):
        do_something = row
    print "{}: processed {} rows".format(label, i+1)

def print_err(reason):
    print "Error: {}".format(reason)

cluster = Cluster(['192.168.1.100'])
session = cluster.connect()
session.set_keyspace("foo")
session.default_fetch_size = 200
future_res = session.execute_async("SELECT * FROM bar LIMIT 1000;")
future_res.add_callback(print_row_count, 'Async')
future_res.add_errback(print_err)

当我执行这段代码时,什么也没有打印出来。

然而,当我将阻塞调用添加到 ResponseFuture.result 时,同步调用和原始异步调用一样被执行。

block_future_res = future_res.result()
print_row_count(block_future_res, 'BlockFuture') 

添加运行这两行后的输出:

Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
BlockFuture: processed 1000 rows

也许我误解了回调系统的工作原理,但我在 docs 中看不到您需要先调用 result() 才能执行回调的任何地方。

感谢任何帮助。

更新:

@WangST 回答了关于回调行为的初始问题 但是我希望能够通过 async_executions 进行分页,下面的代码允许这样做:

future = session.execute_async("SELECT * FROM metadata LIMIT 1000;")
future.add_callback(print_row_count, 'Async')
future.add_errback(print_err)

# Call this once so that the future has_more_pages value is set
future_res = future.result()
while future.has_more_pages:
    future.start_fetching_next_page()
    future_res = future.result()

没有:

block_future_res = future_res.result()

execute_async() 查询完成之前,主线程无事可做并退出。所以没有打印出来。