线程池是如何工作的,以及如何在像 NodeJS 这样的 async/await 环境中实现它?

How does thread pooling works, and how to implement it in an async/await env like NodeJS?

我需要 运行 一个带有 10_000 参数的函数 int f(int i),由于 I/O 的时间,它需要大约 1 秒的时间来执行。
在像 Python 这样的语言中,我可以使用线程(或者 async/await,我知道,但我稍后会谈到)来并行化这个任务。
如果我想始终拥有 10 个 运行ning 线程,并在它们之间拆分任务,我可以使用 ThreadingPool :

def f(p):
    x = [...]
    return x

p = ThreadPool()
xs = p.map(f, range(10_000))

但是它是如何工作的?如果我想用 NodeJS 和 f = http("www.google.com", callback) 实现类似的东西,我应该从哪里开始? 这种问题的算法是什么?
同样,我想同时收到 10 个请求,当一个完成后,下一个应该开始。

到目前为止我一直在想什么(很难看,因为回调正在开始对 f() 函数的新调用):

queue = ["www.google.com", "www.facebook.com"]
var f = function(url) {
  http.get(url, (e) => {
    const newUrl = queue.pop();
    f(newUrl);
  });
};

for (var i = 0; i < 10; i++) {
  f(queue.pop());
}

不确定线程​​池和其他库是如何实现的,但这里有一个提示:使用队列来计算有多少 tasks/threads 是 运行。
我没有尝试这段代码,但它可以给你一个想法:我们创建一个线程,每 0.2 秒检查一次是否应该启动另一个线程。
然而,这意味着大量的上下文切换并且可能效率不高。

class Pool:
    def __init__(self, func: Callable, params: list, thread_max = 10):
        self.func = func
        self.params = params
        self.running = 0
        self.finished = []
        self.thread_max = thread_max
        self.threads = []

    def start(self):
        Thread(target=check, args=(0.2)).start()

    def check(self, t_sleep=0.5):
        done = False
        while not done:
            sleep(t_sleep)
            # first check for finished threads
            for t in threads:
                if not t.isAlive():
                    # do something with return value
                    # ...
                    self.threads.remove(t)

            if not len(self.params): # mean there is no more task left to LAUNCH
                done = len(self.threads) # gonna be 0 when every tasks is COMPLETE
                continue # avoid the next part (launching thread)

            # now start some threads if needed
            while len(self.threads) < self.thread_max:
                arg = self.params.pop()
                thread = Thread(target=self.func, args=(arg, ))
                threads.insert(thread)
                thread.start()

但是我对 async/await 没有任何线索(关键字现在在 python 中可用)

重新实现我链接到的那个 Bluebird 函数:

const mapWithConcurrency = async (values, concurrency, fn) => {
    let i = 0;
    let results = values.map(() => null);

    const work = async () => {
        while (i < values.length) {
            const current = i++;
            results[current] = await fn(values[current]);
        }
    };

    await Promise.all(Array.from({length: concurrency}, work));

    return results;
};

mapWithConcurrency(Array.from({length: 30 * 15}, (_, i) => i), 10, async i => {
    const el = document.body.appendChild(document.createElement('i'));
    el.style.left = 5 * (i % 30) + 'px';
    el.style.top = 5 * (i / 30 | 0) + 'px';
    await new Promise(resolve => { setTimeout(resolve, Math.random() * 500); });
    el.style.background = 'black';
    return 2 * i;
}).then(results => {
    console.log(results.length, results.every((x, i) => x === 2 * i));
});
i {
    background: grey;
    transition: background 0.3s ease-out;
    position: absolute;
    width: 5px;
    height: 5px;
}

在python中,线程池只使用了1个cpu核心。但是由于你的任务是 I/O 有界的,它会比串行执行 10k 函数调用做得更好。

为了做得更好,您可以尝试进程池,它可以利用多个内核。或者甚至将 asyncio 与进程结合起来。根据您的问题,使用线程池作为基线,使用这两种方法可能会或可能不会进一步加速。

参见 this example of combining thread/process with asyncio。它应该直接适用于您的情况。您的函数 f 等同于它们的函数 block.

在Python 3.6中,asyncio代码的一般形式是创建一个事件循环到运行一个异步函数。一个非常简单的例子是

import asyncio

async def coroutine():
    print('in coroutine')

coro = coroutine()
event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(coro)
event_loop.close()

为了简单起见,可以把async def函数的return想成是要执行的东西(协程),循环执行。如果有N个任务要异步执行,你可以用N个async def函数定义它们,另外一个awaits它们。最后一个 async 函数定义了 'finish' 对 N 个任务的意义。例如,可能 'finish' 表示所有 N 个任务都已完成,或者只要完成其中 1 个任务,等等。循环执行第 N+1 个函数。

在 Python 3.7 中,asyncio API 发生了一些变化,不需要显式创建循环。 您可以在 my blog post.

中找到一些示例

迟到的答案,但我通常处理最大线程限制为X的多线程的方式如下:

import threading
import requests, json
import time
from urllib.parse import urlparse

final_dict = {} # will hold final results

def parser(u):
    try:
        parsed_uri = urlparse(u) # parse url to get domain name that'l be used as key in final_dict
        domain = "{uri.netloc}".format(uri=parsed_uri)
        x = requests.get(u)
        status_code = x.status_code
        headers = x.headers
        cookies = x.cookies
        # OR cookies = ";".join(f"{k}:{v}" for k,v in x.cookies.iteritems())
        html = x.text
        # do something with the parsed url, in this case, I created a dictionary containing info about the parsed url: timestamp, url, status_code, html, headers and cookies
        if not domain in final_dict:
            final_dict[domain] = []
        final_dict[domain].append( {'ts': time.time(), 'url': u, 'status': status_code , 'headers': str(headers), 'cookies': str(cookies), 'html': html} )

    except Exception as e:
        pass
        print(e)
        return {}

max_threads = 10
urls = ['https://google.com','https://www.facebook.com', 'https://google.com/search?q=hello+world', 'https://www.facebook.com/messages/', 'https://google.com/search?q=learn+python', 'https://www.facebook.com/me/photos', 'https://google.com/search?q=visit+lisboa', 'https://www.facebook.com/me/photos_albums']

for u in urls:
    threading.Thread(target=parser, args=[u]).start()
    tc = threading.active_count()
    while tc == max_threads:
        tc = threading.active_count()
        time.sleep(0.2)

while tc != 1: # wait for threads to finish, when tc == 1 no more threads are running apart from the main process.
    tc = threading.active_count()
    time.sleep(0.2)

print(json.dumps(final_dict))

'''
# save to file
with open("output.json", "w") as f:
    f.write(json.dumps(final_dict))

# load from file
with open("output.json") as f:
    _json = json.loads(f.read())
'''

输出:

  1. 请检查上面生成的 jsonhttps://jsoneditoronline.org/?id=403e55d841394a5a83dbbda98d5f2ccd
  2. 上面的代码是,某种程度上,"my own code",我的意思是它在以前的项目中使用过,它可能无法完全回答你的问题,但希望它对未来的用户来说是一个很好的资源.
  3. Linux 上,我通常将 max_threads 设置为 250,在 Windows 上设置为 大约 150.

要具有与 nodejs 相似的行为,您必须使用响应式 x 编程。您正在寻找的是 rxpy 。 https://github.com/ReactiveX/RxPY

看看我新发布的模块:concurrency-controller

它可以在给定的并发度下并发调用函数。