Celery 工人的利用率随着工人的增加而降低
Celery Workers' Utilization Decreases With More Workers
我试图在尽可能短的时间内发出数千个 GET 请求。我需要以可扩展的方式这样做:将我用来发出请求的服务器数量加倍应该将完成固定数量的 URL 的时间减半。
我将 Celery 与 eventlet 池一起使用,并将 RabbitMQ 作为代理。我在 --concurrency 100
每个工作服务器上生成一个工作进程,并有一个专门的主服务器发布任务(下面的代码)。我没有得到我期望的结果:当使用的工作服务器数量加倍时,完成时间根本没有减少。
似乎随着我添加更多工作服务器,每个工作服务器的利用率下降(如 Flower 所报告)。例如,有 2 个 worker,在整个执行过程中,每个 worker 的活动线程数徘徊在 80 到 90 之间(正如预期的那样,因为并发为 100)。但是,对于 6 个工作线程,每个工作线程的活动线程数徘徊在 10 到 20 之间。
这几乎就像队列大小太小,或者工作服务器无法足够快地从队列中拉出任务以充分利用,并且随着您添加更多的工作人员,他们很难快速从队列中拉出任务.
urls = ["https://...", ..., "https://..."]
tasks = []
num = 0
for url in urls:
num = num + 1
tasks.append(fetch_url.s(num, url))
job = group(tasks)
start = time.time()
res = job.apply_async()
res.join()
print time.time() - start
更新: 我附上了使用 1 个工作服务器、2 个工作服务器等最多 5 个工作服务器时成功任务与时间的关系图。如您所见,任务完成率从 1 台工作服务器增加到 2 台工作服务器,但随着我添加更多服务器,任务完成率开始趋于平稳。
对于未来的读者。有帮助的行动,最重要的利益优先:
- 将几个小工作单元组合成一个 celery 任务
- 将 Celery 代理从 RabbitMQ 切换到 Redis
原始评论讨论中未提及更多有用的提示,因此对于此问题的益处意义未知。
- 使用
httplib2
或 urllib3
或更好的 HTTP 库。 requests
无故燃烧 CPU
- 使用 HTTP 连接池。检查并确保重复使用与目标服务器的永久连接。
分块解释。
分块前
urls = [...]
function task(url)
response = http_fetch(url)
return process(response.body)
celery.apply_async url1
celery.apply_async url2
...
所以任务队列包含N=len(urls)个任务,每个任务都是获取单个url,对响应进行一些计算。
分块
function chunk(xs, n)
loop:
g, rest = xs[:n], xs[n:]
yield g
chunks = [ [url1, url2, url3], [4, 5, 6], ... ]
function task(chunk)
pool = eventlet.GreenPool()
result = {
response.url: process(response)
for response in pool.imap(http_fetch, chunk)
}
return result
celery.apply_async chunk1
celery.apply_async chunk2
...
现在任务队列包含M=len(urls)/chunksize个任务,每个任务都是获取chunksizeurls个并处理所有响应。现在你必须在单个块中多路复用并发 url 提取。这里用Eventlet GreenPool就搞定了。
注意,因为 Python,首先执行所有网络 IO,然后对块中的所有响应执行所有 CPU 计算,通过多个 celery worker 分摊 CPU 负载可能是有益的.
此答案中的所有代码仅显示一般方向。您必须使用更少的复制和分配实现更好的版本。
我试图在尽可能短的时间内发出数千个 GET 请求。我需要以可扩展的方式这样做:将我用来发出请求的服务器数量加倍应该将完成固定数量的 URL 的时间减半。
我将 Celery 与 eventlet 池一起使用,并将 RabbitMQ 作为代理。我在 --concurrency 100
每个工作服务器上生成一个工作进程,并有一个专门的主服务器发布任务(下面的代码)。我没有得到我期望的结果:当使用的工作服务器数量加倍时,完成时间根本没有减少。
似乎随着我添加更多工作服务器,每个工作服务器的利用率下降(如 Flower 所报告)。例如,有 2 个 worker,在整个执行过程中,每个 worker 的活动线程数徘徊在 80 到 90 之间(正如预期的那样,因为并发为 100)。但是,对于 6 个工作线程,每个工作线程的活动线程数徘徊在 10 到 20 之间。
这几乎就像队列大小太小,或者工作服务器无法足够快地从队列中拉出任务以充分利用,并且随着您添加更多的工作人员,他们很难快速从队列中拉出任务.
urls = ["https://...", ..., "https://..."]
tasks = []
num = 0
for url in urls:
num = num + 1
tasks.append(fetch_url.s(num, url))
job = group(tasks)
start = time.time()
res = job.apply_async()
res.join()
print time.time() - start
更新: 我附上了使用 1 个工作服务器、2 个工作服务器等最多 5 个工作服务器时成功任务与时间的关系图。如您所见,任务完成率从 1 台工作服务器增加到 2 台工作服务器,但随着我添加更多服务器,任务完成率开始趋于平稳。
对于未来的读者。有帮助的行动,最重要的利益优先:
- 将几个小工作单元组合成一个 celery 任务
- 将 Celery 代理从 RabbitMQ 切换到 Redis
原始评论讨论中未提及更多有用的提示,因此对于此问题的益处意义未知。
- 使用
httplib2
或urllib3
或更好的 HTTP 库。requests
无故燃烧 CPU - 使用 HTTP 连接池。检查并确保重复使用与目标服务器的永久连接。
分块解释。
分块前
urls = [...]
function task(url)
response = http_fetch(url)
return process(response.body)
celery.apply_async url1
celery.apply_async url2
...
所以任务队列包含N=len(urls)个任务,每个任务都是获取单个url,对响应进行一些计算。
分块
function chunk(xs, n)
loop:
g, rest = xs[:n], xs[n:]
yield g
chunks = [ [url1, url2, url3], [4, 5, 6], ... ]
function task(chunk)
pool = eventlet.GreenPool()
result = {
response.url: process(response)
for response in pool.imap(http_fetch, chunk)
}
return result
celery.apply_async chunk1
celery.apply_async chunk2
...
现在任务队列包含M=len(urls)/chunksize个任务,每个任务都是获取chunksizeurls个并处理所有响应。现在你必须在单个块中多路复用并发 url 提取。这里用Eventlet GreenPool就搞定了。
注意,因为 Python,首先执行所有网络 IO,然后对块中的所有响应执行所有 CPU 计算,通过多个 celery worker 分摊 CPU 负载可能是有益的.
此答案中的所有代码仅显示一般方向。您必须使用更少的复制和分配实现更好的版本。