Python 带有多任务处理的 Tweepy 流媒体

Python Tweepy streaming with multitasking

in Python 2.7 我使用以下代码成功收听帐户上的直接消息流:

from tweepy import Stream
from tweepy import OAuthHandler
from tweepy import API
from tweepy.streaming import StreamListener

# These values are appropriately filled in the code

consumer_key = '######'
consumer_secret = '######'
access_token = '######'
access_token_secret = '######'

class StdOutListener( StreamListener ):

    def __init__( self ):
        self.tweetCount = 0

    def on_connect( self ):
        print("Connection established!!")

    def on_disconnect( self, notice ):
        print("Connection lost!! : ", notice)

    def on_data( self, status ):
        print("Entered on_data()")
        print(status, flush = True)
        return True
        # I can add code here to execute when a message is received, such as slicing the message and activating something else

    def on_direct_message( self, status ):
        print("Entered on_direct_message()")
        try:
            print(status, flush = True)
            return True
        except BaseException as e:
            print("Failed on_direct_message()", str(e))

    def on_error( self, status ):
        print(status)

def main():

    try:
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.secure = True
        auth.set_access_token(access_token, access_token_secret)

        api = API(auth)

        # If the authentication was successful, you should
        # see the name of the account print out
        print(api.me().name)

        stream = Stream(auth, StdOutListener())

        stream.userstream()

    except BaseException as e:
        print("Error in main()", e)

if __name__ == '__main__':
    main()

这很好,我也可以在收到消息时执行代码,但是我添加到工作队列中的作业需要能够在一定时间后停止。我正在使用流行的 start = time.time() 并减去当前时间来确定经过的时间,但此流式处理代码不会循环检查时间。我只是在等待一条新消息,所以可以说从来没有检查过时钟。

我的问题是:如何进行流式传输并仍然跟踪流逝的时间?我是否需要使用本文所述的多线程? http://www.tutorialspoint.com/python/python_multithreading.htm

我是 Python 的新手,玩 Raspberry Pi 上的硬件玩得很开心。我从 Whosebug 学到了很多东西,谢谢大家:)

我不确定您想如何决定何时停止,但您可以将 timeout argument 传递给流以在一定延迟后放弃。

stream = Stream(auth, StdOutListener(), timeout=30)

这将调用您的听众的 on_timeout() method。如果您 return 为真,它将继续流式传输。否则,它将停止。

在流的超时参数和您的侦听器的 on_timeout() 之间,您应该能够决定何时停止流。

我发现我能够按照我想要的方式获得一些多线程代码。 Unlike this tutorial from Tutorialspoint 给出了启动具有不同时间参数的相同代码的多个实例的示例,我能够在它们自己的实例中将两个不同的代码块 运行

一段代码不断地给一个全局变量(var)加10。 另一个块检查 5 秒后,然后打印 var 的值。

这演示了 2 个不同的任务使用 Python 多线程执行和共享数据。

见下方代码

import threading
import time

exitFlag = 0
var = 10

class myThread1 (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        #var counting block begins here
        print "addemup starting"
        global var
        while (var < 100000):
            if var > 90000:
                var = 0
            var = var + 10



class myThread2 (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        #time checking block begins here and prints var every 5 secs
        print "checkem starting"
        global var
        start = time.time()
        elapsed = time.time() - start
        while (elapsed < 10):
            elapsed = time.time() - start
            if elapsed > 5:
                print "var = ", var
                start = time.time()
                elapsed = time.time() - start

# Create new threads
thread1 = myThread1(1, "Thread-1", 1)
thread2 = myThread2(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()

print "Exiting Main Thread"

我的下一个任务是将我的推特流分解到它自己的线程中,并将作为变量接收的直接消息传递给任务排队程序,同时希望第一个线程继续监听更多直接消息。