带有回调的 Cassandra python 驱动程序 execute_async 未按预期工作
Cassandra python driver execute_async with callback not working as expected
我正在尝试使用 datastax python 驱动程序从 Cassandra 查询数据。我希望这个查询尽可能快。我使用的是同步 session.execute
方法,但想使用异步 session.execute_async
方法,因为它不必等待行处理代码完成可能会更有效。
当我尝试使用这种方法时,根据文档,它不起作用。我在 Ubuntu 14.0.4 使用 Cassandra 2.1.3 和 v2.5 cassandra python 驱动程序。
以下代码足以在我的机器上重现该问题:
def print_row_count(rows, label):
for i, row in enumerate(rows):
do_something = row
print "{}: processed {} rows".format(label, i+1)
def print_err(reason):
print "Error: {}".format(reason)
cluster = Cluster(['192.168.1.100'])
session = cluster.connect()
session.set_keyspace("foo")
session.default_fetch_size = 200
future_res = session.execute_async("SELECT * FROM bar LIMIT 1000;")
future_res.add_callback(print_row_count, 'Async')
future_res.add_errback(print_err)
当我执行这段代码时,什么也没有打印出来。
然而,当我将阻塞调用添加到 ResponseFuture.result
时,同步调用和原始异步调用一样被执行。
block_future_res = future_res.result()
print_row_count(block_future_res, 'BlockFuture')
添加运行这两行后的输出:
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
BlockFuture: processed 1000 rows
也许我误解了回调系统的工作原理,但我在 docs 中看不到您需要先调用 result()
才能执行回调的任何地方。
感谢任何帮助。
更新:
@WangST 回答了关于回调行为的初始问题
但是我希望能够通过 async_executions 进行分页,下面的代码允许这样做:
future = session.execute_async("SELECT * FROM metadata LIMIT 1000;")
future.add_callback(print_row_count, 'Async')
future.add_errback(print_err)
# Call this once so that the future has_more_pages value is set
future_res = future.result()
while future.has_more_pages:
future.start_fetching_next_page()
future_res = future.result()
没有:
block_future_res = future_res.result()
在 execute_async()
查询完成之前,主线程无事可做并退出。所以没有打印出来。
我正在尝试使用 datastax python 驱动程序从 Cassandra 查询数据。我希望这个查询尽可能快。我使用的是同步 session.execute
方法,但想使用异步 session.execute_async
方法,因为它不必等待行处理代码完成可能会更有效。
当我尝试使用这种方法时,根据文档,它不起作用。我在 Ubuntu 14.0.4 使用 Cassandra 2.1.3 和 v2.5 cassandra python 驱动程序。
以下代码足以在我的机器上重现该问题:
def print_row_count(rows, label):
for i, row in enumerate(rows):
do_something = row
print "{}: processed {} rows".format(label, i+1)
def print_err(reason):
print "Error: {}".format(reason)
cluster = Cluster(['192.168.1.100'])
session = cluster.connect()
session.set_keyspace("foo")
session.default_fetch_size = 200
future_res = session.execute_async("SELECT * FROM bar LIMIT 1000;")
future_res.add_callback(print_row_count, 'Async')
future_res.add_errback(print_err)
当我执行这段代码时,什么也没有打印出来。
然而,当我将阻塞调用添加到 ResponseFuture.result
时,同步调用和原始异步调用一样被执行。
block_future_res = future_res.result()
print_row_count(block_future_res, 'BlockFuture')
添加运行这两行后的输出:
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
BlockFuture: processed 1000 rows
也许我误解了回调系统的工作原理,但我在 docs 中看不到您需要先调用 result()
才能执行回调的任何地方。
感谢任何帮助。
更新:
@WangST 回答了关于回调行为的初始问题 但是我希望能够通过 async_executions 进行分页,下面的代码允许这样做:
future = session.execute_async("SELECT * FROM metadata LIMIT 1000;")
future.add_callback(print_row_count, 'Async')
future.add_errback(print_err)
# Call this once so that the future has_more_pages value is set
future_res = future.result()
while future.has_more_pages:
future.start_fetching_next_page()
future_res = future.result()
没有:
block_future_res = future_res.result()
在 execute_async()
查询完成之前,主线程无事可做并退出。所以没有打印出来。