How to increase write speed on inserts, pymongo?

I have the following code that inserts documents into MongoDB. The problem is that it is slow: I can't multiprocess it, and since I have to check whether each document already exists before inserting, I believe a bulk insert is not possible. I'd like to know if there is a faster way to approach this. After profiling the code below I found that check_record() and update_upstream() are the two most time-consuming functions, so optimizing them would improve the overall speed. Any input on how to optimize the code below would be appreciated. Thanks!


import multiprocessing
import os
import pymongo


from directory import Directory
from pymongo import ASCENDING
from pymongo import DESCENDING
from pymongo import MongoClient
from storage_config import StorageConfig
from tqdm import tqdm

dir = Directory()

def DB_collections(collection_type):
    types = {'p': 'player_stats',
             't': 'team_standings',
             'f': 'fixture_stats',
             'l': 'league_standings',
             'pf': 'fixture_players_stats'}
    return types.get(collection_type)



class DB():

    def __init__(self, league, season, func=None):
        self.db_user = os.environ.get('DB_user')
        self.db_pass = os.environ.get('DB_pass')
        self.MONGODB_URL = f'mongodb+srv://{self.db_user}:{self.db_pass}@cluster0-mbqxj.mongodb.net/<dbname>?retryWrites=true&w=majority'
        self.league = league
        self.season = str(season)
        self.client = MongoClient(self.MONGODB_URL)
        self.DATABASE = self.client[self.league + self.season]


        self.pool = multiprocessing.cpu_count()
        self.playerfile = f'{self.league}_{self.season}_playerstats.json'
        self.teamfile = f'{self.league}_{self.season}_team_standings.json'
        self.fixturefile = f'{self.league}_{self.season}_fixturestats.json'
        self.leaguefile = f'{self.league}_{self.season}_league_standings.json'
        self.player_fixture = f'{self.league}_{self.season}_player_fixture.json'
        self.func = func

    def execute(self):
        if self.func is not None:
            return self.func(self)


def import_json(file):
    """Imports a json file in read mode
        Args:
            file(str): Name of file
    """
    return dir.load_json(file, StorageConfig.DB_DIR)

def load_file(file):
    try:
        loaded_file = import_json(file)
        return loaded_file
    except FileNotFoundError:
        print("Please check that", file, "exists")

def check_record(collection, index_dict):
    """Check if record exists in collection
        Args:
            index_dict (dict): key, value
    """
    return collection.find_one(index_dict)

def collection_index(collection, index, *args):
    """Checks if index exists for collection, 
    and return a new index if not

        Args:
            collection (str): Name of collection in database
            index (str): Dict key to be used as an index
            args (str): Additional dict keys to create compound indexs
    """
    compound_index = tuple((arg, ASCENDING) for arg in args)
    if index not in collection.index_information():
        return collection.create_index([(index, DESCENDING), *compound_index], unique=True)

def push_upstream(collection, record):
    """Update record in collection
        Args:
            collection (str): Name of collection in database
            record_id (str): record _id to be put for record in collection
            record (dict): Data to be pushed in collection
    """
    return collection.insert_one(record)

def update_upstream(collection, index_dict, record):
    """Update record in collection
        Args:
            collection (str): Name of collection in database
            index_dict (dict): key, value
            record (dict): Data to be updated in collection
    """
    return collection.update_one(index_dict, {"$set": record}, upsert=True)

def executePushPlayer(db):

    playerstats = load_file(db.playerfile)
    collection_name = DB_collections('p')
    collection = db.DATABASE[collection_name]
    collection_index(collection, 'p_id')
    for player in tqdm(playerstats):
        existingPost = check_record(collection, {'p_id': player['p_id']})
        if existingPost:
            update_upstream(collection, {'p_id': player['p_id']}, player)
        else:
            push_upstream(collection, player)

if __name__ == '__main__':
    db = DB('EN_PR', '2019')
    executePushPlayer(db)

You can combine your check/insert/update logic into a single update_one() command by using upsert=True, and then use the bulk operators with something like this:

from pymongo import UpdateOne

updates = []

for player in tqdm(playerstats):
    updates.append(UpdateOne({'p_id': player['p_id']}, {'$set': player}, upsert=True))

collection.bulk_write(updates)
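
Putting it together, a minimal sketch of executePushPlayer() rewritten around bulk_write() could look like the following (the helper names come from your own code and UpdateOne is imported as above; the batch size of 1000 and ordered=False are assumptions, to bound memory use and to let the server continue past individual errors):

def executePushPlayer(db):
    """Upsert all player records, sending them to MongoDB in batches."""
    playerstats = load_file(db.playerfile)
    collection = db.DATABASE[DB_collections('p')]
    collection_index(collection, 'p_id')

    batch_size = 1000          # assumed; tune to your document size
    requests = []
    for player in tqdm(playerstats):
        requests.append(UpdateOne({'p_id': player['p_id']},
                                  {'$set': player}, upsert=True))
        if len(requests) >= batch_size:
            # one round trip per batch instead of one per document
            collection.bulk_write(requests, ordered=False)
            requests = []
    if requests:
        collection.bulk_write(requests, ordered=False)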

Finally, check whether your indexes are actually being used by running the following command in the MongoDB shell:

db.mycollection.aggregate([{ $indexStats: {} }])

and look at the accesses.ops metric.
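
If you'd rather run the same check from Python instead of the shell, the aggregation can also be issued through pymongo (a sketch, assuming collection is the handle used above):

for index_stat in collection.aggregate([{'$indexStats': {}}]):
    # accesses.ops counts how many operations have used the index
    # since the server started or since the index was (re)built
    print(index_stat['name'], index_stat['accesses']['ops'])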