Tweepy Connection broken: IncompleteRead - 处理异常的最佳方法?或者,线程可以帮助避免吗?
Tweepy Connection broken: IncompleteRead - best way to handle exception? or, can threading help avoid?
我正在使用 tweepy 处理大型 Twitter 流(关注 4,000 多个帐户)。添加到流中的帐户越多,出现此错误的可能性就越大:
Traceback (most recent call last):
File "myscript.py", line 2103, in <module>
main()
File "myscript.py", line 2091, in main
twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 445, in filter
self._start(async)
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 361, in _start
self._run()
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 294, in _run
raise exception
requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read, 2000 more expected)', IncompleteRead(0 bytes read, 2000 more expected))
显然这是一根粗的消防水管 - 根据经验,很明显,它 太 粗而无法处理。根据在 Whosebug 上研究这个错误以及 'the more accounts to follow I add, the faster this exception occurs' 的经验趋势,我的假设是这是 'my fault'。我处理每条推文的时间太长 and/or 我的消防水管太粗了。我明白了。
但尽管如此设置,我仍然有两个问题似乎无法找到可靠的答案。
1.有没有办法简单地'handle'这个异常,接受我会错过一些推文,但保留脚本运行?我想它可能会错过一条推文(或许多推文,但如果我可以在没有我想要的 100% 的推文的情况下生活,那么 script/stream 仍然可以继续,随时准备捕捉下一条推文。
我已经尝试过这种异常处理,这是在 Whosebug 上的类似问题中推荐的:
来自 urllib3.exceptions 导入 ProtocolError
while True:
try:
twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
except ProtocolError:
continue
但对我来说不幸的是,(也许我实施不正确,但我不认为我做了),那没有用。无论有没有推荐的异常处理代码,我都得到了与之前相同的错误。
- 我从未在我的 python 代码中实现队列 and/or 线程。现在是我尝试实施它的好时机吗?我对 queues/threading 不是一无所知,但我在想象......
我可以在一个线程上将推文以原始方式写入内存、数据库或其他东西吗?然后,准备好第二个线程来处理这些推文,一旦准备就绪?我认为,至少,我对推文的 post 处理是我正在阅读的流水线带宽的限制因素。然后如果我仍然得到错误我可以减少我关注的人等
我看过一些线程教程,但认为 'works' 与...这个 tweepy/twitter/etc/ 复杂性是否值得一问。我对我对我遇到的问题的理解或线程如何帮助我没有信心,所以我想我可以就这是否真的对我有帮助征求意见。
如果这个想法是正确的,是否有一些简单的示例代码可以帮助我指出正确的方向?
我想我终于完成了我的第一个 queue/thread 实施,从而解决了这个问题。我的学识还不足以了解执行此操作的最佳方法,但我认为这种方法确实有效。使用下面的代码,我现在建立了一个新推文队列,并可以在队列中按我的意愿处理它们,而不是落在后面并失去与 tweepy 的联系。
from Queue import Queue
from threading import Thread
class My_Parser(tweepy.StreamListener):
def __init__(self, q = Queue()):
num_worker_threads = 4
self.q = q
for i in range(num_worker_threads):
t = Thread(target=self.do_stuff)
t.daemon = True
t.start()
def on_data(self, data):
self.q.put(data)
def do_stuff(self):
while True:
do_whatever(self.q.get())
self.q.task_done()
我确实继续挖掘了一段时间关于 IncompleteRead 错误,我尝试了更多使用 url 库和 http 库的异常处理解决方案,但我为此苦苦挣扎。而且我认为除了保持连接之外,排队的东西可能还有一些好处(首先,不会丢失数据)。
希望这对某人有所帮助。哈哈
非常感谢你,我遇到了这样的问题并尝试了各种解决方案。发生这种情况是因为,除了从 API 进行流式传输外,我还在对数据进行大量处理,这让我失去了联系。我只是按照你的方式做了一些调整,我不得不在 init 方法中添加 super().init() 因为我是使用 on_status
并且导入队列必须小写。
另一件事,我没有制作 do_whatever
,我只是把 self.q.get()
放在里面。
无论如何,完美运行,再次感谢。
最终代码:
from queue import Queue
from threading import Thread
class Listener(tweepy.StreamListener):
def __init__(self, q = Queue()):
super().__init__()
self.q = q
for i in range(4):
t = Thread(target=self.do_stuff)
t.daemon = True
t.start()
def on_status(self, status):
<my code here>
def do_stuff(self):
while True:
self.q.get()
self.q.task_done()
对我来说,类似的方法有效:
使用调度程序,您可以 运行 间隔一段时间尝试重新连接的方法。请注意,使用参数 stall_warnings=True 可以大大减少错误。 Twitter API 检测到多次重新连接尝试并限制您的速率,因此重新连接的最佳机会是实施退避策略。使用下面的方法,处理各种断开连接,因此如果只需要检测特定错误,则需要修改。如果在同一个客户端 class 中使用了多个流,则必须相应地调整调度程序和 ID。此外,您必须将自己的 MyStreamListener(tweepy.StreamListener) class 实现包含到解决方案中。
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import logging
import asyncio
class MyClient():
def __init__():
self.scheduler = AsyncIOScheduler(event_loop=asyncio.get_event_loop())
self.stream = None
self.stream_listener = None
self.userIDToFollow = 123456789 # insert here twitter id of the user
logging.basicConfig(level=logging.INFO)
def start_stream(self):
try:
self.stream_listener = MyStreamListener()
self.stream = tweepy.Stream(auth=self.api.auth, listener=self.stream_listener)
self.stream.filter(follow=[str(self.userIDToFollow)], is_async=True, stall_warnings=True)
self.scheduler.add_job(self.keep_stream_alive, 'interval', minutes=5, max_instances=1, replace_existing=True, id='keep_stream_alive')
logging.info("Stream started successfully!")
except Exception as e:
self.stop_stream() # for cleanup purposes
logging.error(e) # feel free to log more
def stop_stream(self):
try:
self.stream.disconnect()
self.stream_listener = None
self.scheduler.remove_job('keep_stream_alive')
logging.info("Stream stopped successfully!")
except Exception as e:
logging.error(e)
async def keep_stream_alive(self):
try:
if not self.stream.running:
for i in range(5, 100, 5): # basic backoff strategy, feel free to implement yours
logging.error("Stream disconnected!")
logging.info("Sleeping for " + str(i) + " seconds...")
time.sleep(i)
logging.info("Attempting to reconnect...")
self.start_stream()
if self.stream.running:
break
if self.stream.running is False:
# Reconnection not possible - Stopped trying
logging.error("Reconnection not possible - Stopped trying")
else:
logging.info("Stream is still running.")
except Exception as e:
logging.error(e)
我正在使用 tweepy 处理大型 Twitter 流(关注 4,000 多个帐户)。添加到流中的帐户越多,出现此错误的可能性就越大:
Traceback (most recent call last):
File "myscript.py", line 2103, in <module>
main()
File "myscript.py", line 2091, in main
twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 445, in filter
self._start(async)
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 361, in _start
self._run()
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 294, in _run
raise exception
requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read, 2000 more expected)', IncompleteRead(0 bytes read, 2000 more expected))
显然这是一根粗的消防水管 - 根据经验,很明显,它 太 粗而无法处理。根据在 Whosebug 上研究这个错误以及 'the more accounts to follow I add, the faster this exception occurs' 的经验趋势,我的假设是这是 'my fault'。我处理每条推文的时间太长 and/or 我的消防水管太粗了。我明白了。
但尽管如此设置,我仍然有两个问题似乎无法找到可靠的答案。
1.有没有办法简单地'handle'这个异常,接受我会错过一些推文,但保留脚本运行?我想它可能会错过一条推文(或许多推文,但如果我可以在没有我想要的 100% 的推文的情况下生活,那么 script/stream 仍然可以继续,随时准备捕捉下一条推文。
我已经尝试过这种异常处理,这是在 Whosebug 上的类似问题中推荐的: 来自 urllib3.exceptions 导入 ProtocolError
while True:
try:
twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
except ProtocolError:
continue
但对我来说不幸的是,(也许我实施不正确,但我不认为我做了),那没有用。无论有没有推荐的异常处理代码,我都得到了与之前相同的错误。
- 我从未在我的 python 代码中实现队列 and/or 线程。现在是我尝试实施它的好时机吗?我对 queues/threading 不是一无所知,但我在想象......
我可以在一个线程上将推文以原始方式写入内存、数据库或其他东西吗?然后,准备好第二个线程来处理这些推文,一旦准备就绪?我认为,至少,我对推文的 post 处理是我正在阅读的流水线带宽的限制因素。然后如果我仍然得到错误我可以减少我关注的人等
我看过一些线程教程,但认为 'works' 与...这个 tweepy/twitter/etc/ 复杂性是否值得一问。我对我对我遇到的问题的理解或线程如何帮助我没有信心,所以我想我可以就这是否真的对我有帮助征求意见。
如果这个想法是正确的,是否有一些简单的示例代码可以帮助我指出正确的方向?
我想我终于完成了我的第一个 queue/thread 实施,从而解决了这个问题。我的学识还不足以了解执行此操作的最佳方法,但我认为这种方法确实有效。使用下面的代码,我现在建立了一个新推文队列,并可以在队列中按我的意愿处理它们,而不是落在后面并失去与 tweepy 的联系。
from Queue import Queue
from threading import Thread
class My_Parser(tweepy.StreamListener):
def __init__(self, q = Queue()):
num_worker_threads = 4
self.q = q
for i in range(num_worker_threads):
t = Thread(target=self.do_stuff)
t.daemon = True
t.start()
def on_data(self, data):
self.q.put(data)
def do_stuff(self):
while True:
do_whatever(self.q.get())
self.q.task_done()
我确实继续挖掘了一段时间关于 IncompleteRead 错误,我尝试了更多使用 url 库和 http 库的异常处理解决方案,但我为此苦苦挣扎。而且我认为除了保持连接之外,排队的东西可能还有一些好处(首先,不会丢失数据)。
希望这对某人有所帮助。哈哈
非常感谢你,我遇到了这样的问题并尝试了各种解决方案。发生这种情况是因为,除了从 API 进行流式传输外,我还在对数据进行大量处理,这让我失去了联系。我只是按照你的方式做了一些调整,我不得不在 init 方法中添加 super().init() 因为我是使用 on_status
并且导入队列必须小写。
另一件事,我没有制作 do_whatever
,我只是把 self.q.get()
放在里面。
无论如何,完美运行,再次感谢。
最终代码:
from queue import Queue
from threading import Thread
class Listener(tweepy.StreamListener):
def __init__(self, q = Queue()):
super().__init__()
self.q = q
for i in range(4):
t = Thread(target=self.do_stuff)
t.daemon = True
t.start()
def on_status(self, status):
<my code here>
def do_stuff(self):
while True:
self.q.get()
self.q.task_done()
对我来说,类似的方法有效: 使用调度程序,您可以 运行 间隔一段时间尝试重新连接的方法。请注意,使用参数 stall_warnings=True 可以大大减少错误。 Twitter API 检测到多次重新连接尝试并限制您的速率,因此重新连接的最佳机会是实施退避策略。使用下面的方法,处理各种断开连接,因此如果只需要检测特定错误,则需要修改。如果在同一个客户端 class 中使用了多个流,则必须相应地调整调度程序和 ID。此外,您必须将自己的 MyStreamListener(tweepy.StreamListener) class 实现包含到解决方案中。
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import logging
import asyncio
class MyClient():
def __init__():
self.scheduler = AsyncIOScheduler(event_loop=asyncio.get_event_loop())
self.stream = None
self.stream_listener = None
self.userIDToFollow = 123456789 # insert here twitter id of the user
logging.basicConfig(level=logging.INFO)
def start_stream(self):
try:
self.stream_listener = MyStreamListener()
self.stream = tweepy.Stream(auth=self.api.auth, listener=self.stream_listener)
self.stream.filter(follow=[str(self.userIDToFollow)], is_async=True, stall_warnings=True)
self.scheduler.add_job(self.keep_stream_alive, 'interval', minutes=5, max_instances=1, replace_existing=True, id='keep_stream_alive')
logging.info("Stream started successfully!")
except Exception as e:
self.stop_stream() # for cleanup purposes
logging.error(e) # feel free to log more
def stop_stream(self):
try:
self.stream.disconnect()
self.stream_listener = None
self.scheduler.remove_job('keep_stream_alive')
logging.info("Stream stopped successfully!")
except Exception as e:
logging.error(e)
async def keep_stream_alive(self):
try:
if not self.stream.running:
for i in range(5, 100, 5): # basic backoff strategy, feel free to implement yours
logging.error("Stream disconnected!")
logging.info("Sleeping for " + str(i) + " seconds...")
time.sleep(i)
logging.info("Attempting to reconnect...")
self.start_stream()
if self.stream.running:
break
if self.stream.running is False:
# Reconnection not possible - Stopped trying
logging.error("Reconnection not possible - Stopped trying")
else:
logging.info("Stream is still running.")
except Exception as e:
logging.error(e)