multiprocessing.dummy.Pool 中线程的本地对象
Object local to a thread in multiprocessing.dummy.Pool
我正在使用 multiprocessing.dummy.Pool
并行发出 RESTful API 调用。
现在代码如下:
from multiprocessing.dummy import Pool
def onecall(args):
env = args[0]
option = args[1]
return env.call(option) # call() returns a list
def call_all():
threadpool = Pool(processes=4)
all_item = []
for item in threadpool.imap_unordered(onecall, ((create_env(), x) for x in range(100))):
all_item.extend(item)
return all_item
在上面的代码中,env
对象包裹了一个requests.Session()
对象,因此负责维护连接会话。 100 个任务使用 100 个不同的 env
对象。因此,每个任务只创建 1 个连接,进行 1 个 API 调用,然后断开连接。
但是,为了享受 HTTP keep-alive 的好处,我希望 100 个任务共享 4 个 env
对象(每个线程一个对象)以便每个连接服务多个 API 调用一个 -一个。我该如何实现?
使用 threading.local
似乎可行。
from multiprocessing.dummy import Pool
import threading
tlocal = threading.local()
def getEnv():
try:
return tlocal.env
except AttributeError:
tlocal.env = create_env()
return tlocal.env
def onecall(args):
option = args[0]
return getEnv().call(option) # call() returns a list
def call_all():
threadpool = Pool(processes=4)
all_item = []
for item in threadpool.imap_unordered(onecall, ((x,) for x in range(100))):
all_item.extend(item)
return all_item
我正在使用 multiprocessing.dummy.Pool
并行发出 RESTful API 调用。
现在代码如下:
from multiprocessing.dummy import Pool
def onecall(args):
env = args[0]
option = args[1]
return env.call(option) # call() returns a list
def call_all():
threadpool = Pool(processes=4)
all_item = []
for item in threadpool.imap_unordered(onecall, ((create_env(), x) for x in range(100))):
all_item.extend(item)
return all_item
在上面的代码中,env
对象包裹了一个requests.Session()
对象,因此负责维护连接会话。 100 个任务使用 100 个不同的 env
对象。因此,每个任务只创建 1 个连接,进行 1 个 API 调用,然后断开连接。
但是,为了享受 HTTP keep-alive 的好处,我希望 100 个任务共享 4 个 env
对象(每个线程一个对象)以便每个连接服务多个 API 调用一个 -一个。我该如何实现?
使用 threading.local
似乎可行。
from multiprocessing.dummy import Pool
import threading
tlocal = threading.local()
def getEnv():
try:
return tlocal.env
except AttributeError:
tlocal.env = create_env()
return tlocal.env
def onecall(args):
option = args[0]
return getEnv().call(option) # call() returns a list
def call_all():
threadpool = Pool(processes=4)
all_item = []
for item in threadpool.imap_unordered(onecall, ((x,) for x in range(100))):
all_item.extend(item)
return all_item