如何让tornado执行并发代码?
How to make tornado execute concurrent code?
我正在尝试为 Tornado 服务器编写一个简单的工作负载生成器,这是它的简化版本:
class EventsLoader(object):
generate_num_requests = 1000
generate_concurrency = 32
server_port = 8001
def __init__(self, conf_file):
self.parse_config(conf_file)
self.client = AsyncHTTPClient()
def generate(self):
IOLoop.current().run_sync(self.generate_work)
@gen.coroutine
def generate_work(self):
self.queue = queues.Queue()
IOLoop.current().spawn_callback(self.fetch_requests)
for i in range(self.generate_concurrency):
yield self.generate_requests(i)
print 'before join queue size: %s' % self.queue.qsize()
yield self.queue.join()
@gen.coroutine
def generate_requests(self, i):
load = self.generate_num_requests / self.generate_concurrency
for j in range(load):
request = self.generate_request(i * 1000 + j)
self.queue.put(request)
@gen.coroutine
def fetch_requests(self):
while True:
try:
request = yield self.queue.get()
yield self.client.fetch(request)
except Exception as e:
print 'failed fetching: %s: %s' % (request.body, e)
finally:
print 'fetched: %s' % json.loads(request.body)['seq']
self.queue.task_done()
def generate_request(self, seq):
event = {
'seq': seq,
# ... more fields here ...
}
return HTTPRequest(
'http://localhost:%s/events' % self.server_port,
method='POST',
body=json.dumps(event),
)
我看到的是所有消息 fetched: xxxx
都按顺序出现,如果生成器确实在同时工作,这是绝对不可能的。
我怎样才能做到 运行 并发?在我对 I/O 循环的用途以及 @gen.coroutine
的作用的理解中,一定有一些巨大的缺失。 IE。无论我的 generate_concurrency
设置如何,性能都没有改变。
无论您如何生成请求,您都只能执行获取它们的任务。您需要并行化的是获取,而不是生成:
for i in range(self.fetch_concurrency):
IOLoop.current().spawn_callback(self.fetch_requests)
这将为您提供多个 fetch_requests
可以从共享队列中提取工作的工作程序。
另外,这段代码的生成部分也不是 运行并行的。而不是
for i in range(self.generate_concurrency):
yield self.generate_requests(i)
等待一个 generate_requests
调用完成再开始下一个,您可以 运行 它们与
并行
yield [self.generate_requests(i) for i in range(self.generate_concurrency)]
我正在尝试为 Tornado 服务器编写一个简单的工作负载生成器,这是它的简化版本:
class EventsLoader(object):
generate_num_requests = 1000
generate_concurrency = 32
server_port = 8001
def __init__(self, conf_file):
self.parse_config(conf_file)
self.client = AsyncHTTPClient()
def generate(self):
IOLoop.current().run_sync(self.generate_work)
@gen.coroutine
def generate_work(self):
self.queue = queues.Queue()
IOLoop.current().spawn_callback(self.fetch_requests)
for i in range(self.generate_concurrency):
yield self.generate_requests(i)
print 'before join queue size: %s' % self.queue.qsize()
yield self.queue.join()
@gen.coroutine
def generate_requests(self, i):
load = self.generate_num_requests / self.generate_concurrency
for j in range(load):
request = self.generate_request(i * 1000 + j)
self.queue.put(request)
@gen.coroutine
def fetch_requests(self):
while True:
try:
request = yield self.queue.get()
yield self.client.fetch(request)
except Exception as e:
print 'failed fetching: %s: %s' % (request.body, e)
finally:
print 'fetched: %s' % json.loads(request.body)['seq']
self.queue.task_done()
def generate_request(self, seq):
event = {
'seq': seq,
# ... more fields here ...
}
return HTTPRequest(
'http://localhost:%s/events' % self.server_port,
method='POST',
body=json.dumps(event),
)
我看到的是所有消息 fetched: xxxx
都按顺序出现,如果生成器确实在同时工作,这是绝对不可能的。
我怎样才能做到 运行 并发?在我对 I/O 循环的用途以及 @gen.coroutine
的作用的理解中,一定有一些巨大的缺失。 IE。无论我的 generate_concurrency
设置如何,性能都没有改变。
无论您如何生成请求,您都只能执行获取它们的任务。您需要并行化的是获取,而不是生成:
for i in range(self.fetch_concurrency):
IOLoop.current().spawn_callback(self.fetch_requests)
这将为您提供多个 fetch_requests
可以从共享队列中提取工作的工作程序。
另外,这段代码的生成部分也不是 运行并行的。而不是
for i in range(self.generate_concurrency):
yield self.generate_requests(i)
等待一个 generate_requests
调用完成再开始下一个,您可以 运行 它们与
yield [self.generate_requests(i) for i in range(self.generate_concurrency)]