将(新)文档从 Dataframe 添加到 MongoDB 的最有效方法是什么?

What's the most efficient way to add (new) documents from a Dataframe to MongoDB?

在这个用例中,我尝试使用 pymongo 将文档添加到 MongoDB collection,这些文档是根据日期(不是日期时间)、标题和数据框格式的文章摘要(日期是数据框的索引)。

当我将数据框存储到数据库时,它们以 _id、日期、标题、摘要的模式存储,这很好。

所以我要做的是仅上传数据框中尚未作为文档存储在 collection 中的那些行。我试过几种方法:

  1. 获取数据库中的最后一个文档,与数据框进行比较。创建一个新的 DF,它排除所有以前的行 + 与之比较的行。这应该可行,但是,它仍在上传大约 20% 的先前存储的行,我不知道为什么。

  2. 存储整个数据帧,然后聚合 collection 并删除重复项:理论上听起来不错,但是所有这样做的例子都是在 JS 而不是 python ,所以我没能让它发挥作用。

  3. 创建标题的唯一索引:同样,这在理论上应该可行,但我还没有实现。

我不想做的一件事是查询整个 collection 并存储为 DF,连接它们,删除重复项,删除 collection 和 re-create 它来自新的 DF。现在这不是问题,因为我正在处理 30 个左右的文档,但是当我要处理多个 collection 和数百万个文档时,嗯..根本不是很有效。

有人对我可以研究/研究/代码示例有任何建议吗?

这是我现在使用的代码:

下载 RSS 源

def getSymbolNews(self, symbol):
    self.symbol = symbol
    self.dbName = 'db_' + self.symbol
    self.columnName = 'col_News'
    self.topics = ['$' + self.symbol]
    self.sa = getNews().parseNews(fn.SeekingAlpha(topics = self.topics))
    self.yfin = getNews().parseNews(fn.Yahoo(topics = self.topics))
    self.wb_news = getNews().getWebullNews(self.symbol)        
    self.df = pd.concat([self.sa, self.yfin, self.wb_news], axis = 0, ignore_index = False)
    self.df.drop_duplicates(inplace = True)
    self.df.sort_index(ascending = True, inplace = True)
    del self.symbol, self.topics, self.sa, self.yfin, self.wb_news
    getNews().uploadRecords(self.dbName, self.columnName, self.df)
    return self.df

上传到Collection:

def uploadRecords(self, dbName, columnName, data):
    self.data = data
    self.dbName = dbName
    self.columnName = columnName
    self.data.reset_index(inplace=True)
    self.data.rename(columns={'index': 'Date'}, inplace = True)
    mongoFunctions.insertRecords(self.dbName, self.columnName, self.data)
    del self.data
    gc.collect()
    return

要上传的 PyMongo 函数:

def insertRecords(dbName: str, collectionName: str, data: object):
    """Inserts a pandas dataframe object into a MongoDB collection (table)

    Args:
        dbName (str): Database name
        collectionName (str): Collection name
        data (object): Pandas dataframe object
    """

    collection = getCollection(dbName, collectionName)
    query = queryAllRecords(dbName, collectionName)

    if query.shape == (0, 0):
        record = data.to_dict(orient="records")
        collection.insert(record)
    else:
        query.drop(["_id"], axis=1, inplace=True)
        if query.equals(data):
            return
        else:
            df_temp = pd.concat([query, data]).drop_duplicates(keep=False)
            records = df_temp.to_dict(orient="records")
            collection.insert_many(records)

    return

我想获取文档的 md5 哈希并将其存储为 _id;然后你可以使用 insert_many()ordered=False 来插入任何不重复的项目;您可以 运行 随时添加,并且只会添加新项目;请记住,如果 any 字段稍作更改,则会添加一个新项目;如果这不是您想要的行为,请调整您传递给 md5().

的内容

代码最终变得相当简单:

from pymongo import MongoClient
from pymongo.errors import BulkWriteError
import feedparser
from hashlib import md5
from json import dumps

db = MongoClient()['mydatabase']

entries = feedparser.parse("http://feeds.bbci.co.uk/news/world/rss.xml")['entries']

for item in entries:
    item['_id'] = md5(dumps(item).encode("utf-8")).hexdigest()

try:
    db.news.insert_many(entries, ordered=False)
except BulkWriteError:
    pass