Cannot get sqlalchemy.orm to insert data into my postgres database table using Session.execute() during multithreading
First of all, I'm completely new to working with databases other than sqlite, and even that only a little; Python is also something I've only been using for about six months, so please bear with me if I've missed something obvious or misunderstood something completely.
I have a lot of historical market data that I'm scraping (around 15,000 items across three regions), and to do this efficiently I'm trying to fetch all the items by using one process per region and then multithreading within each process. The response I get from scraping each item is a list of dicts, which I then want to insert using Session.execute(). I haven't gotten that combination to work yet (if you know a way, please point me in the right direction too), so for now I'm only using multithreading, since I've already used it successfully to insert data into the regionid and typeid tables.
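The process-per-region layer I have in mind would look roughly like this (a hypothetical sketch; in the script below the multi_processing() call is commented out, and each child process would need to create its own engine, since a connection pool cannot safely be shared across a fork):

import concurrent.futures

def multi_processing(regionids, typeids):
    # one worker process per region; data_to_db() (defined below) then
    # fans out over the typeids with threads inside each process
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(data_to_db, regionid, typeids, idx)
                   for idx, regionid in enumerate(regionids)]
        return [future.result() for future in futures]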
Still, no data gets inserted into my historical_data table, and no errors appear when I run my script. I tried enabling SQLAlchemy logging with
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
This shows me the calls I expect while fetching data from the regionid and typeid tables during main, but nothing after that. Does that mean I'm not connected to the database once the multithreading starts, or is the logger just bad at dealing with multithreading?
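As a cross-check (my own suggestion, not something the script already does), the engine can configure that same logger itself via echo=True, which logs every emitted statement regardless of which thread runs it:

from sqlalchemy import create_engine

# echo=True sets up the 'sqlalchemy.engine' logger at INFO level for you
engine = create_engine('postgresql://user@localhost:5432/historic_market_data',
                       pool_size=12, max_overflow=0, echo=True)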
For the regionid and typeid tables I used Session.merge() in a for loop over each item's data, so I'm guessing it's my use of Session.execute() that's off?
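That merge pattern, for comparison, looks roughly like this (a minimal sketch with a hypothetical helper name, assuming the Typeid model defined further down):

def import_typeids(typeids):
    session = Session()
    for typeid in typeids:
        session.merge(Typeid(typeid))  # insert-or-update keyed on the primary key
    session.commit()
    Session.remove()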
I'm trying to insert all my historical data into a postgres database using sqlalchemy.orm. The actual script I'm using to try to insert the data is as follows:
if __name__ == '__main__':
    print("Start database session")
    Base.metadata.create_all(engine)
    Session = scoped_session(session_factory)
    ini_params()
    print("Get typeids and regionids from database")
    typeids = get_typeids()      # get all typeids from typeid table
    regionids = get_regionids()  # get all regionids from regionid table
    typeids = typeid_test_list   # uncomment for debug
    print(typeids)
    for idx, regionid in enumerate(regionids):
        no_data = data_to_db(regionid, typeids, idx)
        #no_data = multi_processing(regionid, typeids, idx)
        print(no_data)
def data_to_db(regionid, typeids, idx):
    ini_params()
    position = int(idx)
    no_data_typeids = []
    prefix = 'Getting data for region ' + str(regionid)
    typeid_length = len(typeids)
    with tqdm(typeids, total=typeid_length, desc=prefix, position=position) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            futures = {executor.submit(multithread_func, typeid, regionid): typeid for typeid in typeids}
            for future in concurrent.futures.as_completed(futures):
                # future.result() is never called here, so any exception
                # raised inside multithread_func is silently discarded
                pbar.update(1)
    return no_data_typeids
def multithread_func(typeid, regionid):
    today = datetime.date.today()
    history = get_market_history(regionid, typeid)  # URL-scraper
    if history != "error":
        import_full_history(history)
    else:
        return typeid
    return 0
def import_full_history(history):
    get_data_session = Session()
    print(type(history))
    get_data_session.execute(historical_data.insert(), item_dict)
    get_data_session.commit()
    Session.remove()
    return 0
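For reference, this is roughly what a working multi-row insert through Session.execute() would look like; a sketch of what I was aiming for, not the code above. Passing a list of dicts together with a Core insert() construct runs an executemany-style INSERT, one dict per row (note the version above passes item_dict, which is never defined in the function, and references historical_data rather than the mapped class's table):

def import_full_history(history):
    get_data_session = Session()
    # history is the scraper's list of row dicts; each dict becomes one row
    get_data_session.execute(Historical_data.__table__.insert(), history)
    get_data_session.commit()
    Session.remove()
    return 0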
My database is set up like this:
Base
engine = create_engine('postgresql://user@localhost:5432/historic_market_data', pool_size=12, max_overflow=0)
session_factory = sessionmaker(bind = engine)
Base = declarative_base()
regionids
Session = scoped_session(session_factory)

class Regionid(Base):
    __tablename__ = 'regionids'
    regionid = Column(Integer, primary_key=True)
    query = Session.query_property()

    def __init__(self, regionid):
        self.regionid = regionid
typeids
Session = scoped_session(session_factory)

class Typeid(Base):
    __tablename__ = 'typeids'
    typeid = Column(Integer, primary_key=True)
    query = Session.query_property()

    def __init__(self, typeid):
        self.typeid = typeid
historical_data
class Historical_data(Base):
    __tablename__ = 'historical_data'
    id = Column(Integer, primary_key=True)
    typeid = Column('typeid', Integer, ForeignKey('typeids.typeid'))
    regionid = Column('regionid', Integer, ForeignKey('regionids.regionid'))
    date = Column(Date)
    average = Column(Float)
    highest = Column(Float)
    lowest = Column(Float)
    order_count = Column(Integer)
    volume = Column(Integer)
    buy_weighted_avg = Column(Float)
    buy_maxval = Column(Float)
    buy_minval = Column(Float)
    buy_stddev = Column(Float)
    buy_median = Column(Float)
    buy_volume = Column(Float)
    buy_numorders = Column(Integer)
    buy_fivepercent = Column(Float)
    sell_weighted_avg = Column(Float)
    sell_maxval = Column(Float)
    sell_minval = Column(Float)
    sell_stddev = Column(Float)
    sell_median = Column(Float)
    sell_volume = Column(Float)
    sell_numorders = Column(Integer)
    sell_fivepercent = Column(Float)
    def __init__(self, typeid, regionid, date, average, highest, lowest,
                 order_count, volume, buy_weighted_avg, buy_maxval,
                 buy_minval, buy_stddev, buy_median, buy_volume,
                 buy_numorders, buy_fivepercent, sell_weighted_avg,
                 sell_maxval, sell_minval, sell_stddev, sell_median,
                 sell_volume, sell_numorders, sell_fivepercent):
        self.typeid = typeid
        self.regionid = regionid
        self.date = date
        self.average = average
        self.highest = highest
        self.lowest = lowest
        self.order_count = order_count
        self.volume = volume
        self.buy_weighted_avg = buy_weighted_avg
        self.buy_maxval = buy_maxval
        self.buy_minval = buy_minval
        self.buy_stddev = buy_stddev
        self.buy_median = buy_median
        self.buy_volume = buy_volume
        self.buy_numorders = buy_numorders
        self.buy_fivepercent = buy_fivepercent
        self.sell_weighted_avg = sell_weighted_avg
        self.sell_maxval = sell_maxval
        self.sell_minval = sell_minval
        self.sell_stddev = sell_stddev
        self.sell_median = sell_median
        self.sell_volume = sell_volume
        self.sell_numorders = sell_numorders
        self.sell_fivepercent = sell_fivepercent
I've managed to get it working properly by using bulk_insert_mappings(), simply by changing my import_full_history() to
def import_full_esi_history(history):
    get_data_session = Session()
    get_data_session.bulk_insert_mappings(Historical_data, history)
    get_data_session.commit()
    Session.remove()
    return 0
and it inserts the data. It also works combining multithreading and multiprocessing the way I originally intended. It seems my Session.execute() call was only handling one column at a time, while the dicts in my list each describe an entire row.
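For anyone else hitting this: the history argument is just a list of plain dicts keyed by column name. A hypothetical example (values invented for illustration; bulk_insert_mappings() simply skips any columns a dict omits, leaving them NULL):

import datetime

# hypothetical rows; keys must match the Historical_data column names
history = [
    {
        'typeid': 34,
        'regionid': 10000002,
        'date': datetime.date(2020, 1, 1),
        'average': 5.1,
        'highest': 5.4,
        'lowest': 4.9,
        'order_count': 1200,
        'volume': 3000000,
    },
]

get_data_session = Session()
get_data_session.bulk_insert_mappings(Historical_data, history)
get_data_session.commit()
Session.remove()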