使用 Flask-SQLAlchemy 时,检测到重复值就停止向表中插入数据
Stop Inserting into Table when Duplicate Value Detected Flask SQLAlchemy
我能够从推文中获取数据,并将其存储到 MySQL 表中。
但问题是,有时会出现内容完全相同的重复推文。
我想知道,在使用 Flask-SQLAlchemy 时,能否在检测到重复值时停止向表中插入数据。
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from flask_sqlalchemy import SQLAlchemy
from models import TrainingTweets, db
import mysql.connector
import json
import tweepy
from tweepy.api import API
import os

# Twitter API credentials: consumer key, consumer secret, access token and
# access secret. They are read from the environment so secrets are not
# hard-coded in source; each falls back to "" (the previous hard-coded value).
ckey = os.environ.get("TWITTER_CONSUMER_KEY", "")
csecret = os.environ.get("TWITTER_CONSUMER_SECRET", "")
atoken = os.environ.get("TWITTER_ACCESS_TOKEN", "")
asecret = os.environ.get("TWITTER_ACCESS_SECRET", "")

auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)
api = tweepy.API(auth)
class listener(StreamListener):
    """Store up to ``m`` streamed tweets in the TrainingTweets table.

    ``on_data`` returns False, which ends the stream, in two cases: a tweet
    whose text is already stored arrives, or more than ``m`` tweets have been
    received.
    """

    def __init__(self, api=None):
        self.api = api or API()
        self.n = 0   # number of tweets received so far
        self.m = 50  # maximum number of tweets to store

    def on_data(self, data):
        """Handle one raw stream message; return False to stop the stream."""
        all_data = json.loads(data)
        # Not every stream message is a tweet (e.g. delete or limit notices).
        # Skip anything without text instead of raising KeyError.
        if "text" not in all_data:
            return True
        self.n = self.n + 1
        if self.n > self.m:
            print("Successfully stored ", self.m, " tweets into database")
            return False
        tweet = all_data["text"]
        username = all_data["user"]["screen_name"]
        # Look for an existing row with the same text BEFORE adding anything
        # to the session, for two reasons:
        # - Querying after add() would autoflush the pending row, which the
        #   query would then match.
        # - A rejected object left in the session would be persisted by a
        #   later commit().
        # filter_by() builds a proper column comparison; a plain string does not.
        if TrainingTweets.query.filter_by(tweet=tweet).first() is not None:
            print("Duplicate entry detected!")
            return False
        # label_id is an Integer column, so pass an int.
        ttweets = TrainingTweets(label_id=1, tweet_username=username, tweet=tweet)
        db.session.add(ttweets)
        db.session.commit()
        print((username, tweet))
        return True

    def on_error(self, status):
        """Print the error status reported by the stream."""
        print(status)
# Reuse the ``auth`` handler and ``api`` client created above instead of
# building a second, identical OAuthHandler. The listener otherwise falls back
# to a default ``API()`` with no auth handler.
twitterStream = Stream(auth, listener(api=api))
twitterStream.filter(track=["health"], languages=["en"], follow="")
这是我的 models.py:
class TrainingTweets(db.Model):
    """A collected tweet used as training data, with a slug and a label."""

    tweet_id = db.Column(db.Integer, primary_key=True)
    tweet_username = db.Column(db.String(50))
    # Columns are not unique by default. unique=True makes the database reject
    # a second row with the same text by raising IntegrityError on insert, so
    # duplicates are prevented even when the application-level check is
    # skipped or races.
    # NOTE(review): 191 is presumably chosen to fit MySQL's utf8mb4 index
    # length limit; longer tweet texts may be truncated or rejected - confirm.
    tweet = db.Column(db.String(191), unique=True)
    slug = db.Column(db.String(191), unique=False)
    created_date = db.Column(db.DateTime, default=datetime.datetime.now)
    label_id = db.Column(db.Integer, db.ForeignKey('label.label_id'))

    def __init__(self, *args, **kwargs):
        # Let the declarative constructor assign columns, then derive the slug.
        super(TrainingTweets, self).__init__(*args, **kwargs)
        self.generate_slug()

    def generate_slug(self):
        """Set ``self.slug`` from the tweet text, or to '' when there is none."""
        self.slug = slugify(self.tweet) if self.tweet else ''
您的模型应当针对某些字段建立唯一索引,以排除重复项。列(Column)默认并不是唯一的,而您似乎以为它们是唯一的(参见列定义和注释中的 unique=False)。您应当使用某个"自然"键(例如 Twitter 提供的推文 ID)来代替自增的代理键,或者将文本列 tweet 设为唯一。
修复唯一性约束之后,如果您希望忽略 IntegrityError 并继续处理,请将插入操作包装在事务中(或依赖隐式事务行为),并相应地提交或回滚:
from sqlalchemy.exc import IntegrityError
class listener(StreamListener):
    """Store streamed tweets, ignoring duplicates, until ``m`` have been saved.

    Duplicates are detected by the database: this relies on a unique
    constraint on ``TrainingTweets.tweet``, so inserting a repeated text raises
    IntegrityError, which is rolled back and skipped.
    """

    def __init__(self, api=None):
        self.api = api or API()
        self.n = 0   # tweets successfully stored so far
        self.m = 50  # stop the stream once this many have been stored

    def on_data(self, data):
        """Handle one raw stream message; return False to stop the stream."""
        all_data = json.loads(data)
        # Not every stream message is a tweet (e.g. delete or limit notices).
        # Skip anything without text instead of raising KeyError.
        if "text" not in all_data:
            return True
        tweet_text = all_data["text"]
        tweet_username = all_data["user"]["screen_name"]
        label = 1
        ttweets = TrainingTweets(label_id=label,
                                 tweet_username=tweet_username,
                                 tweet=tweet_text)
        try:
            db.session.add(ttweets)
            db.session.commit()
        except IntegrityError:
            # Undo the failed transaction so the session stays usable. Do not
            # stop the stream; just ignore the duplicate.
            db.session.rollback()
            print("Duplicate entry detected!")
        else:
            print((tweet_username, tweet_text))
            # Count only tweets that were actually stored.
            self.n += 1
        if self.n >= self.m:
            print("Successfully stored", self.m, "tweets into database")
            return False  # stop the stream
        return True  # keep the stream going

    def on_error(self, status):
        """Print the error status reported by the stream."""
        print(status)
我能够从推文中获取数据,并将其存储到 MySQL 表中。但问题是,有时会出现内容完全相同的重复推文。我想知道,在使用 Flask-SQLAlchemy 时,能否在检测到重复值时停止向表中插入数据。
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from flask_sqlalchemy import SQLAlchemy
from models import TrainingTweets, db
import mysql.connector
import json
import tweepy
from tweepy.api import API
import os

# Twitter API credentials: consumer key, consumer secret, access token and
# access secret. They are read from the environment so secrets are not
# hard-coded in source; each falls back to "" (the previous hard-coded value).
ckey = os.environ.get("TWITTER_CONSUMER_KEY", "")
csecret = os.environ.get("TWITTER_CONSUMER_SECRET", "")
atoken = os.environ.get("TWITTER_ACCESS_TOKEN", "")
asecret = os.environ.get("TWITTER_ACCESS_SECRET", "")

auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)
api = tweepy.API(auth)
class listener(StreamListener):
    """Store up to ``m`` streamed tweets in the TrainingTweets table.

    ``on_data`` returns False, which ends the stream, in two cases: a tweet
    whose text is already stored arrives, or more than ``m`` tweets have been
    received.
    """

    def __init__(self, api=None):
        self.api = api or API()
        self.n = 0   # number of tweets received so far
        self.m = 50  # maximum number of tweets to store

    def on_data(self, data):
        """Handle one raw stream message; return False to stop the stream."""
        all_data = json.loads(data)
        # Not every stream message is a tweet (e.g. delete or limit notices).
        # Skip anything without text instead of raising KeyError.
        if "text" not in all_data:
            return True
        self.n = self.n + 1
        if self.n > self.m:
            print("Successfully stored ", self.m, " tweets into database")
            return False
        tweet = all_data["text"]
        username = all_data["user"]["screen_name"]
        # Look for an existing row with the same text BEFORE adding anything
        # to the session, for two reasons:
        # - Querying after add() would autoflush the pending row, which the
        #   query would then match.
        # - A rejected object left in the session would be persisted by a
        #   later commit().
        # filter_by() builds a proper column comparison; a plain string does not.
        if TrainingTweets.query.filter_by(tweet=tweet).first() is not None:
            print("Duplicate entry detected!")
            return False
        # label_id is an Integer column, so pass an int.
        ttweets = TrainingTweets(label_id=1, tweet_username=username, tweet=tweet)
        db.session.add(ttweets)
        db.session.commit()
        print((username, tweet))
        return True

    def on_error(self, status):
        """Print the error status reported by the stream."""
        print(status)
# Reuse the ``auth`` handler and ``api`` client created above instead of
# building a second, identical OAuthHandler. The listener otherwise falls back
# to a default ``API()`` with no auth handler.
twitterStream = Stream(auth, listener(api=api))
twitterStream.filter(track=["health"], languages=["en"], follow="")
这是我的 models.py:
class TrainingTweets(db.Model):
    """A collected tweet used as training data, with a slug and a label."""

    tweet_id = db.Column(db.Integer, primary_key=True)
    tweet_username = db.Column(db.String(50))
    # Columns are not unique by default. unique=True makes the database reject
    # a second row with the same text by raising IntegrityError on insert, so
    # duplicates are prevented even when the application-level check is
    # skipped or races.
    # NOTE(review): 191 is presumably chosen to fit MySQL's utf8mb4 index
    # length limit; longer tweet texts may be truncated or rejected - confirm.
    tweet = db.Column(db.String(191), unique=True)
    slug = db.Column(db.String(191), unique=False)
    created_date = db.Column(db.DateTime, default=datetime.datetime.now)
    label_id = db.Column(db.Integer, db.ForeignKey('label.label_id'))

    def __init__(self, *args, **kwargs):
        # Let the declarative constructor assign columns, then derive the slug.
        super(TrainingTweets, self).__init__(*args, **kwargs)
        self.generate_slug()

    def generate_slug(self):
        """Set ``self.slug`` from the tweet text, or to '' when there is none."""
        self.slug = slugify(self.tweet) if self.tweet else ''
您的模型应当针对某些字段建立唯一索引,以排除重复项。列(Column)默认并不是唯一的,而您似乎以为它们是唯一的(参见列定义和注释中的 unique=False)。您应当使用某个"自然"键(例如 Twitter 提供的推文 ID)来代替自增的代理键,或者将文本列 tweet 设为唯一。
修复唯一性约束之后,如果您希望忽略 IntegrityError 并继续处理,请将插入操作包装在事务中(或依赖隐式事务行为),并相应地提交或回滚:
from sqlalchemy.exc import IntegrityError
class listener(StreamListener):
    """Store streamed tweets, ignoring duplicates, until ``m`` have been saved.

    Duplicates are detected by the database: this relies on a unique
    constraint on ``TrainingTweets.tweet``, so inserting a repeated text raises
    IntegrityError, which is rolled back and skipped.
    """

    def __init__(self, api=None):
        self.api = api or API()
        self.n = 0   # tweets successfully stored so far
        self.m = 50  # stop the stream once this many have been stored

    def on_data(self, data):
        """Handle one raw stream message; return False to stop the stream."""
        all_data = json.loads(data)
        # Not every stream message is a tweet (e.g. delete or limit notices).
        # Skip anything without text instead of raising KeyError.
        if "text" not in all_data:
            return True
        tweet_text = all_data["text"]
        tweet_username = all_data["user"]["screen_name"]
        label = 1
        ttweets = TrainingTweets(label_id=label,
                                 tweet_username=tweet_username,
                                 tweet=tweet_text)
        try:
            db.session.add(ttweets)
            db.session.commit()
        except IntegrityError:
            # Undo the failed transaction so the session stays usable. Do not
            # stop the stream; just ignore the duplicate.
            db.session.rollback()
            print("Duplicate entry detected!")
        else:
            print((tweet_username, tweet_text))
            # Count only tweets that were actually stored.
            self.n += 1
        if self.n >= self.m:
            print("Successfully stored", self.m, "tweets into database")
            return False  # stop the stream
        return True  # keep the stream going

    def on_error(self, status):
        """Print the error status reported by the stream."""
        print(status)