Caching/reusing 一个数据库连接供以后查看使用情况

Caching/reusing a DB connection for later view usage

我正在保存用户的数据库连接。在他们第一次输入凭据时,我会执行以下操作:

self.conn = MySQLdb.connect (
    host = 'aaa',
    user = 'bbb',
    passwd = 'ccc',
    db = 'ddd',
    charset='utf8'
)
cursor = self.conn.cursor()
cursor.execute("SET NAMES utf8")
cursor.execute('SET CHARACTER SET utf8;')
cursor.execute('SET character_set_connection=utf8;')

然后我准备好 conn 来处理所有用户的查询。但是,我不想在每次加载 view 时都重新连接。我将如何存储此 "open connection" 以便我可以在视图中执行类似以下操作:

def do_queries(request, sql):
    user = request.user
    conn = request.session['conn']
    cursor = request.session['cursor']
    cursor.execute(sql)

更新:上面的内容似乎是不可能的,也不是很好的做法,所以让我重新表述一下我正在尝试做的事情:

我有一个 sql 编辑器,用户可以在输入凭据后使用它(想想 Navicat 或 SequelPro 之类的东西)。请注意,这是 NOT 默认的 django 数据库连接——我事先不知道凭据。现在,一旦用户拥有 'connected',我希望他们能够进行任意数量的查询,而无需我每次都重新连接。例如——再次重复——像 Navicat 或 SequelPro 这样的东西。如何使用 python、django 或 mysql 完成此操作?也许我真的不明白这里有什么必要(缓存连接?连接池?等),所以任何建议或帮助将不胜感激。

您可以使用 IoC 容器为您存储单例提供程序。本质上,它不会每次都构建一个新连接,它只会构建一次(第一次 ConnectionContainer.connection_provider() 被调用),此后它将始终 return 先前构建的连接。

您需要 dependency-injector 包才能使我的示例工作:

import dependency_injector.containers as containers
import dependency_injector.providers as providers


class ConnectionProvider():
    def __init__(self, host, user, passwd, db, charset):
        self.conn = MySQLdb.connect(
            host=host,
            user=user,
            passwd=passwd,
            db=db,
            charset=charset
        )


class ConnectionContainer(containers.DeclarativeContainer):
    connection_provider = providers.Singleton(ConnectionProvider,
                                              host='aaa',
                                              user='bbb',
                                              passwd='ccc',
                                              db='ddd',
                                              charset='utf8')


def do_queries(request, sql):
    user = request.user
    conn = ConnectionContainer.connection_provider().conn
    cursor = conn.cursor()
    cursor.execute(sql)

我在这里对连接字符串进行了硬编码,但也可以根据可更改的配置使其可变。在这种情况下,您还可以为配置文件创建一个容器,并让连接容器从那里读取其配置。然后在运行时设置配置。如下:

import dependency_injector.containers as containers
import dependency_injector.providers as providers

class ConnectionProvider():
    def __init__(self, connection_config):
        self.conn = MySQLdb.connect(**connection_config)

class ConfigContainer(containers.DeclarativeContainer):
    connection_config = providers.Configuration("connection_config")

class ConnectionContainer(containers.DeclarativeContainer):
    connection_provider = providers.Singleton(ConnectionProvider, ConfigContainer.connection_config)

def do_queries(request, sql):
    user = request.user
    conn = ConnectionContainer.connection_provider().conn
    cursor = conn.cursor()
    cursor.execute(sql)


# run code
my_config = {
    'host':'aaa',
    'user':'bbb',
    'passwd':'ccc',
    'db':'ddd',
    'charset':'utf8'
}

ConfigContainer.connection_config.override(my_config)
request = ...
sql = ...

do_queries(request, sql)

我不是该领域的专家,但我相信 PgBouncer 会为您完成这项工作,前提是您能够使用 PostgreSQL 后端(这是您不了解的一个细节)弄清)。 PgBouncer 是一个 连接池 ,它允许您重复使用连接,避免在每个请求上连接的开销。

根据他们documentation:

user, password

If user= is set, all connections to the destination database will be done with the specified user, meaning that there will be only one pool for this database.

Otherwise PgBouncer tries to log into the destination database with client username, meaning that there will be one pool per user.

因此,您可以为每个用户设置一个连接池,这听起来正是您想要的。

在 MySQL 地区,mysql.connector.pooling 模块允许您进行一些连接池化,但我不确定您是否可以进行每用户池化。鉴于您可以设置池名称,我猜您可以使用用户名来标识池。

无论您使用什么,您都可能会遇到无法避免重新连接的情况(用户连接,做一些事情,去开会和吃午饭,回来并想采取更多行动)。

我只是在这里分享我的知识。

安装 PyMySQL 以使用 MySql

对于Python2.x

pip install PyMySQL

对于Python3.x

pip3 install PyMySQL

1. 如果您愿意使用 Django 框架,那么 运行 SQL 查询非常容易,无需任何重新连接。

在 setting.py 文件中添加以下行

DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.mysql',
            'NAME': 'test',
            'USER': 'test',
            'PASSWORD': 'test',
            'HOST': 'localhost',
            'OPTIONS': {'charset': 'utf8mb4'},
        }
    }

在 views.py 文件中添加这些行以获取数据。您可以根据需要自定义您的查询

from django.db import connection
def connect(request):
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM Tablename");
    results = cursor.fetchall()
    return results 

你会得到想要的结果。

点击here了解更多信息

2。对于 python Tkinter

from Tkinter import *
import MySQLdb

db = MySQLdb.connect("localhost","root","root","test")
# prepare a cursor object using cursor() method
cursor = db.cursor()
cursor.execute("SELECT * FROM Tablename")
if cursor.fetchone() is not None:
    print("In If")
else:
    print("In Else")
cursor.close()

参考this了解更多信息

PS: 您可以查看此 link 以了解您的问题,以便稍后重用数据库连接。

How to enable MySQL client auto re-connect with MySQLdb?

我不明白你为什么需要缓存连接,为什么不在每个请求缓存用户凭据的地方重新连接,但无论如何我会尝试概述一个可能符合你要求的解决方案。

我建议先研究一个更通用的任务 - 在您的应用需要处理的后续请求之间缓存一些内容,并且不能序列化到 django 的会话中。 在您的特定情况下,此共享值将是 数据库连接 (或多个连接)。 让我们从一个简单的任务开始,在请求之间共享一个简单的 计数器变量 ,只是为了了解幕后实际发生的事情。

令人惊讶的是,这两个答案都没有提到任何关于您可能使用的网络服务器的信息! 实际上有多种方法可以处理网络应用程序中的并发连接:

  1. 有多个进程,每个请求随机进入一个进程
  2. 有多个线程,每个请求都由随机线程
  3. 处理
  4. p.1 和 p.2 合并
  5. 各种async技术,当有一个single process + 事件循环 处理请求并注意请求处理程序不应长时间阻塞

根据我自己的经验,p.1-2 适用于大多数典型的网络应用程序。 Apache1.x 只能处理 p.1,Apache2.x 可以处理所有 1-3。

让我们从以下 django 应用程序和 运行 单进程 gunicorn 网络服务器开始。 我将使用 gunicorn,因为与 apache 不同,它的配置相当容易(个人意见 :-)

views.py

import time

from django.http import HttpResponse

c = 0

def main(self):
    global c
    c += 1
    return HttpResponse('val: {}\n'.format(c))


def heavy(self):
    time.sleep(10)
    return HttpResponse('heavy done')

urls.py

from django.contrib import admin
from django.urls import path

from . import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', views.main, name='main'),
    path('heavy/', views.heavy, name='heavy')
]

运行它在单进程模式下:

gunicorn testpool.wsgi -w 1

这是我们的流程树 - 只有 1 个工作人员可以处理所有请求

pstree 77292
-+= 77292 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1
 \--- 77295 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1

正在尝试使用我们的应用程序:

curl 'http://127.0.0.1:8000'
val: 1

curl 'http://127.0.0.1:8000'
val: 2

curl 'http://127.0.0.1:8000'
val: 3

如您所见,您可以轻松地在后续请求之间共享计数器。 这里的问题是您只能并行处理单个请求。如果您在一个选项卡中请求 /heavy/,则 / 将无法工作,直到 /heavy完成

现在让我们使用 2 个工作进程:

gunicorn testpool.wsgi -w 2

这是进程树的样子:

 pstree 77285
-+= 77285 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 2
 |--- 77288 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 2
 \--- 77289 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 2

正在测试我们的应用程序:

curl 'http://127.0.0.1:8000'
val: 1

curl 'http://127.0.0.1:8000'
val: 2

curl 'http://127.0.0.1:8000'
val: 1

前两个请求已由第一个 worker process 第三个 处理 - 由具有自己内存的第二个工作进程处理 space 所以你看到 1 而不是 3。 请注意,您的输出可能会有所不同,因为过程 1 和 2 是随机选择的。但迟早你会遇到一个不同的过程。

这对我们不是很有帮助,因为我们需要处理 多个 并发请求,我们需要以某种方式让我们的请求由一般无法完成的特定进程处理案例.

大多数 池化 开箱即用的技术只会在 单个 进程范围内缓存连接,如果您的请求得到满足通过不同的过程 - 需要建立 NEW 连接。

让我们转到线程

gunicorn testpool.wsgi -w 1 --threads 2

同样 - 只有 1 个进程

pstree 77310
-+= 77310 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1 --threads 2
 \--- 77313 oleg /Users/oleg/.virtualenvs/test3.4/bin/python /Users/oleg/.virtualenvs/test3.4/bin/gunicorn testpool.wsgi -w 1 --threads 2

现在,如果您在一个选项卡中 运行 /heavy,您仍然可以查询 / 和您的计数器将在请求之间 保留! 即使线程数量根据您的工作负载增加或减少,它仍然可以正常工作。

问题:您需要使用python线程同步技术同步访问共享变量( read more)。 另一个问题是同一用户可能需要并行发出多个查询 - 即打开多个选项卡。

要处理它,当您有可用的数据库凭据时,您可以在第一个请求上打开 多个 连接。

如果用户需要的连接数多于您的应用程序可能会等待锁定,直到连接可用。

回到你的问题

您可以创建一个具有以下方法的 class:

from contextlib import contextmanager

class ConnectionPool(object):

   def __init__(self, max_connections=4):
      self._pool = dict()
      self._max_connections = max_connections

   def preconnect(self, session_id, user, password):
       # create multiple connections and put them into self._pool
       # ...

    @contextmanager
    def get_connection(sef, session_id):
       # if have an available connection:
            # mark it as allocated
            # and return it
            try:
                yield connection
           finally:
              # put it back to the pool
              # ....
       # else
        # wait until there's a connection returned to the pool by another thread

pool = ConnectionPool(4)

def some_view(self):
     session_id = ...
     with pool.get_connection(session_id) as conn:
        conn.query(...)

这不是一个完整的解决方案 - 您需要以某种方式删除长时间未使用的过时连接。

如果用户在很长一段时间后回来并且他的连接已关闭,他将需要再次提供他的凭据 - 希望从您的应用程序的角度来看没问题。

另外请记住 python threads 有其性能损失,不确定这对您来说是否是个问题。

我还没有检查过 apache2(配置负担太大,我好久没用了,一般都用 uwsgi),但它应该也能用 - 会是很高兴收到你的回音 如果你设法 运行 )

也不要忘记 p.4(异步方法)——你不太可能在 apache 上使用它,但值得研究——关键字:django + gevent, django + asyncio。它有它的 pros/cons 并且可能会极大地影响您的应用实施,因此在不详细了解您的应用要求的情况下很难提出任何解决方案

在 Web 应用上下文中同步执行此类操作不是一个好主意。请记住,您的应用程序可能需要以多 process/thread 方式工作,并且您无法在进程之间正常共享连接。因此,如果您在一个进程上为您的用户创建连接,则无法保证在同一进程上收到查询请求。可能更好的想法是让一个进程后台工作者处理多个线程中的连接(每个会话一个线程)以对数据库进行查询并在 Web 应用程序上检索结果。您的应用程序应该为每个会话分配一个唯一的 ID,后台工作人员使用会话 ID 跟踪每个线程。您可以使用 celery 或任何其他支持异步结果的任务队列。所以设计会像下面这样:

             |<--|        |<--------------|                   |<--|
user (id: x) |   | webapp |   | queue |   | worker (thread x) |   | DB
             |-->|        |-->|       |-->|                   |-->|

您还可以为每个用户创建一个队列,直到他们有一个活动会话,因此您可以 运行 每个会话一个单独的后台进程。

我实际上分享了我对这个确切问题的解决方案。我在这里所做的是创建一个连接池,您可以指定最大连接数,然后通过此通道异步排队查询请求。这样你可以保持一定数量的连接打开,但它会异步排队和池并保持你习惯的速度。

这需要 gevent 和 postgres。