读取 Http 流
Read Http Stream
我正在尝试从流 API 中读取数据,其中数据是使用分块传输编码发送的。每个块可以有多个记录,每个记录由 CRLF 分隔。数据总是使用 gzip 压缩发送。我正在尝试获取提要,然后一次进行一些处理。我浏览了一堆 Whosebug 资源,但在 Python 中找不到实现它的方法。 iter_content(chunk) 大小在我的例子中抛出异常。
for chunk in api_response.iter_content(chunk_size=1024):
在 Fiddler(我用作代理)中,我可以看到数据正在不断下载并在 Fiddler 中执行 "COMETPeek",我实际上可以看到一些示例 json。
连iter_lines都不行。我看过这里提到的 asyncio 和 aiohttp 案例:Why doesn't requests.get() return? What is the default timeout that requests.get() uses?
但不确定如何进行处理。如您所见,我已经尝试使用一堆 python 库。抱歉,某些代码可能包含一些库,我后来将其从使用中删除,因为它没有成功。
我也查看了请求库的文档,但找不到任何实质内容。
如上所述,下面是我正在尝试执行的示例代码。任何关于我应该如何进行的指示都将不胜感激。
这是我第一次尝试读取流
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
import requests
import zlib
import json
READ_BLOCK_SIZE = 1024*8
clientID="ClientID"
clientSecret="ClientSecret"
proxies = {
"https": "http://127.0.0.1:8888",
}
client = BackendApplicationClient(client_id=clientID)
oauth = OAuth2Session(client=client)
token = oauth.fetch_token(token_url='https://baseTokenURL/token', client_id=clientID,client_secret=clientSecret,proxies=proxies,verify=False)
auth_t=token['access_token']
#auth_t = accesstoken.encode("ascii", "ignore")
headers = {
'authorization': "Bearer " + auth_t,
'content-type': "application/json",
'Accept-Encoding': "gzip",
}
dec=zlib.decompressobj(32 + zlib.MAX_WBITS)
try:
init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
if init_res.status_code == 302:
print(init_res.headers['Location'])
api_response = requests.get(init_res.headers['Location'], headers=headers, allow_redirects=False,proxies=proxies,verify=False, timeout=20, stream=True,params={"smoothing":"1", "smoothingBucketSize" : "180"})
if api_response.status_code == 200:
#api_response.raw.decode_content = True
#print(api_response.raw.read(20))
for chunk in api_response.iter_content(chunk_size=api_response.chunk_size):
#Parse the response
elif init_res.status_code == 200:
print(init_res.content)
except Exception as ce:
print(ce)
更新
我现在正在看这个:https://aiohttp.readthedocs.io/en/v0.20.0/client.html
这是要走的路吗?
以防万一有人觉得这有用。我找到了一种使用 aiohttp 从 api 到 python 进行流式传输的方法。下面是骨架。请记住,它只是一个骨架,它通过不断向我展示结果来工作。如果有人有更好的方法——我会洗耳恭听,因为这是我第一次尝试直播。
async def fetch(session, url, headers):
with async_timeout.timeout(None):
async with session.get(init_res.headers['Location'], headers=headers, proxy="http://127.0.0.1:8888", allow_redirects=False,timeout=None) as r:
while True:
chunk=await r.content.read(1024*3)
if not chunk:
break
print(chunk)
async def main(url, headers):
async with aiohttp.ClientSession() as session:
html = await fetch(session, url,headers)
在调用者中
try:
init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
if init_res.status_code == 302:
loc=init_res.headers['Location']
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loc, headers=headers))
elif init_res.status_code == 200:
print(init_res.content)
except Exception as ce:
print(ce)
我已经通过堆栈溢出答案中的以下点点滴滴实现了上述目标
以下对我有用。
MAX_REDIRECTS =1000
def get_data(url, **kwargs):
import requests
kwargs.setdefault('allow_redirects', False)
for i in range(0, MAX_REDIRECTS):
response = requests.get(url, **kwargs)
#check for response codes to check if redirects happedned
if response.status_code == requests.codes.moved or \
response.status_code == requests.codes.found:
if 'Location' in response.headers:
url = response.headers['Location']
content_type_header = response.headers.get('content_type')
continue
else:
print ("problem reading")
return response
在你的行中调用上面的函数
init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
至
init_res = get_data('https://BaseStreamURL/api/1/stream/specificStream',stream=True, headers=headers,params=payload)
我正在尝试从流 API 中读取数据,其中数据是使用分块传输编码发送的。每个块可以有多个记录,每个记录由 CRLF 分隔。数据总是使用 gzip 压缩发送。我正在尝试获取提要,然后一次进行一些处理。我浏览了一堆 Whosebug 资源,但在 Python 中找不到实现它的方法。 iter_content(chunk) 大小在我的例子中抛出异常。
for chunk in api_response.iter_content(chunk_size=1024):
在 Fiddler(我用作代理)中,我可以看到数据正在不断下载并在 Fiddler 中执行 "COMETPeek",我实际上可以看到一些示例 json。
连iter_lines都不行。我看过这里提到的 asyncio 和 aiohttp 案例:Why doesn't requests.get() return? What is the default timeout that requests.get() uses?
但不确定如何进行处理。如您所见,我已经尝试使用一堆 python 库。抱歉,某些代码可能包含一些库,我后来将其从使用中删除,因为它没有成功。
我也查看了请求库的文档,但找不到任何实质内容。
如上所述,下面是我正在尝试执行的示例代码。任何关于我应该如何进行的指示都将不胜感激。
这是我第一次尝试读取流
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
import requests
import zlib
import json
READ_BLOCK_SIZE = 1024*8
clientID="ClientID"
clientSecret="ClientSecret"
proxies = {
"https": "http://127.0.0.1:8888",
}
client = BackendApplicationClient(client_id=clientID)
oauth = OAuth2Session(client=client)
token = oauth.fetch_token(token_url='https://baseTokenURL/token', client_id=clientID,client_secret=clientSecret,proxies=proxies,verify=False)
auth_t=token['access_token']
#auth_t = accesstoken.encode("ascii", "ignore")
headers = {
'authorization': "Bearer " + auth_t,
'content-type': "application/json",
'Accept-Encoding': "gzip",
}
dec=zlib.decompressobj(32 + zlib.MAX_WBITS)
try:
init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
if init_res.status_code == 302:
print(init_res.headers['Location'])
api_response = requests.get(init_res.headers['Location'], headers=headers, allow_redirects=False,proxies=proxies,verify=False, timeout=20, stream=True,params={"smoothing":"1", "smoothingBucketSize" : "180"})
if api_response.status_code == 200:
#api_response.raw.decode_content = True
#print(api_response.raw.read(20))
for chunk in api_response.iter_content(chunk_size=api_response.chunk_size):
#Parse the response
elif init_res.status_code == 200:
print(init_res.content)
except Exception as ce:
print(ce)
更新 我现在正在看这个:https://aiohttp.readthedocs.io/en/v0.20.0/client.html
这是要走的路吗?
以防万一有人觉得这有用。我找到了一种使用 aiohttp 从 api 到 python 进行流式传输的方法。下面是骨架。请记住,它只是一个骨架,它通过不断向我展示结果来工作。如果有人有更好的方法——我会洗耳恭听,因为这是我第一次尝试直播。
async def fetch(session, url, headers):
with async_timeout.timeout(None):
async with session.get(init_res.headers['Location'], headers=headers, proxy="http://127.0.0.1:8888", allow_redirects=False,timeout=None) as r:
while True:
chunk=await r.content.read(1024*3)
if not chunk:
break
print(chunk)
async def main(url, headers):
async with aiohttp.ClientSession() as session:
html = await fetch(session, url,headers)
在调用者中
try:
init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
if init_res.status_code == 302:
loc=init_res.headers['Location']
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loc, headers=headers))
elif init_res.status_code == 200:
print(init_res.content)
except Exception as ce:
print(ce)
我已经通过堆栈溢出答案中的以下点点滴滴实现了上述目标 以下对我有用。
MAX_REDIRECTS =1000
def get_data(url, **kwargs):
import requests
kwargs.setdefault('allow_redirects', False)
for i in range(0, MAX_REDIRECTS):
response = requests.get(url, **kwargs)
#check for response codes to check if redirects happedned
if response.status_code == requests.codes.moved or \
response.status_code == requests.codes.found:
if 'Location' in response.headers:
url = response.headers['Location']
content_type_header = response.headers.get('content_type')
continue
else:
print ("problem reading")
return response
在你的行中调用上面的函数
init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
至
init_res = get_data('https://BaseStreamURL/api/1/stream/specificStream',stream=True, headers=headers,params=payload)