使用 Python 中安装的 libsqlite3 版本

Use the installed version of libsqlite3 from Python

是否可以将 Python 中安装的 SQLite3 版本与 ctypes 一起使用?如果可以,怎么做?

在 Mac 上,以下工作没有错误:

from ctypes import CDLL
libsqlite3 = CDLL("libsqlite3.dylib")

...但是从 https://www.sqlite.org/c3ref/sqlite3.html

Each open SQLite database is represented by a pointer to an instance of the opaque structure named "sqlite3".

(强调我的)

这对我来说表明你不能真正为数据库制作一个 ctypes.Structure,然后传递给 sqlite3_open

(上下文:我想使用 Python 中未被内置 sqlite3 模块公开的 SQLite 部分)

sqlite3-API使用了一个不透明的指针,所以最终不需要知道它的内存布局——只需要使用一个void-指针即可。

例如,打开一个 sqlite3 数据库会创建这样一个指针:

int sqlite3_open(
  const char *filename,   /* Database filename (UTF-8) */
  sqlite3 **ppDb          /* OUT: SQLite db handle */
);

即第二个参数是指向指针的指针。此函数将创建结构并将其地址提供给我们——根本不需要知道结构的确切布局。

稍后,我们只需要这个结构的地址就可以使用更多的功能,即:

int sqlite3_close(sqlite3*);

类型安全是由编译器确保的,一旦我们有了机器代码,手套就脱掉了,我们可以将任何东西而不是 sqlite3* 传递给函数,但我们必须确保它会工作。任何指针都可以用 void* 替换,只要它指向有效的内存(即具有正确的内存布局)。这导致:

import ctypes
libsqlite3 = ctypes.CDLL("libsqlite3.dylib")
sqlite3_handle = ctypes.c_void_p()  # nullptr

# pass handle by reference:
res = libsqlite3.sqlite3_open(b"mydb.db", ctypes.byref(sqlite3_handle))
print("open result",  res)                 # check res == 0
print("pointer value:", sqlite3_handle)    # address is set

# do what ever needed...

# example usage of handle:
res = libsqlite3.sqlite3_close(sqlite3_handle)
print("close result",  res)# check res == 0

sqlite3_handle = None # make sure nobody accesses dangling pointer

这有点快速和肮脏:通常需要设置参数类型和 return-值类型。但是在上面的函数中,默认值会得到正确的行为,所以我跳过了这个(否则很重要)步骤。

基于, this is a more complete example of how to use libsqlite3 from Python, which is also at https://gist.github.com/michalc/a3147997e21665896836e0f4157975cb

下面定义了一个(生成器)函数,query

from contextlib import contextmanager
from collections import namedtuple
from ctypes import cdll, byref, string_at, c_char_p, c_int, c_double, c_int64, c_void_p
from sys import platform


def query(db_file, sql, params=()):
    libsqlite3 = cdll.LoadLibrary({'linux': 'libsqlite3.so', 'darwin': 'libsqlite3.dylib'}[platform])
    libsqlite3.sqlite3_errstr.restype = c_char_p
    libsqlite3.sqlite3_errmsg.restype = c_char_p
    libsqlite3.sqlite3_column_name.restype = c_char_p
    libsqlite3.sqlite3_column_double.restype = c_double
    libsqlite3.sqlite3_column_int64.restype = c_int64
    libsqlite3.sqlite3_column_blob.restype = c_void_p
    libsqlite3.sqlite3_column_bytes.restype = c_int64
    SQLITE_ROW = 100
    SQLITE_DONE = 101
    SQLITE_TRANSIENT = -1
    SQLITE_OPEN_READWRITE = 0x00000002

    bind = {
        type(0): libsqlite3.sqlite3_bind_int64,
        type(0.0): libsqlite3.sqlite3_bind_double,
        type(''): lambda pp_stmt, i, value: libsqlite3.sqlite3_bind_text(pp_stmt, i, value.encode('utf-8'), len(value.encode('utf-8')), SQLITE_TRANSIENT),
        type(b''): lambda pp_stmt, i, value: libsqlite3.sqlite3_bind_blob(pp_stmt, i, value, len(value), SQLITE_TRANSIENT),
        type(None): lambda pp_stmt, i, _: libsqlite3.sqlite3_bind_null(pp_stmt, i),
    }

    extract = {
        1: libsqlite3.sqlite3_column_int64,
        2: libsqlite3.sqlite3_column_double,
        3: lambda pp_stmt, i: string_at(
            libsqlite3.sqlite3_column_blob(pp_stmt, i),
            libsqlite3.sqlite3_column_bytes(pp_stmt, i),
        ).decode(),
        4: lambda pp_stmt, i: string_at(
            libsqlite3.sqlite3_column_blob(pp_stmt, i),
            libsqlite3.sqlite3_column_bytes(pp_stmt, i),
        ),
        5: lambda pp_stmt, i: None,
    }

    def run(func, *args):
        res = func(*args)
        if res != 0:
            raise Exception(libsqlite3.sqlite3_errstr(res).decode())

    def run_with_db(db, func, *args):
        if func(*args) != 0:
            raise Exception(libsqlite3.sqlite3_errmsg(db).decode())

    @contextmanager
    def get_db(db_file):
        db = c_void_p()
        run(libsqlite3.sqlite3_open_v2, db_file.encode(), byref(db), SQLITE_OPEN_READWRITE, None)
        try:
            yield db
        finally:
            run_with_db(db, libsqlite3.sqlite3_close, db)

    @contextmanager
    def get_pp_stmt(db, sql):
        pp_stmt = c_void_p()
        run_with_db(db, libsqlite3.sqlite3_prepare_v3, db, sql.encode(), -1, 0, byref(pp_stmt), None)
        try:
            yield pp_stmt
        finally:
            run_with_db(db, libsqlite3.sqlite3_finalize, pp_stmt)

    with \
            get_db(db_file) as db, \
            get_pp_stmt(db, sql) as pp_stmt:

        for i, param in enumerate(params):
            run_with_db(db, bind[type(param)], pp_stmt, i + 1, param)

        row_constructor = namedtuple('Row', (
            libsqlite3.sqlite3_column_name(pp_stmt, i).decode()
            for i in range(0, libsqlite3.sqlite3_column_count(pp_stmt))
        ))

        while True:
            res = libsqlite3.sqlite3_step(pp_stmt)
            if res == SQLITE_DONE:
                break
            if res != SQLITE_ROW:
                raise Exception(libsqlite3.sqlite3_errstr(res).decode())

            yield row_constructor(*(
                extract[libsqlite3.sqlite3_column_type(pp_stmt, i)](pp_stmt, i)
                for i in range(0, len(row_constructor._fields))
            ))

可以用作,例如:

for row in query('my.db', 'SELECT * FROM my_table WHERE a = ?;', ('b',)):
    print(row)