使用 pd.read_sql 和 asyncio 从数据库读取
Read from database using pd.read_sql and asyncio
我有三个表,每个表需要大约 1 分钟的时间来查询(即总共 3 分钟),就像这样
from my_utils import get_engine
import pandas as pd
def main():
con1 = get_engine("table1")
con2 = get_engine("table2")
con3 = get_engine("table3")
df1 = pd.read_sql(query1,con=con1)
df2 = pd.read_sql(query2,con=con2)
df3 = pd.read_sql(query3,con=con3)
main()
让天“异步”。
因此我尝试了以下方法(我对使用 asyncio
很陌生)
.
.
import asyncio
async def get_df1(query1):
df1 = pd.read_sql(query1,con=con1)
return df1
async def get_df2(query2):
df2 = pd.read_sql(query2,con=con2)
return df2
async def get_df3(query3):
df3 = pd.read_sql(query3,con=con3)
return df3
async def main():
df1,df2,df3 = await asyncio.gather(get_df1(),get_df2(),get_df3())
asyncio.run(main())
它 运行 秒,但它花费的时间与同步-运行.
完全相同
我是不是漏掉了什么?
协程之间的切换仅发生在 await
语句中,并且由于在 get_df
函数中没有 await
,因此您的三个查询只会按顺序执行。由于 pd.read_sql
本身不是异步的,因此您必须用执行程序将其包装起来以制作异步版本:
async def read_sql_async(stmt, con):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, pd.read_sql, stmt, con)
然后您将能够 运行 read_sql
作为等待对象:
df1 = await read_sql_async(query1, con=con1)
我有三个表,每个表需要大约 1 分钟的时间来查询(即总共 3 分钟),就像这样
from my_utils import get_engine
import pandas as pd
def main():
con1 = get_engine("table1")
con2 = get_engine("table2")
con3 = get_engine("table3")
df1 = pd.read_sql(query1,con=con1)
df2 = pd.read_sql(query2,con=con2)
df3 = pd.read_sql(query3,con=con3)
main()
让天“异步”。
因此我尝试了以下方法(我对使用 asyncio
很陌生)
.
.
import asyncio
async def get_df1(query1):
df1 = pd.read_sql(query1,con=con1)
return df1
async def get_df2(query2):
df2 = pd.read_sql(query2,con=con2)
return df2
async def get_df3(query3):
df3 = pd.read_sql(query3,con=con3)
return df3
async def main():
df1,df2,df3 = await asyncio.gather(get_df1(),get_df2(),get_df3())
asyncio.run(main())
它 运行 秒,但它花费的时间与同步-运行.
完全相同我是不是漏掉了什么?
协程之间的切换仅发生在 await
语句中,并且由于在 get_df
函数中没有 await
,因此您的三个查询只会按顺序执行。由于 pd.read_sql
本身不是异步的,因此您必须用执行程序将其包装起来以制作异步版本:
async def read_sql_async(stmt, con):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, pd.read_sql, stmt, con)
然后您将能够 运行 read_sql
作为等待对象:
df1 = await read_sql_async(query1, con=con1)