将(新)文档从 Dataframe 添加到 MongoDB 的最有效方法是什么?
What's the most efficient way to add (new) documents from a Dataframe to MongoDB?
在这个用例中,我尝试使用 pymongo 将文档添加到 MongoDB collection,这些文档是根据日期(不是日期时间)、标题和数据框格式的文章摘要(日期是数据框的索引)。
当我将数据框存储到数据库时,它们以 _id、日期、标题、摘要的模式存储,这很好。
所以我要做的是仅上传数据框中尚未作为文档存储在 collection 中的那些行。我试过几种方法:
获取数据库中的最后一个文档,与数据框进行比较。创建一个新的 DF,它排除所有以前的行 + 与之比较的行。这应该可行,但是,它仍在上传大约 20% 的先前存储的行,我不知道为什么。
存储整个数据帧,然后聚合 collection 并删除重复项:理论上听起来不错,但是所有这样做的例子都是在 JS 而不是 python ,所以我没能让它发挥作用。
创建标题的唯一索引:同样,这在理论上应该可行,但我还没有实现。
我不想做的一件事是查询整个 collection 并存储为 DF,连接它们,删除重复项,删除 collection 和 re-create 它来自新的 DF。现在这不是问题,因为我正在处理 30 个左右的文档,但是当我要处理多个 collection 和数百万个文档时,嗯..根本不是很有效。
有人对我可以研究/研究/代码示例有任何建议吗?
这是我现在使用的代码:
下载 RSS 源
def getSymbolNews(self, symbol):
self.symbol = symbol
self.dbName = 'db_' + self.symbol
self.columnName = 'col_News'
self.topics = ['$' + self.symbol]
self.sa = getNews().parseNews(fn.SeekingAlpha(topics = self.topics))
self.yfin = getNews().parseNews(fn.Yahoo(topics = self.topics))
self.wb_news = getNews().getWebullNews(self.symbol)
self.df = pd.concat([self.sa, self.yfin, self.wb_news], axis = 0, ignore_index = False)
self.df.drop_duplicates(inplace = True)
self.df.sort_index(ascending = True, inplace = True)
del self.symbol, self.topics, self.sa, self.yfin, self.wb_news
getNews().uploadRecords(self.dbName, self.columnName, self.df)
return self.df
上传到Collection:
def uploadRecords(self, dbName, columnName, data):
self.data = data
self.dbName = dbName
self.columnName = columnName
self.data.reset_index(inplace=True)
self.data.rename(columns={'index': 'Date'}, inplace = True)
mongoFunctions.insertRecords(self.dbName, self.columnName, self.data)
del self.data
gc.collect()
return
要上传的 PyMongo 函数:
def insertRecords(dbName: str, collectionName: str, data: object):
"""Inserts a pandas dataframe object into a MongoDB collection (table)
Args:
dbName (str): Database name
collectionName (str): Collection name
data (object): Pandas dataframe object
"""
collection = getCollection(dbName, collectionName)
query = queryAllRecords(dbName, collectionName)
if query.shape == (0, 0):
record = data.to_dict(orient="records")
collection.insert(record)
else:
query.drop(["_id"], axis=1, inplace=True)
if query.equals(data):
return
else:
df_temp = pd.concat([query, data]).drop_duplicates(keep=False)
records = df_temp.to_dict(orient="records")
collection.insert_many(records)
return
我想获取文档的 md5
哈希并将其存储为 _id
;然后你可以使用 insert_many()
和 ordered=False
来插入任何不重复的项目;您可以 运行 随时添加,并且只会添加新项目;请记住,如果 any 字段稍作更改,则会添加一个新项目;如果这不是您想要的行为,请调整您传递给 md5()
.
的内容
代码最终变得相当简单:
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
import feedparser
from hashlib import md5
from json import dumps
db = MongoClient()['mydatabase']
entries = feedparser.parse("http://feeds.bbci.co.uk/news/world/rss.xml")['entries']
for item in entries:
item['_id'] = md5(dumps(item).encode("utf-8")).hexdigest()
try:
db.news.insert_many(entries, ordered=False)
except BulkWriteError:
pass
在这个用例中,我尝试使用 pymongo 将文档添加到 MongoDB collection,这些文档是根据日期(不是日期时间)、标题和数据框格式的文章摘要(日期是数据框的索引)。
当我将数据框存储到数据库时,它们以 _id、日期、标题、摘要的模式存储,这很好。
所以我要做的是仅上传数据框中尚未作为文档存储在 collection 中的那些行。我试过几种方法:
获取数据库中的最后一个文档,与数据框进行比较。创建一个新的 DF,它排除所有以前的行 + 与之比较的行。这应该可行,但是,它仍在上传大约 20% 的先前存储的行,我不知道为什么。
存储整个数据帧,然后聚合 collection 并删除重复项:理论上听起来不错,但是所有这样做的例子都是在 JS 而不是 python ,所以我没能让它发挥作用。
创建标题的唯一索引:同样,这在理论上应该可行,但我还没有实现。
我不想做的一件事是查询整个 collection 并存储为 DF,连接它们,删除重复项,删除 collection 和 re-create 它来自新的 DF。现在这不是问题,因为我正在处理 30 个左右的文档,但是当我要处理多个 collection 和数百万个文档时,嗯..根本不是很有效。
有人对我可以研究/研究/代码示例有任何建议吗?
这是我现在使用的代码:
下载 RSS 源
def getSymbolNews(self, symbol):
self.symbol = symbol
self.dbName = 'db_' + self.symbol
self.columnName = 'col_News'
self.topics = ['$' + self.symbol]
self.sa = getNews().parseNews(fn.SeekingAlpha(topics = self.topics))
self.yfin = getNews().parseNews(fn.Yahoo(topics = self.topics))
self.wb_news = getNews().getWebullNews(self.symbol)
self.df = pd.concat([self.sa, self.yfin, self.wb_news], axis = 0, ignore_index = False)
self.df.drop_duplicates(inplace = True)
self.df.sort_index(ascending = True, inplace = True)
del self.symbol, self.topics, self.sa, self.yfin, self.wb_news
getNews().uploadRecords(self.dbName, self.columnName, self.df)
return self.df
上传到Collection:
def uploadRecords(self, dbName, columnName, data):
self.data = data
self.dbName = dbName
self.columnName = columnName
self.data.reset_index(inplace=True)
self.data.rename(columns={'index': 'Date'}, inplace = True)
mongoFunctions.insertRecords(self.dbName, self.columnName, self.data)
del self.data
gc.collect()
return
要上传的 PyMongo 函数:
def insertRecords(dbName: str, collectionName: str, data: object):
"""Inserts a pandas dataframe object into a MongoDB collection (table)
Args:
dbName (str): Database name
collectionName (str): Collection name
data (object): Pandas dataframe object
"""
collection = getCollection(dbName, collectionName)
query = queryAllRecords(dbName, collectionName)
if query.shape == (0, 0):
record = data.to_dict(orient="records")
collection.insert(record)
else:
query.drop(["_id"], axis=1, inplace=True)
if query.equals(data):
return
else:
df_temp = pd.concat([query, data]).drop_duplicates(keep=False)
records = df_temp.to_dict(orient="records")
collection.insert_many(records)
return
我想获取文档的 md5
哈希并将其存储为 _id
;然后你可以使用 insert_many()
和 ordered=False
来插入任何不重复的项目;您可以 运行 随时添加,并且只会添加新项目;请记住,如果 any 字段稍作更改,则会添加一个新项目;如果这不是您想要的行为,请调整您传递给 md5()
.
代码最终变得相当简单:
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
import feedparser
from hashlib import md5
from json import dumps
db = MongoClient()['mydatabase']
entries = feedparser.parse("http://feeds.bbci.co.uk/news/world/rss.xml")['entries']
for item in entries:
item['_id'] = md5(dumps(item).encode("utf-8")).hexdigest()
try:
db.news.insert_many(entries, ordered=False)
except BulkWriteError:
pass