Python Tweepy streaming with multitasking
In Python 2.7, I am successfully listening to the direct message stream on an account with the following code:
```python
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy import API
from tweepy.streaming import StreamListener

# These values are appropriately filled in the code
consumer_key = '######'
consumer_secret = '######'
access_token = '######'
access_token_secret = '######'

class StdOutListener( StreamListener ):

    def __init__( self ):
        self.tweetCount = 0

    def on_connect( self ):
        print("Connection established!!")

    def on_disconnect( self, notice ):
        print("Connection lost!! : ", notice)

    def on_data( self, status ):
        print("Entered on_data()")
        print(status, flush = True)
        return True
        # I can add code here to execute when a message is received, such as slicing the message and activating something else

    def on_direct_message( self, status ):
        print("Entered on_direct_message()")
        try:
            print(status, flush = True)
            return True
        except BaseException as e:
            print("Failed on_direct_message()", str(e))

    def on_error( self, status ):
        print(status)

def main():

    try:
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.secure = True
        auth.set_access_token(access_token, access_token_secret)

        api = API(auth)

        # If the authentication was successful, you should
        # see the name of the account print out
        print(api.me().name)

        stream = Stream(auth, StdOutListener())

        stream.userstream()

    except BaseException as e:
        print("Error in main()", e)

if __name__ == '__main__':
    main()
```
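This works well, and I can also run code when a message is received, but the jobs I add to my work queue need to be able to stop after a certain amount of time. I'm using the common `start = time.time()` approach and subtracting it from the current time to get the elapsed time, but this streaming code never loops back to check the time. It just waits for the next message, so the clock is effectively never checked.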
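A minimal Python 3 sketch of the elapsed-time check described above (`time.monotonic()` is not available in 2.7, where `time.time()` would do). `run_job` and its `step` argument are hypothetical names standing in for a queued job; the job checks the clock itself between units of work.

```python
import time


def run_job(step, time_limit):
    """Call step() repeatedly until time_limit seconds have elapsed.

    run_job and step are hypothetical names used for illustration.
    Returns the number of steps that were started before the limit.
    """
    start = time.monotonic()  # monotonic clock: unaffected by system clock changes
    steps = 0
    while time.monotonic() - start < time_limit:
        step()
        steps += 1
    return steps


if __name__ == "__main__":
    # Each step sleeps 0.1 s, so roughly 5 steps fit into 0.5 s.
    done = run_job(lambda: time.sleep(0.1), time_limit=0.5)
    print("completed", done, "steps")
```

This only works while the job regains control between steps. A single blocking call, such as waiting on the stream for the next message, never reaches the check, which is the problem the question runs into.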
My question is: how can I keep streaming and still track elapsed time? Do I need to use multithreading as described in this article? http://www.tutorialspoint.com/python/python_multithreading.htm
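One standard-library way to act on elapsed time without a polling loop is `threading.Timer`, which calls a function once after a delay in its own thread. A minimal Python 3 sketch; `worker` and `run_with_time_limit` are hypothetical names, and the worker is a stand-in for a queued job rather than the Tweepy stream itself.

```python
import threading


def worker(stop, results):
    """Hypothetical job: count small units of work until `stop` is set."""
    count = 0
    while not stop.is_set():
        count += 1
        stop.wait(0.05)  # sleep briefly, but wake immediately if stop is set
    results.append(count)


def run_with_time_limit(seconds):
    """Run worker in a thread and stop it after `seconds` via threading.Timer."""
    stop = threading.Event()
    results = []
    job = threading.Thread(target=worker, args=(stop, results))
    # Timer(interval, function) calls stop.set() once, after `seconds`,
    # in its own thread, so nobody has to poll the clock.
    timer = threading.Timer(seconds, stop.set)
    job.start()
    timer.start()
    job.join()  # returns shortly after the timer fires
    return results[0]


if __name__ == "__main__":
    print("worker stopped after", run_with_time_limit(0.5), "iterations")
```

The main thread (or a thread blocked on the stream) never has to look at the clock; the timer thread does the waiting.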
I'm new to Python and having a lot of fun playing with hardware on a Raspberry Pi. I've learned a lot from Stack Overflow, thanks everyone :)
I'm not sure how you want to decide when to stop, but you can pass a timeout argument to the stream so that it gives up after a certain delay:
```python
stream = Stream(auth, StdOutListener(), timeout=30)
```
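This will call your listener's `on_timeout()` method. If it returns True, the stream keeps going; if it returns False, the stream stops.

Between the stream's timeout argument and your listener's `on_timeout()`, you should be able to decide when to stop the stream.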
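To illustrate that control flow, here is a stdlib-only Python 3 simulation; it does not use Tweepy. `FakeStream` and `DeadlineListener` are hypothetical classes: the fake stream waits on a `queue.Queue` with a timeout to mimic a network read timeout, calls the listener's `on_timeout()` when nothing arrives, and stops when that method returns False. As I understand Tweepy 3.x, its `timeout` is handed to the HTTP request, so it fires after that many seconds *without data* rather than after a fixed total run time; that is why the listener below compares elapsed time against its own deadline.

```python
import queue
import time


class DeadlineListener:
    """Hypothetical listener: keep streaming until max_seconds have elapsed."""

    def __init__(self, max_seconds):
        self.start = time.monotonic()
        self.max_seconds = max_seconds
        self.received = []

    def on_data(self, data):
        self.received.append(data)
        return True  # keep streaming

    def on_timeout(self):
        # False asks the stream to stop; True keeps it going.
        return time.monotonic() - self.start < self.max_seconds


class FakeStream:
    """Hypothetical stand-in for a stream: reads from a queue with a timeout."""

    def __init__(self, source, listener, timeout):
        self.source = source
        self.listener = listener
        self.timeout = timeout

    def run(self):
        while True:
            try:
                data = self.source.get(timeout=self.timeout)
            except queue.Empty:
                # No data within `timeout` seconds: ask the listener what to do.
                if self.listener.on_timeout() is False:
                    break
                continue
            if self.listener.on_data(data) is False:
                break


if __name__ == "__main__":
    source = queue.Queue()
    source.put("hello")
    listener = DeadlineListener(max_seconds=0.5)
    FakeStream(source, listener, timeout=0.1).run()
    print(listener.received)  # → ['hello']
```

Because `on_timeout()` is only consulted when the stream goes quiet, a listener that must stop at a hard deadline while messages keep arriving could also check the clock in `on_data()`.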
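I was able to get some multithreaded code working the way I wanted. Unlike this tutorial from Tutorialspoint, which shows how to start several instances of the same code with different timing parameters, I was able to run two different blocks of code, each in its own thread: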
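- One block continuously adds 10 to a global variable (`var`).
- The other block checks the elapsed time and prints the value of `var` every 5 seconds.

This demonstrates two different tasks running and sharing data with Python multithreading.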
See the code below:
```python
import threading
import time

exitFlag = 0
var = 10

class myThread1 (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        #var counting block begins here
        print "addemup starting"
        global var
        while (var < 100000):
            if var > 90000:
                var = 0
            var = var + 10

class myThread2 (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        #time checking block begins here and prints var every 5 secs
        print "checkem starting"
        global var
        start = time.time()
        elapsed = time.time() - start
        while (elapsed < 10):
            elapsed = time.time() - start
            if elapsed > 5:
                print "var = ", var
                start = time.time()
                elapsed = time.time() - start

# Create new threads
thread1 = myThread1(1, "Thread-1", 1)
thread2 = myThread2(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()

print "Exiting Main Thread"
```
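Neither loop in the demo above ever reaches its exit condition (`var` is reset before it can hit 100000, and `start` is reset before `elapsed` can reach 10), so the threads run until the process is killed. Only one thread writes `var`, so the missing lock happens to be harmless, but once several threads update shared data a lock is needed. A minimal Python 3 variation, assuming you want the same two tasks with a clean shutdown: a `threading.Lock` guards the counter and a `threading.Event` tells both threads to quit. `Counter`, `adder`, `checker` and `main` are hypothetical names, and the intervals are shortened so it finishes quickly.

```python
import threading
import time


class Counter:
    """Shared counter protected by a lock (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def add(self, amount):
        with self._lock:
            self.value += amount

    def read(self):
        with self._lock:
            return self.value


def adder(counter, stop):
    # Counterpart of myThread1: keep adding 10 until told to stop.
    while not stop.is_set():
        counter.add(10)
        time.sleep(0.001)  # yield so the loop does not spin a CPU core


def checker(counter, stop, interval, snapshots):
    # Counterpart of myThread2: record the value every `interval` seconds.
    # stop.wait() sleeps for `interval` but returns True early if stop is set.
    while not stop.wait(interval):
        snapshots.append(counter.read())


def main(run_for=1.0, interval=0.25):
    counter = Counter()
    stop = threading.Event()
    snapshots = []
    threads = [
        threading.Thread(target=adder, args=(counter, stop)),
        threading.Thread(target=checker, args=(counter, stop, interval, snapshots)),
    ]
    for t in threads:
        t.start()
    time.sleep(run_for)  # the main thread is free to do other work here
    stop.set()           # ask both threads to finish
    for t in threads:
        t.join()
    return counter.read(), snapshots


if __name__ == "__main__":
    total, snapshots = main()
    print("snapshots:", snapshots)
    print("final value:", total)
```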
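My next task is to move my Twitter stream into its own thread and pass each received direct message, as a variable, to a task-queuing program, while the first thread keeps listening for more direct messages.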
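A minimal Python 3 sketch of that plan using the standard library's thread-safe `queue.Queue`: a listener thread puts each incoming message on the queue and goes straight back to listening, while a worker thread takes messages off and runs a time-limited job for each. The message source is a hypothetical list standing in for the Tweepy stream; in the real program the `put()` call would go where `on_direct_message()` currently prints the message. `listen`, `handle`, `work` and the `STOP` sentinel are hypothetical names.

```python
import queue
import threading
import time

STOP = object()  # sentinel telling the worker to exit


def listen(messages, jobs):
    """Stand-in for the stream thread: hand each message to the worker."""
    for msg in messages:
        jobs.put(msg)  # returns immediately, so listening can continue
    jobs.put(STOP)


def handle(msg, time_limit):
    """Hypothetical job: up to 3 steps of work, cut off after time_limit seconds."""
    start = time.monotonic()
    steps = 0
    while steps < 3 and time.monotonic() - start < time_limit:
        steps += 1
        time.sleep(0.01)
    return msg.upper(), steps


def work(jobs, results, time_limit):
    """Worker thread: process queued messages one at a time."""
    while True:
        msg = jobs.get()  # blocks until a message arrives
        if msg is STOP:
            break
        results.append(handle(msg, time_limit))


if __name__ == "__main__":
    jobs = queue.Queue()
    results = []
    worker = threading.Thread(target=work, args=(jobs, results, 1.0))
    listener = threading.Thread(target=listen, args=(["led on", "led off"], jobs))
    worker.start()
    listener.start()
    listener.join()
    worker.join()
    print(results)  # → [('LED ON', 3), ('LED OFF', 3)]
```

Because the listener only calls `put()`, it never waits for a job to finish, and each job enforces its own time limit by checking the clock between steps.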