Python requests - threads/processes vs. IO
I am connecting to a local server (OSRM) via HTTP to submit routes and get back drive-times. I notice that I/O is slower than threading because it seems the wait time for the calculation is smaller than the time it takes to send the request and process the JSON output (I think an I/O approach is better when the server takes some time to process your request -> you don't want yours to be blocking while you wait, which isn't my case). Threading suffers from the Global Interpreter Lock, so it appears (with evidence below) that my fastest option is multiprocessing.
The issue with multiprocessing is that it is so fast it exhausts my sockets and I get an error (requests issues a new connection each time). I can (in serial) use a requests.Sessions() object to keep connections alive, however I cannot get this working in parallel (each process has its own session).
The closest code I have to working at the moment is this multiprocessing code:
import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
However, I cannot get HTTPConnectionPool to work properly - it creates new sockets each time (I think) and then gives me the error:
HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
My goal is to get distance calculations from an OSRM-routing server that I run locally (as quickly as possible).
I have a question in two parts - essentially I am trying to convert some code using multiprocessing.Pool() into better code (proper asynchronous functions - so that execution never breaks and it runs as fast as possible).
The issue I am having is that everything I try seems slower than multiprocessing (I show several examples below of what I have tried).
Some possible methods are: gevents, grequests, tornado, requests-futures, asyncio, etc.
A - Multiprocessing.Pool()
I started off with the following:
def ReqOsrm(url_input):
req_url, query_id = url_input
try_c = 0
#print(req_url)
while try_c < 5:
try:
response = requests.get(req_url)
json_geocode = response.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
....
pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
where I connect to my local server (localhost, port: 5005), which is launched on 8 threads and supports parallel execution.
After some searching I realised the error I was getting was because requests was opening a new connection/socket for each request. So this was actually too fast, and after a while it exhausted the available sockets. It seems the way to address this is to use requests.Session() - however I have not been able to get this working with multiprocessing (where each process has its own session).
Question 1.
On some computers this runs fine, e.g.:
To compare with later: 45% server usage and 1700 requests per second
However, on some it does not, and I don't fully understand why:
HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
My guess would be that, since requests locks the socket while it is in use, sometimes the server is too slow to respond to the old request and a new one is generated. The server supports queueing, but requests does not, so instead of being added to a queue I get the error?
Question 2.
I found the following:
Blocking Or Non-Blocking?
With the default Transport Adapter in place, Requests does not provide
any kind of non-blocking IO. The Response.content property will block
until the entire response has been downloaded. If you require more
granularity, the streaming features of the library (see Streaming
Requests) allow you to retrieve smaller quantities of the response at
a time. However, these calls will still block.
If you are concerned about the use of blocking IO, there are lots of
projects out there that combine Requests with one of Python’s
asynchronicity frameworks.
Two excellent examples are grequests and requests-futures.
B - requests-futures
To address this I figured I needed to rewrite my code to use asynchronous requests, so I tried the following using:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
(By the way, I start the server with the option to use all threads.)
And the main code:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
# Submit requests and process in background
for i in range(len(url_routes)):
url_in, qid = url_routes[i] # url |query-id
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
futures[future] = qid
# Process the futures as they become complete
for future in as_completed(futures):
r = future.result()
try:
row = [futures[future]] + r.data
except Exception as err:
print('No route')
row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
calc_routes.append(row)
My function (ReqOsrm) is now rewritten as:
def ReqOsrm(sess, resp):
json_geocode = resp.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
# Cannot find route between points (code errors as 999)
else:
out = [999, 0, 0, 0, 0, 0, 0]
resp.data = out
However, this code is slower than the multiprocessing code! Before I was getting around 1700 requests per second, now I am getting 600. I guess this is because I am not fully utilising the CPU, however I am not sure how to increase it?
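One idea (a rough sketch I have not benchmarked) would be to push CPU usage up by splitting url_routes across several processes and letting each process run its own FuturesSession over its share; the chunking and the worker counts below are only placeholders:

from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool, cpu_count
from requests_futures.sessions import FuturesSession

def fetch_chunk(chunk):
    # each process runs its own FuturesSession over a slice of the urls
    results = []
    with FuturesSession(executor=ThreadPoolExecutor(max_workers=50)) as session:
        futures = {session.get(url): qid for url, qid in chunk}
        for future in as_completed(futures):
            qid = futures[future]
            try:
                json_geocode = future.result().json()
                summary = json_geocode['route_summary']
                results.append([qid, 200, summary['total_time'], summary['total_distance']])
            except Exception:
                results.append([qid, 999, 0, 0])
    return results

# n = cpu_count()
# chunks = [url_routes[i::n] for i in range(n)]
# with Pool(n) as p:
#     calc_routes = [row for rows in p.map(fetch_chunk, chunks) for row in rows]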
C - Threads
I tried another method (creating threads) - however, again, I was not sure how to get this to maximise CPU usage (ideally I want to see my server at 50% usage, no?):
def doWork():
while True:
url,qid = q.get()
status, resp = getReq(url)
processReq(status, resp, qid)
q.task_done()
def getReq(url):
try:
resp = requests.get(url)
return resp.status_code, resp
except:
return 999, None
def processReq(status, resp, qid):
try:
json_geocode = resp.json()
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done but no route")
out = [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("Error: %s" % err)
out = [qid, 999, 0, 0, 0, 0, 0, 0]
qres.put(out)
return
#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in url_routes:
q.put(url)
q.join()
except Exception:
pass
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
I feel this method is faster than requests_futures, but I don't know how many threads to set to maximise it -
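One way to pin this down, rather than guessing, is to sweep the thread count and time each run; a rough sketch reusing getReq/processReq from above (the timed_run helper and the counts tried are only illustrative, and results still land in the global qres):

import time
from queue import Queue
from threading import Thread

def timed_run(n_threads, urls):
    # run the same worker pattern with a given thread count and report RPS
    q = Queue()
    def worker():
        while True:
            item = q.get()
            if item is None:          # sentinel -> this thread exits
                q.task_done()
                break
            url, qid = item
            status, resp = getReq(url)
            processReq(status, resp, qid)
            q.task_done()
    threads = [Thread(target=worker, daemon=True) for _ in range(n_threads)]
    for t in threads:
        t.start()
    start = time.time()
    for item in urls:
        q.put(item)
    q.join()                          # wait until every url has been processed
    rps = len(urls) / (time.time() - start)
    for _ in threads:
        q.put(None)                   # release the workers
    return rps

# for n in (50, 100, 250, 500, 1000):
#     print(n, "threads ->", round(timed_run(n, url_routes)), "RPS")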
D - Tornado (not working)
I am now trying Tornado - however I cannot quite get it working; it crashes with exit code -1073741819 if I use curl - if I use simple_httpclient it works but then I get timeout errors:
ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout
from functools import partial
from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
try:
json_geocode = json_decode(r)
status = int(json_geocode['status'])
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
print(out)
except Exception as err:
print(err)
out = [999, 0, 0, 0, 0, 0, 0]
return out
# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)
@gen.coroutine
def run_experiment(urls):
http_client = AsyncHTTPClient()
responses = yield [http_client.fetch(url) for url, qid in urls]
responses_out = [handle_req(r.body) for r in responses]
raise gen.Return(value=responses_out)
# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
E - asyncio / aiohttp
I decided to try another approach (although it would be great to get Tornado working) using asyncio and aiohttp.
import asyncio
import aiohttp
def handle_req(data, qid):
json_geocode = json.loads(data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done, but not route for {0} - status: {1}".format(qid, status))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
return out
def chunked_http_client(num_chunks):
# Use semaphore to limit number of requests
semaphore = asyncio.Semaphore(num_chunks)
@asyncio.coroutine
# Return co-routine that will download files asynchronously and respect
    # locking of semaphore
def http_get(url, qid):
nonlocal semaphore
with (yield from semaphore):
response = yield from aiohttp.request('GET', url)
body = yield from response.content.read()
yield from response.wait_for_close()
return body, qid
return http_get
def run_experiment(urls):
http_client = chunked_http_client(500)
# http_client returns futures
# save all the futures to a list
tasks = [http_client(url, qid) for url, qid in urls]
response = []
# wait for futures to be ready then iterate over them
for future in asyncio.as_completed(tasks):
data, qid = yield from future
try:
out = handle_req(data, qid)
except Exception as err:
print("Error for {0} - {1}".format(qid,err))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
response.append(out)
return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
This works OK, but it is still slower than multiprocessing!
Question 1
You are getting the error because this approach:
def ReqOsrm(url_input):
req_url, query_id = url_input
try_c = 0
#print(req_url)
while try_c < 5:
try:
response = requests.get(req_url)
json_geocode = response.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
....
pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
creates a new TCP connection for each request URL, and at some point it fails simply because the system has no free local ports left. To confirm this you can run netstat while your code is executing:
netstat -a -n | find /c "localhost:5005"
This will give you the number of connections to the server.
Also, reaching 1700 RPS looks quite unrealistic for this approach, since requests.get is a fairly expensive operation and it is unlikely you can get even 50 RPS this way. So you probably need to double-check your RPS calculations.
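A quick way to sanity-check the figure is to time the whole run and divide the number of requests by the elapsed time; a minimal sketch reusing ReqOsrm and url_routes from the question:

import time
from multiprocessing import Pool, cpu_count

start = time.time()
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
elapsed = time.time() - start
# requests per second = total requests / wall-clock seconds
print("%d requests in %.1f s -> %.0f RPS" % (len(url_routes), elapsed, len(url_routes) / elapsed))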
To avoid the error you need to use sessions instead of creating connections from scratch:
import multiprocessing
import requests
import time
class Worker(multiprocessing.Process):
def __init__(self, qin, qout, *args, **kwargs):
super(Worker, self).__init__(*args, **kwargs)
self.qin = qin
self.qout = qout
def run(self):
s = requests.session()
while not self.qin.empty():
result = s.get(self.qin.get())
self.qout.put(result)
self.qin.task_done()
if __name__ == '__main__':
start = time.time()
qin = multiprocessing.JoinableQueue()
[qin.put('http://localhost:8080/') for _ in range(10000)]
qout = multiprocessing.Queue()
[Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())]
qin.join()
result = []
while not qout.empty():
result.append(qout.get())
    print(time.time() - start)
    print(result)
Question 2
You will not get higher RPS with threads or an async approach unless I/O takes more time than the calculations (e.g. high network latency, big responses, etc.), because threads are affected by the GIL (since they run in the same Python process) and async libraries may be blocked by long-running calculations.
Even though threads or async libraries can improve performance, running the same threaded or async code in multiple processes will give you even more performance anyway.
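As an illustration only (a sketch, not benchmarked here), combining the two could look like the following: each process gets a slice of the urls, its own session, and a small thread pool. The chunking, worker counts and the fetch_one helper are made up for the example:

import requests
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool, cpu_count

def fetch_chunk(urls):
    # one process: a shared Session plus a thread pool over its slice
    session = requests.Session()
    def fetch_one(url):
        try:
            return session.get(url).status_code
        except Exception:
            return 999
    with ThreadPoolExecutor(max_workers=50) as executor:
        return list(executor.map(fetch_one, urls))

if __name__ == '__main__':
    urls = ['http://localhost:8080/'] * 10000   # same toy target as above
    n = cpu_count()
    chunks = [urls[i::n] for i in range(n)]
    with Pool(n) as p:
        statuses = [s for chunk in p.map(fetch_chunk, chunks) for s in chunk]
    print(len(statuses), "responses")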
Looking at the multiprocessing code at the top of the question: it seems a HttpConnectionPool() is created every time ReqOsrm is called, so a new pool is created for each url. Instead, use the initializer and initargs parameters of Pool to create a single pool per process:
conn_pool = None
def makePool(host, port):
global conn_pool
pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(url_input):
ul, qid = url_input
try:
response = conn_pool.request('GET', ul)
json_geocode = json.loads(response.data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from, used_to = json_geocode['via_points']
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
return out
else:
print("Done but no route: %d %s" % (qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("%s: %d %s" % (err, qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
if __name__ == "__main__":
# run:
pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
The requests-futures version seems to have an indentation error: the loop for future in as_completed(futures): is indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop, and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.
I think the code should look like this:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
# Submit all the requests and process in background
for i in range(len(url_routes)):
url_in, qid = url_routes[i] # url |query-id
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
futures[future] = qid
# this was indented under the code in section B of the question
    # process the futures as they become complete
for future in as_completed(futures):
r = future.result()
try:
row = [futures[future]] + r.data
except Exception as err:
print('No route')
row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
print(row)
calc_routes.append(row)
Thanks everyone for the help. I wanted to post my conclusions:
Since my HTTP requests are to a local server which processes the request instantly, it does not make much sense for me to use async approaches (compared to most cases where requests are sent over the internet). The costly factor for me is actually sending the request and processing the feedback, which means I get much better speeds using multiple processes (threads suffer from the GIL). I should also use sessions to increase the speed (no need to close and re-open a connection to the same server) and to help prevent port exhaustion.
Here are all the methods I tried (that worked), with example RPS:
Serial
S1. Serial GET requests (no session) -> 215 RPS
def ReqOsrm(data):
url, qid = data
try:
response = requests.get(url)
json_geocode = json.loads(response.content.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
return [qid, 200, tot_time_s, tot_dist_m]
except Exception as err:
return [qid, 999, 0, 0]
# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
S2. Serial GET requests (requests.Session()) -> 335 RPS
session = requests.Session()
def ReqOsrm(data):
url, qid = data
try:
response = session.get(url)
json_geocode = json.loads(response.content.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
return [qid, 200, tot_time_s, tot_dist_m]
except Exception as err:
return [qid, 999, 0, 0]
# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
S3. Serial GET requests (urllib3.HTTPConnectionPool) -> 545 RPS
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
def ReqOsrm(data):
url, qid = data
try:
response = conn_pool.request('GET', url)
json_geocode = json.loads(response.data.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
return [qid, 200, tot_time_s, tot_dist_m]
except Exception as err:
return [qid, 999, 0, 0]
# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
Async IO
A4. AsyncIO with aiohttp -> 450 RPS
import asyncio
import aiohttp
concurrent = 100
def handle_req(data, qid):
json_geocode = json.loads(data.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
return [qid, 200, tot_time_s, tot_dist_m]
def chunked_http_client(num_chunks):
# Use semaphore to limit number of requests
semaphore = asyncio.Semaphore(num_chunks)
@asyncio.coroutine
# Return co-routine that will download files asynchronously and respect
    # locking of semaphore
def http_get(url, qid):
nonlocal semaphore
with (yield from semaphore):
with aiohttp.ClientSession() as session:
response = yield from session.get(url)
body = yield from response.content.read()
yield from response.wait_for_close()
return body, qid
return http_get
def run_experiment(urls):
http_client = chunked_http_client(num_chunks=concurrent)
# http_client returns futures, save all the futures to a list
tasks = [http_client(url, qid) for url, qid in urls]
response = []
# wait for futures to be ready then iterate over them
for future in asyncio.as_completed(tasks):
data, qid = yield from future
try:
out = handle_req(data, qid)
except Exception as err:
print("Error for {0} - {1}".format(qid,err))
out = [qid, 999, 0, 0]
response.append(out)
return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
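For reference, on Python 3.5+ the same A4 pattern can be written with async/await; this is just a sketch I did not benchmark, reusing handle_req from A4 and a single shared ClientSession:

import asyncio
import aiohttp

async def fetch(session, semaphore, url, qid):
    # limit in-flight requests with the semaphore, reuse one ClientSession
    async with semaphore:
        try:
            async with session.get(url) as resp:
                body = await resp.read()
            return handle_req(body, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            return [qid, 999, 0, 0]

async def run_experiment_await(urls, concurrency=100):
    semaphore = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[fetch(session, semaphore, url, qid) for url, qid in urls])

# calc_routes = asyncio.get_event_loop().run_until_complete(run_experiment_await(url_routes))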
A5. Threads with no session -> 330 RPS
from threading import Thread
from queue import Queue
concurrent = 100
def doWork():
while True:
url,qid = q.get()
status, resp = getReq(url)
processReq(status, resp, qid)
q.task_done()
def getReq(url):
try:
resp = requests.get(url)
return resp.status_code, resp
except:
return 999, None
def processReq(status, resp, qid):
try:
json_geocode = json.loads(resp.content.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
out = [qid, 200, tot_time_s, tot_dist_m]
except Exception as err:
print("Error: ", err, qid, url)
out = [qid, 999, 0, 0]
qres.put(out)
return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
for url in url_routes:
q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
A6. Threads with HTTPConnectionPool -> 1550 RPS
from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
def doWork():
while True:
url,qid = q.get()
status, resp = getReq(url)
processReq(status, resp, qid)
q.task_done()
def getReq(url):
try:
resp = conn_pool.request('GET', url)
return resp.status, resp
except:
return 999, None
def processReq(status, resp, qid):
try:
json_geocode = json.loads(resp.data.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
out = [qid, 200, tot_time_s, tot_dist_m]
except Exception as err:
print("Error: ", err, qid, url)
out = [qid, 999, 0, 0]
qres.put(out)
return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
for url in url_routes:
q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
A7. requests-futures -> 520 RPS
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100
def ReqOsrm(sess, resp):
try:
json_geocode = resp.json()
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
out = [200, tot_time_s, tot_dist_m]
except Exception as err:
print("Error: ", err)
out = [999, 0, 0]
resp.data = out
#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
# Submit requests and process in background
for i in range(len(url_routes)):
url_in, qid = url_routes[i] # url |query-id
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
futures[future] = qid
# Process the futures as they become complete
for future in as_completed(futures):
r = future.result()
try:
row = [futures[future]] + r.data
except Exception as err:
print('No route')
row = [futures[future], 999, 0, 0]
calc_routes.append(row)
Multiprocessing
P8. multiprocessing worker + queue + requests.session() -> 1058 RPS
from multiprocessing import *
class Worker(Process):
def __init__(self, qin, qout, *args, **kwargs):
super(Worker, self).__init__(*args, **kwargs)
self.qin = qin
self.qout = qout
def run(self):
s = requests.session()
while not self.qin.empty():
url, qid = self.qin.get()
data = s.get(url)
self.qout.put(ReqOsrm(data, qid))
self.qin.task_done()
def ReqOsrm(resp, qid):
try:
json_geocode = json.loads(resp.content.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
return [qid, 200, tot_time_s, tot_dist_m]
except Exception as err:
print("Error: ", err, qid)
return [qid, 999, 0, 0]
# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
calc_routes.append(qout.get())
P9. multiprocessing worker + queue + HTTPConnectionPool() -> 1230 RPS
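The code for P9 is not shown; it is essentially P8 with each worker holding its own urllib3.HTTPConnectionPool instead of a requests session. A rough, hypothetical reconstruction (assuming the same ghost/gport globals as S3 and A6):

import json
from multiprocessing import Process, JoinableQueue, Queue, cpu_count
from urllib3 import HTTPConnectionPool

class Worker(Process):
    # hypothetical reconstruction: same queue pattern as P8, but each worker
    # process opens its own HTTPConnectionPool instead of a requests session
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
        while not self.qin.empty():
            url, qid = self.qin.get()
            try:
                json_geocode = json.loads(conn_pool.request('GET', url).data.decode('utf-8'))
                out = [qid, 200, json_geocode['paths'][0]['time'], json_geocode['paths'][0]['distance']]
            except Exception as err:
                print("Error: ", err, qid, url)
                out = [qid, 999, 0, 0]
            self.qout.put(out)
            self.qin.task_done()

# the run section is the same as in P8: fill qin, start cpu_count() workers, join, drain qout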
P10. Multiprocessing v2 (not really sure how this is different) -> 1350 RPS
conn_pool = None
def makePool(host, port):
global conn_pool
pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(data):
url, qid = data
try:
response = conn_pool.request('GET', url)
json_geocode = json.loads(response.data.decode('utf-8'))
tot_time_s = json_geocode['paths'][0]['time']
tot_dist_m = json_geocode['paths'][0]['distance']
return [qid, 200, tot_time_s, tot_dist_m]
except Exception as err:
print("Error: ", err, qid, url)
return [qid, 999, 0, 0]
# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)
So in conclusion it seems the best method for me is #10 (and, surprisingly, #6).
Here is a pattern I use with gevent, which is coroutine-based and may not suffer from the GIL. This may be faster than using threads, and may be fastest when combined with multiprocessing (currently it only uses 1 core):
from gevent import monkey
monkey.patch_all()
import logging
import random
import time
from threading import Thread
from gevent.queue import JoinableQueue
from logger import initialize_logger
initialize_logger()
log = logging.getLogger(__name__)
class Worker(Thread):
def __init__(self, worker_idx, queue):
# initialize the base class
super(Worker, self).__init__()
self.worker_idx = worker_idx
self.queue = queue
def log(self, msg):
log.info("WORKER %s - %s" % (self.worker_idx, msg))
def do_work(self, line):
#self.log(line)
time.sleep(random.random() / 10)
def run(self):
while True:
line = self.queue.get()
self.do_work(line)
self.queue.task_done()
def main(number_of_workers=20):
start_time = time.time()
queue = JoinableQueue()
for idx in range(number_of_workers):
worker = Worker(idx, queue)
# "daemonize" a thread to ensure that the threads will
# close when the main program finishes
worker.daemon = True
worker.start()
    for idx in range(100):
queue.put("%s" % idx)
queue.join()
time_taken = time.time() - start_time
log.info("Parallel work took %s seconds." % time_taken)
start_time = time.time()
    for idx in range(100):
#log.info(idx)
time.sleep(random.random() / 10)
time_taken = time.time() - start_time
log.info("Sync work took %s seconds." % time_taken)
if __name__ == "__main__":
main()
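For the routing use case above, do_work would issue the HTTP request instead of sleeping; because monkey.patch_all() patches the socket module, the blocking requests call cooperatively yields to other greenlets while it waits. A sketch only, assuming the queue is filled with (url, qid) pairs rather than the index strings used in main():

import requests

class RoutingWorker(Worker):
    # illustrative subclass of the Worker above: perform the HTTP request
    # instead of sleeping; the patched socket makes this call yield to the
    # other greenlets while it waits on I/O
    def do_work(self, item):
        url, qid = item
        try:
            resp = requests.get(url)
            self.log("%s -> %s" % (qid, resp.status_code))
        except Exception as err:
            self.log("error for %s: %s" % (qid, err))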