SQLAlchemy asyncio ORM: How to query the database

With the SQLAlchemy asyncio ORM engine, how do I query a table and get a single value or all rows?

From the non-async API I know that I can do:

SESSION.query(TableClass).get(x)

But trying this with the async API raises the following error:

AttributeError: 'AsyncSession' object has no attribute 'query'.

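This is how I define my SESSION variable. The LOOP variable is just asyncio.get_event_loop(); it is used to start async methods when my sql module is loaded and to populate variables that serve as a cache, so that I don't hit the database every time I need something: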

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))


BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())

Below is an example of a table and the cache function:

from sqlalchemy import Column, Integer

class TableClass(BASE):
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key=True)
    alias = Column(Integer)

CACHE = {}
async def _load_all():
    global CACHE
    # This is the line that raises the AttributeError shown above.
    curr = await SESSION.query(TableClass).all()
    CACHE = {i.id: i.alias for i in curr}

LOOP.run_until_complete(_load_all())

session.query is the legacy API, and AsyncSession does not provide it. With the asyncio extension you build a statement with select() and run it with session.execute():

from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker


engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
# expire_on_commit=False keeps loaded attributes usable after a commit.
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


CACHE = {}
async def _load_all():
    global CACHE
    async with async_session() as session:
        q = select(TableClass)
        result = await session.execute(q)
        # scalars() yields TableClass instances instead of one-element rows.
        curr = result.scalars()
        CACHE = {i.id: i.alias for i in curr}

LOOP.run_until_complete(_load_all())
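
The same select() pattern extends to filtered queries. The following is a minimal sketch, not the asker's actual module: it redefines TableClass locally so that it is self-contained, and the helper names (all_rows_stmt, by_alias_stmt, load_all, first_with_alias) are made up for illustration. The session argument is assumed to be an AsyncSession such as the one produced by async_session() above.

```python
from sqlalchemy import Column, Integer, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class TableClass(Base):
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key=True)
    alias = Column(Integer)


def all_rows_stmt():
    # Rough equivalent of the legacy session.query(TableClass).
    return select(TableClass)


def by_alias_stmt(alias):
    # Rough equivalent of session.query(TableClass).filter(TableClass.alias == alias).
    return select(TableClass).where(TableClass.alias == alias)


async def load_all(session):
    # Returns a list of TableClass instances (the old .all()).
    result = await session.execute(all_rows_stmt())
    return result.scalars().all()


async def first_with_alias(session, alias):
    # Returns the first matching instance or None (the old .first()).
    result = await session.execute(by_alias_stmt(alias))
    return result.scalars().first()
```

Calling str() on either statement shows the SQL it will emit, which is a quick way to check a query without touching the database.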

You can read more about this in the SQLAlchemy documentation on asynchronous I/O (asyncio).

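To load a single object by primary key, use session.get(). From the documentation:

New in version 1.4: Added Session.get(), which is moved from the now deprecated Query.get() method.

With an AsyncSession the call is awaited:

id = 1
user = await session.get(User, id)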

SQLAlchemy 1.4 documentation
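
Applied to the table from the question, a primary-key lookup could look like the sketch below. It redefines TableClass locally so that it is self-contained; the function names get_alias and get_alias_with_select are hypothetical, and session is assumed to be an AsyncSession.

```python
from sqlalchemy import Column, Integer, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class TableClass(Base):
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key=True)
    alias = Column(Integer)


async def get_alias(session, pk):
    # session.get() returns None when no row has this primary key.
    obj = await session.get(TableClass, pk)
    return None if obj is None else obj.alias


async def get_alias_with_select(session, pk):
    # The same lookup written as an explicit select() statement.
    result = await session.execute(select(TableClass).where(TableClass.id == pk))
    obj = result.scalar_one_or_none()
    return None if obj is None else obj.alias
```

One difference worth knowing: session.get() can return an object that is already in the session's identity map without emitting SQL, while the select() variant always runs a query.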