当检测到重复值 Flask SQLAlchemy 时停止插入 Table

Stop Inserting into Table when Duplicate Value Detected Flask SQLAlchemy

我能够从推文中获取数据并将它们存储到 MySQL table 中。 但问题是有时一条推文具有相同值的重复推文。 我想知道当使用 Flask-SQLAlchemy 检测到重复值时是否可以停止插入 table。

from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from flask_sqlalchemy import SQLAlchemy
from models import TrainingTweets, db
import mysql.connector
import json
import tweepy
from tweepy.api import API

#consumer key, consumer secret, access token, access secret.
ckey=""
csecret=""
atoken=""
asecret=""

auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

api = tweepy.API(auth)


class listener(StreamListener):

    def __init__(self, api=None):
        self.api = api or API()
        self.n = 0
        self.m = 50

    def on_data(self, data):
        all_data = json.loads(data)
        self.n = self.n+1
        if self.n <= self.m:
            tweet = all_data["text"]
            username = all_data["user"]["screen_name"]
            label = "1"
            ttweets = TrainingTweets(label_id=label, tweet_username=username, tweet=tweet)
            db.session.add(ttweets)
            checkedtweet = TrainingTweets.query.filter(ttweets.tweet).all()
            if not checkedtweet:
                db.session.commit()
                print((username, tweet))
                return True
            else:
                print("Duplicate entry detected!")
                return False
        else:
            print("Successfully stored ", self.m, " tweets into database")
            return False

    def on_error(self, status):
        print(status)

auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

twitterStream = Stream(auth, listener())
twitterStream.filter(track=["health"], languages=["en"], follow="")

这是我的 model.py:

class TrainingTweets(db.Model):
    tweet_id = db.Column(db.Integer, primary_key=True)
    tweet_username = db.Column(db.String(50))
    tweet = db.Column(db.String(191))
    slug = db.Column(db.String(191), unique=False)
    created_date = db.Column(db.DateTime, default=datetime.datetime.now)
    label_id = db.Column(db.Integer, db.ForeignKey('label.label_id'))

    def __init__(self, *args, **kwargs):
        super(TrainingTweets, self).__init__(*args, **kwargs)  # Call parent constructor.
        self.generate_slug()

    def generate_slug(self):
        self.slug = ''
        if self.tweet:
            self.slug = slugify(self.tweet)

您的模型应该具有针对某些标准的唯一索引,以删除重复项。 Columns 默认情况下不是唯一的,您似乎假设(unique=False 在列和注释中)。您应该使用一些 "natural" 键(例如 Twitter 提供的 ID)代替自动递增代理键,或者使文本列 tweet 唯一。

当您修复了唯一性要求并且如果您希望忽略 IntegrityErrors 并继续前进,请将您的插入包装在事务中(或使用隐式行为)并相应地提交或回滚:

from sqlalchemy.exc import IntegrityError

class listener(StreamListener):

    def on_data(self, data):
        all_data = json.loads(data)
        tweet_id = all_data["id_str"]
        tweet_text = all_data["text"]
        tweet_username = all_data["user"]["screen_name"]
        label = 1
        ttweets = TrainingTweets(label_id=label,
                                 tweet_username=tweet_username,
                                 tweet=tweet_text)

        try:
            db.session.add(ttweets)
            db.session.commit()
            print((username, tweet))
            # Increment the counter here, as we've truly successfully
            # stored a tweet.
            self.n += 1

        except IntegrityError:
            db.session.rollback()
            # Don't stop the stream, just ignore the duplicate.
            print("Duplicate entry detected!")    

        if self.n >= self.m:
            print("Successfully stored", self.m, "tweets into database")
            # Cross the... stop the stream.
            return False
        else:
            # Keep the stream going.
            return True