无法 sqlalchemy.orm 在多线程期间使用 Session.execute() 将数据插入我的 postgres 数据库 table

Cannot get sqlalchemy.orm to insert data into my postgres database table using Session.execute() during multithreading

首先,我对使用sqlite以外的数据库完全陌生,我只用过一点,python也是我只用了大约6个月的东西,所以请耐心等待如果我错过了一些明显的东西或完全误解了某些东西。

我有很多我正在抓取的历史市场数据(三个区域约 15000 个项目)并且为了有效地做到这一点,我试图通过为每个区域使用一个进程然后对每个进程进行多线程处理来获得所有项目。我从每个项目的抓取中得到的响应是一个字典列表,然后我想使用 Session.execute() 插入它。我还没有让它工作(如果你知道一种方法,请也指导我正确的方向),所以现在我只使用多线程,因为我已经成功地使用它来将数据插入 regionid 和 typeid table秒。 仍然没有数据插入我的 historical_data table 并且当我 运行 我的脚本时没有错误。我尝试使用

启用 sqlalchemy 日志记录
import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

这向我展示了我期望在 main 期间从 regionid 和 typeid tables 获取数据的调用,但在那之后什么都没有,这是否意味着我在多线程之后没有连接到数据库还是记录器不擅长处理多线程?

对于 regionid 和 typeid tables,我使用 Session.merge() 并使用 for 循环处理每个项目的数据,所以我猜这是我对 [=19 的使用=] 关闭了吗?

我尝试使用 sqlalchemy.orm 将我所有的历史数据插入到 postgres 数据库中 我用来尝试插入数据的实际脚本如下:

if __name__ == '__main__':
    print("Start database session")
    Base.metadata.create_all(engine)
    Session = scoped_session(session_factory)
    ini_params()
    print("Get typeids and regionids from database")
    typeids = get_typeids() #get all typeids from typeid table
    regionids = get_regionids() #get all regionids from regionid table
    typeids = typeid_test_list #uncomment for debug
    print(typeids)
    for idx, regionid in enumerate(regionids):
        no_data = data_to_db(regionid, typeids, idx)
        #no_data = multi_processing(regionid, typeids, idx)
    print(no_data)

def data_to_db(regionid, typeids, idx):
    ini_params()
    position = int(idx)
    no_data_typeids = []
    prefix = 'Getting data for region ' + str(regionid)
    typeid_length = len(typeids)
    with tqdm(typeids, total=typeid_length, desc=prefix, position=position) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            futures = {executor.submit(multithread_func, typeid, regionid): typeid for typeid in typeids}
            for future in concurrent.futures.as_completed(futures):
                pbar.update(1)
    return no_data_typeids

def multithread_func(typeid, regionid):
    today = datetime.date.today()
    history = get_market_history(regionid, typeid) #URL-scraper
    if history != "error":
        import_full_history(history)
    else:
        return typeid
    return 0

def import_full_history(history):
    get_data_session = Session()
    print(type(history))
    get_data_session.execute(historical_data.insert(), item_dict)
    get_data_session.commit()
    Session.remove()
    return 0

我的数据库是这样构建的

基础

engine = create_engine('postgresql://user@localhost:5432/historic_market_data', pool_size=12, max_overflow=0)
session_factory = sessionmaker(bind = engine)
Base = declarative_base()

regionids

Session = scoped_session(session_factory)
class Regionid(Base):
    __tablename__ = 'regionids'
    regionid = Column(Integer, primary_key=True)
    query = Session.query_property()
    def __init__(self, regionid):
        self.regionid = regionid

typeids

Session = scoped_session(session_factory)
class Typeid(Base):
    __tablename__ = 'typeids'
    typeid = Column(Integer, primary_key=True)
    query = Session.query_property()
    def __init__(self, typeid):
        self.typeid = typeid

historical_data

class Historical_data(Base):
    __tablename__ = 'historical_data'
    
    id = Column(Integer, primary_key=True)
    typeid = Column('typeid', Integer, ForeignKey('typeids.typeid'))
    regionid = Column('regionid', Integer, ForeignKey('regionids.regionid'))
    date = Column(Date)
    average = Column(Float)
    highest = Column(Float)
    lowest = Column(Float)
    order_count = Column(Integer)
    volume = Column(Integer)
    buy_weighted_avg = Column(Float)
    buy_maxval = Column(Float)
    buy_minval = Column(Float)
    buy_stddev = Column(Float)
    buy_median = Column(Float)
    buy_volume = Column(Float)
    buy_numorders = Column(Integer)
    buy_fivepercent = Column(Float)
    sell_weighted_avg = Column(Float)
    sell_maxval = Column(Float)
    sell_minval = Column(Float)
    sell_stddev = Column(Float)
    sell_median = Column(Float)
    sell_volume = Column(Float)
    sell_numorders = Column(Integer)
    sell_fivepercent = Column(Float)
    
def __init__(self, title, release_date):
    self.typeid = typeid
    self.regionid = regionid
    self.date = date
    self.average = average
    self.highest = highest
    self.lowest = lowest
    self.order_count = order_count
    self.volume = volume
    self.buy_weighted_avg = buy_weighted_avg
    self.buy_maxval = buy_maxval
    self.buy_minval = buy_minval
    self.buy_stddev = buy_stddev
    self.buy_median = buy_median
    self.buy_volume = buy_volume
    self.buy_numorders = buy_numorders
    self.buy_fivepercent = buy_fivepercent
    self.sell_weighted_avg = sell_weighted_avg
    self.sell_maxval = sell_maxval
    self.sell_minval = sell_minval
    self.sell_stddev = sell_stddev
    self.sell_median = sell_median
    self.sell_volume = sell_volume
    self.sell_numorders = sell_numorders
    self.sell_fivepercent = sell_fivepercent

我已经设法通过使用 bulk_insert_mappings() 使其正常工作,只需将我的 import_full_history() 更改为

def import_full_esi_history(history):
    get_data_session = Session()
    get_data_session.bulk_insert_mappings(Historical_data, history)
    get_data_session.commit()
    Session.remove()
    return 0

我得到它来插入数据。 IT 还可以像我最初预期的那样结合多线程和多处理。似乎 Session.insert() 一次只适用于一列,而我在列表中的字典是针对整行的