线程池是如何工作的,以及如何在像 NodeJS 这样的 async/await 环境中实现它?
How does thread pooling works, and how to implement it in an async/await env like NodeJS?
我需要 运行 一个带有 10_000 参数的函数 int f(int i)
,由于 I/O 的时间,它需要大约 1 秒的时间来执行。
在像 Python 这样的语言中,我可以使用线程(或者 async/await
,我知道,但我稍后会谈到)来并行化这个任务。
如果我想始终拥有 10 个 运行ning 线程,并在它们之间拆分任务,我可以使用 ThreadingPool :
def f(p):
x = [...]
return x
p = ThreadPool()
xs = p.map(f, range(10_000))
但是它是如何工作的?如果我想用 NodeJS 和 f = http("www.google.com", callback)
实现类似的东西,我应该从哪里开始? 这种问题的算法是什么?
同样,我想同时收到 10 个请求,当一个完成后,下一个应该开始。
到目前为止我一直在想什么(很难看,因为回调正在开始对 f() 函数的新调用):
queue = ["www.google.com", "www.facebook.com"]
var f = function(url) {
http.get(url, (e) => {
const newUrl = queue.pop();
f(newUrl);
});
};
for (var i = 0; i < 10; i++) {
f(queue.pop());
}
不确定线程池和其他库是如何实现的,但这里有一个提示:使用队列来计算有多少 tasks/threads 是 运行。
我没有尝试这段代码,但它可以给你一个想法:我们创建一个线程,每 0.2 秒检查一次是否应该启动另一个线程。
然而,这意味着大量的上下文切换并且可能效率不高。
class Pool:
def __init__(self, func: Callable, params: list, thread_max = 10):
self.func = func
self.params = params
self.running = 0
self.finished = []
self.thread_max = thread_max
self.threads = []
def start(self):
Thread(target=check, args=(0.2)).start()
def check(self, t_sleep=0.5):
done = False
while not done:
sleep(t_sleep)
# first check for finished threads
for t in threads:
if not t.isAlive():
# do something with return value
# ...
self.threads.remove(t)
if not len(self.params): # mean there is no more task left to LAUNCH
done = len(self.threads) # gonna be 0 when every tasks is COMPLETE
continue # avoid the next part (launching thread)
# now start some threads if needed
while len(self.threads) < self.thread_max:
arg = self.params.pop()
thread = Thread(target=self.func, args=(arg, ))
threads.insert(thread)
thread.start()
但是我对 async/await 没有任何线索(关键字现在在 python 中可用)
重新实现我链接到的那个 Bluebird 函数:
const mapWithConcurrency = async (values, concurrency, fn) => {
let i = 0;
let results = values.map(() => null);
const work = async () => {
while (i < values.length) {
const current = i++;
results[current] = await fn(values[current]);
}
};
await Promise.all(Array.from({length: concurrency}, work));
return results;
};
mapWithConcurrency(Array.from({length: 30 * 15}, (_, i) => i), 10, async i => {
const el = document.body.appendChild(document.createElement('i'));
el.style.left = 5 * (i % 30) + 'px';
el.style.top = 5 * (i / 30 | 0) + 'px';
await new Promise(resolve => { setTimeout(resolve, Math.random() * 500); });
el.style.background = 'black';
return 2 * i;
}).then(results => {
console.log(results.length, results.every((x, i) => x === 2 * i));
});
i {
background: grey;
transition: background 0.3s ease-out;
position: absolute;
width: 5px;
height: 5px;
}
在python中,线程池只使用了1个cpu核心。但是由于你的任务是 I/O 有界的,它会比串行执行 10k 函数调用做得更好。
为了做得更好,您可以尝试进程池,它可以利用多个内核。或者甚至将 asyncio 与进程结合起来。根据您的问题,使用线程池作为基线,使用这两种方法可能会或可能不会进一步加速。
参见 this example of combining thread/process with asyncio。它应该直接适用于您的情况。您的函数 f
等同于它们的函数 block
.
在Python 3.6中,asyncio代码的一般形式是创建一个事件循环到运行一个异步函数。一个非常简单的例子是
import asyncio
async def coroutine():
print('in coroutine')
coro = coroutine()
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(coro)
event_loop.close()
为了简单起见,可以把async def
函数的return想成是要执行的东西(协程),循环执行。如果有N个任务要异步执行,你可以用N个async def
函数定义它们,另外一个await
s它们。最后一个 async
函数定义了 'finish' 对 N 个任务的意义。例如,可能 'finish' 表示所有 N 个任务都已完成,或者只要完成其中 1 个任务,等等。循环执行第 N+1 个函数。
在 Python 3.7 中,asyncio API 发生了一些变化,不需要显式创建循环。
您可以在 my blog post.
中找到一些示例
迟到的答案,但我通常处理最大线程限制为X
的多线程的方式如下:
import threading
import requests, json
import time
from urllib.parse import urlparse
final_dict = {} # will hold final results
def parser(u):
try:
parsed_uri = urlparse(u) # parse url to get domain name that'l be used as key in final_dict
domain = "{uri.netloc}".format(uri=parsed_uri)
x = requests.get(u)
status_code = x.status_code
headers = x.headers
cookies = x.cookies
# OR cookies = ";".join(f"{k}:{v}" for k,v in x.cookies.iteritems())
html = x.text
# do something with the parsed url, in this case, I created a dictionary containing info about the parsed url: timestamp, url, status_code, html, headers and cookies
if not domain in final_dict:
final_dict[domain] = []
final_dict[domain].append( {'ts': time.time(), 'url': u, 'status': status_code , 'headers': str(headers), 'cookies': str(cookies), 'html': html} )
except Exception as e:
pass
print(e)
return {}
max_threads = 10
urls = ['https://google.com','https://www.facebook.com', 'https://google.com/search?q=hello+world', 'https://www.facebook.com/messages/', 'https://google.com/search?q=learn+python', 'https://www.facebook.com/me/photos', 'https://google.com/search?q=visit+lisboa', 'https://www.facebook.com/me/photos_albums']
for u in urls:
threading.Thread(target=parser, args=[u]).start()
tc = threading.active_count()
while tc == max_threads:
tc = threading.active_count()
time.sleep(0.2)
while tc != 1: # wait for threads to finish, when tc == 1 no more threads are running apart from the main process.
tc = threading.active_count()
time.sleep(0.2)
print(json.dumps(final_dict))
'''
# save to file
with open("output.json", "w") as f:
f.write(json.dumps(final_dict))
# load from file
with open("output.json") as f:
_json = json.loads(f.read())
'''
输出:
- 请检查上面生成的
json
:https://jsoneditoronline.org/?id=403e55d841394a5a83dbbda98d5f2ccd
- 上面的代码是,某种程度上,"my own code",我的意思是它在以前的项目中使用过,它可能无法完全回答你的问题,但希望它对未来的用户来说是一个很好的资源.
- 在
Linux
上,我通常将 max_threads
设置为 250
,在 Windows
上设置为
大约 150
.
要具有与 nodejs 相似的行为,您必须使用响应式 x 编程。您正在寻找的是 rxpy 。
https://github.com/ReactiveX/RxPY
看看我新发布的模块:concurrency-controller
它可以在给定的并发度下并发调用函数。
我需要 运行 一个带有 10_000 参数的函数 int f(int i)
,由于 I/O 的时间,它需要大约 1 秒的时间来执行。
在像 Python 这样的语言中,我可以使用线程(或者 async/await
,我知道,但我稍后会谈到)来并行化这个任务。
如果我想始终拥有 10 个 运行ning 线程,并在它们之间拆分任务,我可以使用 ThreadingPool :
def f(p):
x = [...]
return x
p = ThreadPool()
xs = p.map(f, range(10_000))
但是它是如何工作的?如果我想用 NodeJS 和 f = http("www.google.com", callback)
实现类似的东西,我应该从哪里开始? 这种问题的算法是什么?
同样,我想同时收到 10 个请求,当一个完成后,下一个应该开始。
到目前为止我一直在想什么(很难看,因为回调正在开始对 f() 函数的新调用):
queue = ["www.google.com", "www.facebook.com"]
var f = function(url) {
http.get(url, (e) => {
const newUrl = queue.pop();
f(newUrl);
});
};
for (var i = 0; i < 10; i++) {
f(queue.pop());
}
不确定线程池和其他库是如何实现的,但这里有一个提示:使用队列来计算有多少 tasks/threads 是 运行。
我没有尝试这段代码,但它可以给你一个想法:我们创建一个线程,每 0.2 秒检查一次是否应该启动另一个线程。
然而,这意味着大量的上下文切换并且可能效率不高。
class Pool:
def __init__(self, func: Callable, params: list, thread_max = 10):
self.func = func
self.params = params
self.running = 0
self.finished = []
self.thread_max = thread_max
self.threads = []
def start(self):
Thread(target=check, args=(0.2)).start()
def check(self, t_sleep=0.5):
done = False
while not done:
sleep(t_sleep)
# first check for finished threads
for t in threads:
if not t.isAlive():
# do something with return value
# ...
self.threads.remove(t)
if not len(self.params): # mean there is no more task left to LAUNCH
done = len(self.threads) # gonna be 0 when every tasks is COMPLETE
continue # avoid the next part (launching thread)
# now start some threads if needed
while len(self.threads) < self.thread_max:
arg = self.params.pop()
thread = Thread(target=self.func, args=(arg, ))
threads.insert(thread)
thread.start()
但是我对 async/await 没有任何线索(关键字现在在 python 中可用)
重新实现我链接到的那个 Bluebird 函数:
const mapWithConcurrency = async (values, concurrency, fn) => {
let i = 0;
let results = values.map(() => null);
const work = async () => {
while (i < values.length) {
const current = i++;
results[current] = await fn(values[current]);
}
};
await Promise.all(Array.from({length: concurrency}, work));
return results;
};
mapWithConcurrency(Array.from({length: 30 * 15}, (_, i) => i), 10, async i => {
const el = document.body.appendChild(document.createElement('i'));
el.style.left = 5 * (i % 30) + 'px';
el.style.top = 5 * (i / 30 | 0) + 'px';
await new Promise(resolve => { setTimeout(resolve, Math.random() * 500); });
el.style.background = 'black';
return 2 * i;
}).then(results => {
console.log(results.length, results.every((x, i) => x === 2 * i));
});
i {
background: grey;
transition: background 0.3s ease-out;
position: absolute;
width: 5px;
height: 5px;
}
在python中,线程池只使用了1个cpu核心。但是由于你的任务是 I/O 有界的,它会比串行执行 10k 函数调用做得更好。
为了做得更好,您可以尝试进程池,它可以利用多个内核。或者甚至将 asyncio 与进程结合起来。根据您的问题,使用线程池作为基线,使用这两种方法可能会或可能不会进一步加速。
参见 this example of combining thread/process with asyncio。它应该直接适用于您的情况。您的函数 f
等同于它们的函数 block
.
在Python 3.6中,asyncio代码的一般形式是创建一个事件循环到运行一个异步函数。一个非常简单的例子是
import asyncio
async def coroutine():
print('in coroutine')
coro = coroutine()
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(coro)
event_loop.close()
为了简单起见,可以把async def
函数的return想成是要执行的东西(协程),循环执行。如果有N个任务要异步执行,你可以用N个async def
函数定义它们,另外一个await
s它们。最后一个 async
函数定义了 'finish' 对 N 个任务的意义。例如,可能 'finish' 表示所有 N 个任务都已完成,或者只要完成其中 1 个任务,等等。循环执行第 N+1 个函数。
在 Python 3.7 中,asyncio API 发生了一些变化,不需要显式创建循环。 您可以在 my blog post.
中找到一些示例迟到的答案,但我通常处理最大线程限制为X
的多线程的方式如下:
import threading
import requests, json
import time
from urllib.parse import urlparse
final_dict = {} # will hold final results
def parser(u):
try:
parsed_uri = urlparse(u) # parse url to get domain name that'l be used as key in final_dict
domain = "{uri.netloc}".format(uri=parsed_uri)
x = requests.get(u)
status_code = x.status_code
headers = x.headers
cookies = x.cookies
# OR cookies = ";".join(f"{k}:{v}" for k,v in x.cookies.iteritems())
html = x.text
# do something with the parsed url, in this case, I created a dictionary containing info about the parsed url: timestamp, url, status_code, html, headers and cookies
if not domain in final_dict:
final_dict[domain] = []
final_dict[domain].append( {'ts': time.time(), 'url': u, 'status': status_code , 'headers': str(headers), 'cookies': str(cookies), 'html': html} )
except Exception as e:
pass
print(e)
return {}
max_threads = 10
urls = ['https://google.com','https://www.facebook.com', 'https://google.com/search?q=hello+world', 'https://www.facebook.com/messages/', 'https://google.com/search?q=learn+python', 'https://www.facebook.com/me/photos', 'https://google.com/search?q=visit+lisboa', 'https://www.facebook.com/me/photos_albums']
for u in urls:
threading.Thread(target=parser, args=[u]).start()
tc = threading.active_count()
while tc == max_threads:
tc = threading.active_count()
time.sleep(0.2)
while tc != 1: # wait for threads to finish, when tc == 1 no more threads are running apart from the main process.
tc = threading.active_count()
time.sleep(0.2)
print(json.dumps(final_dict))
'''
# save to file
with open("output.json", "w") as f:
f.write(json.dumps(final_dict))
# load from file
with open("output.json") as f:
_json = json.loads(f.read())
'''
输出:
- 请检查上面生成的
json
:https://jsoneditoronline.org/?id=403e55d841394a5a83dbbda98d5f2ccd - 上面的代码是,某种程度上,"my own code",我的意思是它在以前的项目中使用过,它可能无法完全回答你的问题,但希望它对未来的用户来说是一个很好的资源.
- 在
Linux
上,我通常将max_threads
设置为250
,在Windows
上设置为 大约150
.
要具有与 nodejs 相似的行为,您必须使用响应式 x 编程。您正在寻找的是 rxpy 。 https://github.com/ReactiveX/RxPY
看看我新发布的模块:concurrency-controller
它可以在给定的并发度下并发调用函数。