如何在守护线程中关闭 sqlite 连接?

How to close sqlite connection in daemon thread?

我有多个线程处理数据并将其放入队列,还有一个线程从队列中获取数据,然后将其保存到数据库中。

我认为以下会导致内存泄漏:

class DBThread(threading.Thread):
    def __init__(self, myqueue):
        threading.Thread.__init__(self)
        self.myqueue = myqueue

    def run(self):
        conn = sqlite3.connect("test.db")
        c = conn.cursor()

        while True:
            data = myqueue.get()
            if data:
                c.execute("INSERT INTO test (data) VALUES (?)", (data,))
                conn.commit()

            self.myqueue.task_done()

        #conn.close()    <--- never reaches this point

q = Queue.Queue()

# Create other threads
....

# Create DB thread
t = DBThread(q)
t.setDaemon(True)
t.start()

q.join()

我不能将 conn.close() 放在 while 循环中,因为我认为这会在第一个循环中关闭连接。我不能把它放在if data:语句中,因为那样它不会保存稍后可能放入队列的数据。

在哪里关闭数据库连接?如果我不关闭它,这会不会导致内存泄漏?

如果您可以使用不会出现在您的正常数据中的标记值,例如None,您可以在 finally 子句中向线程发出停止和关闭数据库连接的信号:

import threading
import Queue
import sqlite3

class DBThread(threading.Thread):
    def __init__(self, myqueue, db_path):
        threading.Thread.__init__(self)
        self.myqueue = myqueue
        self.db_path = db_path

    def run(self):
        conn = sqlite3.connect(self.db_path)

        try:
            while True:
                data = self.myqueue.get()    
                if data is None:    # check for sentinel value
                    break

                with conn:
                    conn.execute("INSERT INTO test (data) VALUES (?)", (data,))
                self.myqueue.task_done()
        finally:
            conn.close()


q = Queue.Queue()
for i in range(100):
    q.put(str(i))

conn = sqlite3.connect('test.db')
conn.execute('create table if not exists test (data text)')
conn.close()

t = DBThread(q, 'test.db')
t.start()

q.join()
q.put(None)    # tell database thread to terminate

如果您不能使用标记值,您可以在 while 循环中检查的 class 添加一个标志。还要在设置标志的线程class中添加一个stop()方法。您将需要使用非阻塞 Queue.get():

class DBThread(threading.Thread):
    def __init__(self, myqueue, db_path):
        threading.Thread.__init__(self)
        self.myqueue = myqueue
        self.db_path = db_path
        self._terminate = False

    def terminate(self):
        self._terminate = True

    def run(self):
        conn = sqlite3.connect(self.db_path)

        try:
            while not self._terminate:
                try:
                    data = self.myqueue.get(timeout=1)
                except Queue.Empty:
                    continue

                with conn:
                    conn.execute("INSERT INTO test (data) VALUES (?)", (data,))
                self.myqueue.task_done()
        finally:
            conn.close()

....
q.join()
t.terminate()    # tell database thread to terminate

最后,值得一提的是,如果数据库线程设法耗尽队列,即如果 q.join() returns,您的程序可能会终止。这是因为 db 线程是守护线程,不会阻止主线程退出。您需要确保您的工作线程产生足够的数据来保持数据库线程忙碌,否则 q.join() 将 return 并且主线程将退出。