Python requests - threads/processes vs. IO

I'm connecting to a local server (OSRM) over HTTP to submit routes and get back drive times. I notice that I/O is slower than threading because it seems the wait time for the computation is smaller than the time it takes to send the request and process the JSON output (I think I/O is better when the server needs some time to process your request -> you don't want it to be blocking because you have to wait, which is not my case). Threading suffers from the Global Interpreter Lock, so it appears (and the evidence below suggests) that my fastest option is to use multiprocessing.

The issue with multiprocessing is that it is so fast that it exhausts my sockets and I get an error (requests issues a new connection each time). I can (in serial) use a requests.Sessions() object to keep a connection alive, however I can't get this working in parallel (each process has its own session).

The closest thing to working code I have at the moment is this multiprocessing code:

from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool
import json

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input      
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

However, I can't get the HTTPConnectionPool to work properly; it creates new sockets each time (I think) and then gives me the error:

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


My goal is to get distance calculations from an OSRM routing server I run locally, as quickly as possible.

My question comes in two parts - basically I'm trying to convert some code using multiprocessing.Pool() into better code (proper asynchronous functions - so that the execution never breaks and it runs as fast as possible).

The problem I'm having is that everything I try seems slower than multiprocessing (I show several examples of what I have tried below).

Some possible approaches are: gevent, grequests, tornado, requests-futures, asyncio, etc.

A - Multiprocessing.Pool()

I started out like this:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

where I connect to my local server (localhost, port 5005), which is launched on 8 threads and supports parallel execution.

After searching a bit, I realised the error I was getting was because requests was opening a new connection/socket for each request. So this was actually too fast and was exhausting sockets after a while. The way to fix this seems to be to use requests.Session() - however I haven't been able to get this working with multiprocessing (each process has its own session).
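
The direction I was attempting looks roughly like this minimal sketch - giving each worker process its own session by creating it in a Pool initializer (init_session and fetch are placeholder names, and it assumes url_routes as elsewhere):

import requests
from multiprocessing import Pool, cpu_count

session = None

def init_session():
    # runs once per worker process
    global session
    session = requests.Session()

def fetch(url_input):
    url, qid = url_input
    try:
        return [qid, session.get(url).status_code]
    except Exception:
        return [qid, 999]

if __name__ == '__main__':
    pool = Pool(cpu_count(), initializer=init_session)
    results = pool.map(fetch, url_routes)
    pool.close()
    pool.join()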

Question 1

On some computers it runs fine, e.g.:

To compare against later: 45% server usage and 1700 requests per second

However, in some cases it does not, and I don't fully understand why:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))

My guess would be that, since requests locks the socket while it is in use, sometimes the server is too slow to respond to the old request and a new one is generated. The server supports queueing, but requests does not, so instead of adding to the queue I get the error?

Question 2

I found the following:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.

B - Requests-futures

To address this I needed to rewrite my code to use asynchronous requests, so I tried the below using:

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed

(As an aside, when starting the server I chose the option to use all threads.)

Along with the main code:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)

My function (ReqOsrm) is now rewritten as:

def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out

However, this code is slower than the multiprocessing one! Before I was getting around 1700 requests per second, now I'm getting 600 a second. I guess this is because I'm not fully utilising the CPU, but I'm not sure how to increase that.
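
One way to check whether I am actually CPU-bound is to sample utilisation while the benchmark runs; a rough sketch using psutil (the monitor thread and log_cpu are placeholder names, not part of the benchmark itself):

import threading
import psutil

def log_cpu(stop_event):
    # sample total CPU utilisation roughly once a second
    while not stop_event.is_set():
        print("CPU: %.0f%%" % psutil.cpu_percent(interval=1.0))

stop = threading.Event()
monitor = threading.Thread(target=log_cpu, args=(stop,), daemon=True)
monitor.start()
# ... run the FuturesSession code above ...
stop.set()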

C - Threading

I tried another method (creating threads) - however, again, I wasn't sure how to get this to maximise CPU usage (ideally I want to see my server at 50% usage, no?):

def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

I feel this method is faster than requests_futures, but I don't know how many threads to set to maximise it -
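
A crude way to pick the thread count empirically is to time the same batch of URLs at several pool sizes and keep the best; a sketch with concurrent.futures (fetch and the candidate sizes are placeholders, url_routes as above):

import time
import requests
from concurrent.futures import ThreadPoolExecutor

def fetch(url_qid):
    url, qid = url_qid
    try:
        return qid, requests.get(url).status_code
    except Exception:
        return qid, 999

for n_threads in (10, 50, 100, 250, 500, 1000):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        results = list(executor.map(fetch, url_routes))
    elapsed = time.perf_counter() - start
    print("%4d threads -> %.0f RPS" % (n_threads, len(results) / elapsed))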

D - Tornado (not working)

I am now trying tornado - however I can't quite get it working; it crashes the existing code with exit code -1073741819 if I use curl - if I use simple_httpclient it works, but then I get timeout errors:

ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout

from functools import partial
from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)

E - asyncio/aiohttp

I decided to try another method (although it would be great to get tornado working) using asyncio and aiohttp.

import asyncio
import aiohttp
import json

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but not route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking fo semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

This works ok, but it is still slower than multiprocessing!
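
Part of the gap may be that handle_req (the JSON parsing) runs on the event loop itself; a rough sketch of pushing that work into a process pool with run_in_executor, reusing chunked_http_client and handle_req from above (run_experiment_offloaded is a made-up name, and on Windows this needs the usual if __name__ == '__main__' guard):

import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def run_experiment_offloaded(urls, loop, executor):
    http_client = chunked_http_client(500)
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            # parse in a worker process instead of on the event loop
            out = yield from loop.run_in_executor(executor, handle_req, data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=4)
calc_routes = loop.run_until_complete(run_experiment_offloaded(url_routes, loop, executor))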

Question 1

You get the error because this approach:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

creates a new TCP connection for every requested URL, and at some point it fails simply because the system runs out of free local ports. To confirm this you can run netstat while your code is executing:

netstat -a -n | find /c "localhost:5005"

That will give you the number of connections to the server.

Also, reaching 1700 RPS looks quite unrealistic for this approach, since requests.get is quite an expensive operation and it's unlikely you can get even 50 RPS this way. So you probably need to double-check your RPS calculations.
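
A simple sanity check, for example, is to time the whole batch and derive RPS from that (this is only an illustration and assumes the pool and url_routes from the question):

import time

start = time.perf_counter()
calc_routes = pool.map(ReqOsrm, url_routes)
elapsed = time.perf_counter() - start
print("%d requests in %.1f s -> %.0f RPS"
      % (len(url_routes), elapsed, len(url_routes) / elapsed))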

To avoid the error you need to use a session instead of creating connections from scratch:

import multiprocessing
import requests
import time


class Worker(multiprocessing.Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            result = s.get(self.qin.get())
            self.qout.put(result)
            self.qin.task_done()

if __name__ == '__main__':
    start = time.time()

    qin = multiprocessing.JoinableQueue()
    [qin.put('http://localhost:8080/') for _ in range(10000)]

    qout = multiprocessing.Queue()

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())]

    qin.join()

    result = []
    while not qout.empty():
        result.append(qout.get())

    print(time.time() - start)
    print(result)

Question 2

You won't get higher RPS with threading or async approaches unless I/O takes more time than the calculations (e.g. high network latency, big responses, etc.): threads are affected by the GIL because they run in the same Python process, and async libraries may be blocked by long-running calculations.

Though threads or async libraries can improve performance, running the same threaded or async code in multiple processes will give you higher performance anyway.
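
As a rough illustration of that point (not code from the question), each worker process can run its own thread pool with its own session; fetch_chunk, threads_per_proc and the chunking below are arbitrary choices:

import requests
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import cpu_count

def fetch_chunk(chunk, threads_per_proc=50):
    # one session per process, shared by that process's threads
    session = requests.Session()
    def fetch(item):
        url, qid = item
        try:
            return [qid, session.get(url).status_code]
        except Exception:
            return [qid, 999]
    with ThreadPoolExecutor(max_workers=threads_per_proc) as executor:
        return list(executor.map(fetch, chunk))

if __name__ == '__main__':
    n_procs = cpu_count()
    chunks = [url_routes[i::n_procs] for i in range(n_procs)]
    with ProcessPoolExecutor(max_workers=n_procs) as executor:
        calc = [row for part in executor.map(fetch_chunk, chunks) for row in part]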

Looking at the multiprocessing code at the top of the question: it seems that HTTPConnectionPool() is called each time ReqOsrm is called, so a new pool is created for each URL. Instead, use the initializer and initargs arguments to create a single pool per process.

conn_pool = None

def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input

    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out

        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]

    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

The requests-futures version appears to have had an indentation error. The loop for future in as_completed(futures): was indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.

I think the code should be:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all the requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid

    # this was indented under the code in section B of the question
    # process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data

        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        print(row)
        calc_routes.append(row)

Thanks everyone for the help. I wanted to post my conclusions:

Since my HTTP requests are to a localhost server which processes the requests instantly, it does not make much sense for me to use asynchronous approaches (compared to most cases where requests are sent over the internet). The costly factor for me is actually sending the request and processing the feedback, which means I get much better speeds using multiple processes (threads suffer from the GIL). I should also use a session to increase the speed (no need to close and re-open a connection to the same server), which also helps prevent port exhaustion.

Here are all the methods I tried (the ones that worked), with example RPS:

Serial

S1. Serial GET requests (no session) -> 215 RPS

def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S2. Serial GET requests (requests.Session()) -> 335 RPS

session = requests.Session()
def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S3. Serial GET requests (urllib3.HTTPConnectionPool) -> 545 RPS

conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

Async IO

A4. AsyncIO with aiohttp -> 450 RPS

import asyncio
import aiohttp
concurrent = 100
def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]
def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking fo semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get
def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

A5. Threads without a session -> 330 RPS

from threading import Thread
from queue import Queue
concurrent = 100
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A6. Threads with HTTPConnectionPool -> 1550 RPS

from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A7. Requests-futures -> 520 RPS

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100
def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out
#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)

Multiprocessing

P8. multiprocessing worker + queue + requests.session() -> 1058 RPS

from multiprocessing import *
class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()
def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]
# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())

P9. multiprocessing worker + queue + HTTPConnectionPool() -> 1230 RPS
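
This is the same worker/queue pattern as P8 but with a urllib3 HTTPConnectionPool per process instead of a requests session; a minimal sketch of that variant (ghost/gport as in the other snippets):

from multiprocessing import Process, JoinableQueue, Queue, cpu_count
from urllib3 import HTTPConnectionPool
import json

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        # one connection pool per process
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
        while not self.qin.empty():
            url, qid = self.qin.get()
            try:
                resp = conn_pool.request('GET', url)
                json_geocode = json.loads(resp.data.decode('utf-8'))
                out = [qid, 200, json_geocode['paths'][0]['time'],
                       json_geocode['paths'][0]['distance']]
            except Exception as err:
                print("Error: ", err, qid, url)
                out = [qid, 999, 0, 0]
            self.qout.put(out)
            self.qin.task_done()
# Run (same driver as P8):
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())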

P10. Multiprocessing v2 (not really sure how this is different) -> 1350 RPS

conn_pool = None
def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]
# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)

So in conclusion, the best method for me seems to be #10 (and, surprisingly, #6).

Here is the pattern I use with gevent, which is coroutine-based and may not suffer from the GIL. This may be faster than using threads, and may be fastest when combined with multiprocessing (currently it only uses 1 core):

from gevent import monkey
monkey.patch_all()

import logging
import random
import time
from threading import Thread

from gevent.queue import JoinableQueue
from logger import initialize_logger

initialize_logger()
log = logging.getLogger(__name__)


class Worker(Thread):

    def __init__(self, worker_idx, queue):
        # initialize the base class
        super(Worker, self).__init__()
        self.worker_idx = worker_idx
        self.queue = queue

    def log(self, msg):
        log.info("WORKER %s - %s" % (self.worker_idx, msg))

    def do_work(self, line):
        #self.log(line)
        time.sleep(random.random() / 10)

    def run(self):
        while True:
            line = self.queue.get()
            self.do_work(line)
            self.queue.task_done()


def main(number_of_workers=20):
    start_time = time.time()

    queue = JoinableQueue()
    for idx in range(number_of_workers):
        worker = Worker(idx, queue)
        # "daemonize" a thread to ensure that the threads will
        # close when the main program finishes
        worker.daemon = True
        worker.start()

    for idx in xrange(100):
        queue.put("%s" % idx)

    queue.join()
    time_taken = time.time() - start_time
    log.info("Parallel work took %s seconds." % time_taken)

    start_time = time.time()
    for idx in xrange(100):
        #log.info(idx)
        time.sleep(random.random() / 10)
    time_taken = time.time() - start_time
    log.info("Sync work took %s seconds." % time_taken)


if __name__ == "__main__":
    main()
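
To combine this with multiprocessing as mentioned above, one rough sketch (untested) is to replace the __main__ block with one process per core, each running main():

from multiprocessing import Process, cpu_count

if __name__ == "__main__":
    processes = [Process(target=main, kwargs={"number_of_workers": 20})
                 for _ in range(cpu_count())]
    for p in processes:
        p.start()
    for p in processes:
        p.join()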