Mining 7 days worth of Tweets from Twitter API with a certain hashtag using Python Tweepy

I'm mining 7 days' worth of tweets from the Twitter API using Python, Tweepy, Django, Celery, and Django REST Framework.

I use Celery Beat to send one request per minute, and I store the collected data in a PostgreSQL database via the Django ORM.

To make sure the API doesn't return the same 100 tweets on every call, I check the database for min(tweet.id) and set it as the max_id parameter before each new request.
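A quick note on how this paging works: Twitter's max_id is inclusive (results have an id less than or equal to it), which is why callers often pass min(id) - 1; passing min(id) itself re-fetches one tweet, which the unique constraint then rejects. A minimal plain-Python simulation of the scheme (hypothetical `paginate` helper, no network calls):

```python
def paginate(ids_available, page_size, max_id=None):
    """Simulate Twitter-style max_id paging: return up to `page_size`
    of the newest ids that are <= max_id (max_id is inclusive)."""
    eligible = [i for i in sorted(ids_available, reverse=True)
                if max_id is None or i <= max_id]
    return eligible[:page_size]

# First call returns the newest page; the next call passes min(page) - 1
# as max_id so none of the same tweets come back again.
page1 = paginate(range(1, 251), 100)                  # ids 250..151
page2 = paginate(range(1, 251), 100, min(page1) - 1)  # ids 150..51
```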

I've run into a problem: once I've collected 7 days' worth of tweets, how do I reset this max_id?

models.py

from django.db import models


class Tweet(models.Model):
    tweet_id = models.CharField(
        max_length=200,
        unique=True,
        primary_key=True
    )
    tweet_date = models.DateTimeField()
    tweet_source = models.TextField()
    tweet_favorite_cnt = models.CharField(max_length=200)
    tweet_retweet_cnt = models.CharField(max_length=200)
    tweet_text = models.TextField()

    def __str__(self):
        return self.tweet_id + '  |  ' + str(self.tweet_date)

tasks.py

from datetime import datetime, timedelta

import tweepy
from celery import shared_task
from django.db import IntegrityError

from .models import Tweet

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

# Instantiate an instance of the API class from the tweepy library.
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)


@shared_task(name='cleanup')
def cleanup():
    """
    Check database for records older than 7 days.
    Delete them if they exist.
    """
    Tweet.objects.filter(tweet_date__lte=datetime.now() - timedelta(days=7)).delete()


@shared_task(name='get_tweets')
def get_tweets():
    """Get some tweets from the twitter api and store them to the db."""

    # Subtasks
    chain = cleanup.s()
    chain()

    # Check for the minimum tweet_id and set it as max_id.
    # This ensures the API call doesn't keep getting the same tweets.
    max_id = min([tweet.tweet_id for tweet in Tweet.objects.all()])

    # Make the call to the Twitter Search API.
    tweets = api.search(
        q='#python',
        max_id=max_id,
        count=100
    )

    # Store the collected data into lists.
    tweets_date = [tweet.created_at for tweet in tweets]
    tweets_id = [tweet.id for tweet in tweets]
    tweets_source = [tweet.source for tweet in tweets]
    tweets_favorite_cnt = [tweet.favorite_count for tweet in tweets]
    tweets_retweet_cnt = [tweet.retweet_count for tweet in tweets]
    tweets_text = [tweet.text for tweet in tweets]

    # Iterate over these lists and save the items as fields for new records in the database.
    for i, j, k, l, m, n in zip(
            tweets_id,
            tweets_date,
            tweets_source,
            tweets_favorite_cnt,
            tweets_retweet_cnt,
            tweets_text
    ):
        try:
            Tweet.objects.create(
                tweet_id=i,
                tweet_date=j,
                tweet_source=k,
                tweet_favorite_cnt=l,
                tweet_retweet_cnt=m,
                tweet_text=n,
            )
        except IntegrityError:
            pass

Try this:

# Check for the minimum tweet_id and set it as max_id.
# This ensures the API call doesn't keep getting the same tweets.

date_partition = get_seven_day_partition()
## `get_seven_day_partition` is a placeholder -- since you're cutting off
## every seven days, you should know how to separate your data into
## seven-day sections

max_id = min([tweet.tweet_id for tweet in Tweet.objects.all()
              if tweet.tweet_date > date_partition])

You haven't given enough detail about how you're pulling these tweets and how you know to stop at a certain date (and stop execution of this program), so it's hard to suggest the right way to track the date.

What I can tell you is that, with `date_partition` set appropriately for your use case, this addition to the `max_id` assignment will correctly grab the minimum tweet_id from within the most recent seven-day period.
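To make the windowed-minimum idea concrete, here is a minimal standalone sketch (plain Python, no ORM — the `tweets` list of (id, date) tuples stands in for the Tweet table, and the cutoff is simply seven days before "now"):

```python
from datetime import datetime, timedelta

def window_min_id(tweets, now, days=7):
    """Return the smallest tweet id among tweets newer than `now - days`.

    `tweets` is a list of (tweet_id, tweet_date) tuples. The ids are
    converted to int before taking min(), since comparing the stored
    strings would be lexicographic, not numeric.
    """
    cutoff = now - timedelta(days=days)
    ids = [int(tid) for tid, tdate in tweets if tdate > cutoff]
    return min(ids) if ids else None

now = datetime(2019, 6, 8)
tweets = [
    ("1001", datetime(2019, 6, 7)),   # inside the 7-day window
    ("1005", datetime(2019, 6, 5)),   # inside the window
    ("0900", datetime(2019, 5, 20)),  # older than 7 days: ignored
]
print(window_min_id(tweets, now))  # -> 1001
```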