如何在不中断流的情况下从 asyncio 安全地读取 ReaderStream
How to safely read ReaderStream from asyncio without breaking the stream
我正在尝试构建一个 man-in-middle 代理服务器,将客户端请求中继到我的应用程序中预定义的各种代理服务。所以,为了做到这一点,我需要区分客户对服务的请求。
只要我不取消注释 # data = await client_reader.read(2048)
我可以从中解析 headers,下面的代码就可以工作。即如果我执行该代码,
说如果我用未注释的那行代码来做这个:
r = requests.get('http://api.ipify.org/', headers = {'Proxy-Type':'custom'}, proxies={'http':'http://127.0.0.1:9911'})
我会在 r.content
中从 ipify 得到一个 408 Request Time-out
async def proxy_data(reader, writer, connection_string):
try:
while True:
data = await reader.read(2048)
if not data:
break
writer.write(data)
await writer.drain()
except Exception as e:
raise
finally:
writer.close()
async def accept_client(client_reader, client_writer):
try:
# Get proxy service - [Proxy-Type] from header via client_reader
# Set remote_address and remote_port based on it
# data = await client_reader.read(2048)
(remote_reader, remote_writer) = await asyncio.wait_for(
asyncio.open_connection(host = remote_address, port = remote_port),
timeout = 30)
except asyncio.TimeoutError:
client_writer.close()
except Exception as e:
client_writer.close()
else:
# Pipe the streams
asyncio.ensure_future(proxy_data(client_reader, remote_writer))
asyncio.ensure_future(proxy_data(remote_reader, client_writer))
def main():
def handle_client(client_reader, client_writer):
asyncio.ensure_future(
accept_client(
client_reader = client_reader,
client_writer = client_writer
)
)
loop = asyncio.get_event_loop()
try:
server = loop.run_until_complete(
asyncio.start_server(
handle_client, host = '127.0.0.1', port = 9911))
except Exception as e:
logger.error('Bind error: {}'.format(e))
sys.exit(1)
for s in server.sockets:
logger.debug('Proxy broker listening on {}'.format(s.getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
谁能指出这里的问题或如何有条件地打开连接?
通过重新加载 client_reader (ReaderStream)
解决了问题
感谢 Vincent 指出请求篡改情况并将请求重构回其原始形式的意见
async def accept_client(client_reader, client_writer):
try:
# Get proxy service - [Proxy-Type] from header via client_reader
# Set remote_address and remote_port based on it
data = await client_reader.read(2048)
# -------- Edited --------
# perform operations based on data and obtain remote_address, remote_port
(remote_reader, remote_writer) = await asyncio.wait_for(
asyncio.open_connection(host = remote_address, port = remote_port),
timeout = 30)
except asyncio.TimeoutError:
client_writer.close()
except Exception as e:
client_writer.close()
else:
# Write the data to remote
remote_writer.write(data)
await remote_writer.drain()
# Pipe the streams
asyncio.ensure_future(proxy_data(client_reader, remote_writer))
asyncio.ensure_future(proxy_data(remote_reader, client_writer))
我正在尝试构建一个 man-in-middle 代理服务器,将客户端请求中继到我的应用程序中预定义的各种代理服务。所以,为了做到这一点,我需要区分客户对服务的请求。
只要我不取消注释 # data = await client_reader.read(2048)
我可以从中解析 headers,下面的代码就可以工作。即如果我执行该代码,
说如果我用未注释的那行代码来做这个:
r = requests.get('http://api.ipify.org/', headers = {'Proxy-Type':'custom'}, proxies={'http':'http://127.0.0.1:9911'})
我会在 r.content
408 Request Time-out
async def proxy_data(reader, writer, connection_string):
try:
while True:
data = await reader.read(2048)
if not data:
break
writer.write(data)
await writer.drain()
except Exception as e:
raise
finally:
writer.close()
async def accept_client(client_reader, client_writer):
try:
# Get proxy service - [Proxy-Type] from header via client_reader
# Set remote_address and remote_port based on it
# data = await client_reader.read(2048)
(remote_reader, remote_writer) = await asyncio.wait_for(
asyncio.open_connection(host = remote_address, port = remote_port),
timeout = 30)
except asyncio.TimeoutError:
client_writer.close()
except Exception as e:
client_writer.close()
else:
# Pipe the streams
asyncio.ensure_future(proxy_data(client_reader, remote_writer))
asyncio.ensure_future(proxy_data(remote_reader, client_writer))
def main():
def handle_client(client_reader, client_writer):
asyncio.ensure_future(
accept_client(
client_reader = client_reader,
client_writer = client_writer
)
)
loop = asyncio.get_event_loop()
try:
server = loop.run_until_complete(
asyncio.start_server(
handle_client, host = '127.0.0.1', port = 9911))
except Exception as e:
logger.error('Bind error: {}'.format(e))
sys.exit(1)
for s in server.sockets:
logger.debug('Proxy broker listening on {}'.format(s.getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
谁能指出这里的问题或如何有条件地打开连接?
通过重新加载 client_reader (ReaderStream)
解决了问题感谢 Vincent 指出请求篡改情况并将请求重构回其原始形式的意见
async def accept_client(client_reader, client_writer):
try:
# Get proxy service - [Proxy-Type] from header via client_reader
# Set remote_address and remote_port based on it
data = await client_reader.read(2048)
# -------- Edited --------
# perform operations based on data and obtain remote_address, remote_port
(remote_reader, remote_writer) = await asyncio.wait_for(
asyncio.open_connection(host = remote_address, port = remote_port),
timeout = 30)
except asyncio.TimeoutError:
client_writer.close()
except Exception as e:
client_writer.close()
else:
# Write the data to remote
remote_writer.write(data)
await remote_writer.drain()
# Pipe the streams
asyncio.ensure_future(proxy_data(client_reader, remote_writer))
asyncio.ensure_future(proxy_data(remote_reader, client_writer))