Python Twisted - 运行 多个回调
Python Twisted - running multiple callbacks
我对 Twisted 很陌生,我真的需要一件事 - 运行 任意数量的函数(从同一开始),收集所有函数的结果并进行一些处理。
这是我的资料:
from twisted.internet import defer
import time
# slow computing query
def process_data(num, data):
time.sleep(5)
array = []
# mock the results obtained from processed data
for i in range(0, 5):
array.append(num)
return array
def process_results(arrays):
# this should collect return arrays of all callbacks
print arrays
data = []
callbacks_refs = []
for i in range(0, 5):
d=defer.Deferred()
d.addCallback(process_data)
callbacks_refs.append(d)
callbacks = defer.DeferredList(callbacks_refs)
callbacks.addCallback(process_results)
for i, d in enumerate(callbacks_refs):
d.callback(i, data)
我希望最后一个 for 循环将开始异步执行所有回调(就像通常使用 Promises 一样)并且所有结果将传递给将在所有回调之后执行的 process_results 函数来自 callbacks_refs 完成,但我觉得我大错特错了。
我不知道这是否有变通方法,但是根据您制作 defer.callback()
的方式,您将错误的参数传递给您的回调。
如果您要附加一个 errback 和一个回调,您可能会发现您只是得到了一堆失败的结果...所以它可以工作,但没有按预期工作。
我看到两个修复程序。
from functools import partial
for i in range(0, 5):
d=defer.Deferred()
d.addCallback(partial(process_data,i,data[i]))
# This partial is still kinda crooked, but hopefully I have made my point
callbacks_refs.append(d)
或更改您在回调函数中传递数据的方式
# slow computing query
def process_data(data_dict):
#data_dict['num']
#data_dict['data']
#...and further down
d.callback({'num':4,'data':(1,2,3)})
抱歉,我对 deferredlist 不是很熟悉,但我认为一旦你修复了 deferred,deferredlist 可能会自动工作。
我不知道您的示例与您的实际代码有多相似,但示例代码显示了对 Twisted 功能的一些误解。 Twisted 不会神奇地使您的同步代码异步。您正在阻止 time.sleep
中的事件循环。如果你正在做一些 CPU-bound(相对于 I/O bound),你可以使用多个线程或进程。
我假设process_data
是一个阻塞调用,并为您提供基于多线程的解决方案:
import time
from twisted.internet import defer, task, threads
# slow computing query
def process_data(num):
time.sleep(5)
array = []
# mock the results obtained from processed data
for i in range(0, 5):
array.append(num)
return array
def process_results(arrays):
# this should collect return arrays of all callbacks
print arrays
def main(_):
callbacks_refs = []
for i in range(0, 5):
callbacks_refs.append(threads.deferToThread(process_data, i))
callbacks = defer.DeferredList(callbacks_refs)
callbacks.addCallback(process_results)
return callbacks
task.react(main)
我还会给你一个关于 Twisted 编程的一般性建议 - 如果你发现自己在输入 d = defer.Deferred()
,你的设计可能有问题。
我对 Twisted 很陌生,我真的需要一件事 - 运行 任意数量的函数(从同一开始),收集所有函数的结果并进行一些处理。
这是我的资料:
from twisted.internet import defer
import time
# slow computing query
def process_data(num, data):
time.sleep(5)
array = []
# mock the results obtained from processed data
for i in range(0, 5):
array.append(num)
return array
def process_results(arrays):
# this should collect return arrays of all callbacks
print arrays
data = []
callbacks_refs = []
for i in range(0, 5):
d=defer.Deferred()
d.addCallback(process_data)
callbacks_refs.append(d)
callbacks = defer.DeferredList(callbacks_refs)
callbacks.addCallback(process_results)
for i, d in enumerate(callbacks_refs):
d.callback(i, data)
我希望最后一个 for 循环将开始异步执行所有回调(就像通常使用 Promises 一样)并且所有结果将传递给将在所有回调之后执行的 process_results 函数来自 callbacks_refs 完成,但我觉得我大错特错了。
我不知道这是否有变通方法,但是根据您制作 defer.callback()
的方式,您将错误的参数传递给您的回调。
如果您要附加一个 errback 和一个回调,您可能会发现您只是得到了一堆失败的结果...所以它可以工作,但没有按预期工作。
我看到两个修复程序。
from functools import partial
for i in range(0, 5):
d=defer.Deferred()
d.addCallback(partial(process_data,i,data[i]))
# This partial is still kinda crooked, but hopefully I have made my point
callbacks_refs.append(d)
或更改您在回调函数中传递数据的方式
# slow computing query
def process_data(data_dict):
#data_dict['num']
#data_dict['data']
#...and further down
d.callback({'num':4,'data':(1,2,3)})
抱歉,我对 deferredlist 不是很熟悉,但我认为一旦你修复了 deferred,deferredlist 可能会自动工作。
我不知道您的示例与您的实际代码有多相似,但示例代码显示了对 Twisted 功能的一些误解。 Twisted 不会神奇地使您的同步代码异步。您正在阻止 time.sleep
中的事件循环。如果你正在做一些 CPU-bound(相对于 I/O bound),你可以使用多个线程或进程。
我假设process_data
是一个阻塞调用,并为您提供基于多线程的解决方案:
import time
from twisted.internet import defer, task, threads
# slow computing query
def process_data(num):
time.sleep(5)
array = []
# mock the results obtained from processed data
for i in range(0, 5):
array.append(num)
return array
def process_results(arrays):
# this should collect return arrays of all callbacks
print arrays
def main(_):
callbacks_refs = []
for i in range(0, 5):
callbacks_refs.append(threads.deferToThread(process_data, i))
callbacks = defer.DeferredList(callbacks_refs)
callbacks.addCallback(process_results)
return callbacks
task.react(main)
我还会给你一个关于 Twisted 编程的一般性建议 - 如果你发现自己在输入 d = defer.Deferred()
,你的设计可能有问题。