如何在守护线程中关闭 sqlite 连接?
How to close sqlite connection in daemon thread?
我有多个线程处理数据并将其放入队列,还有一个线程从队列中获取数据,然后将其保存到数据库中。
我认为以下会导致内存泄漏:
class DBThread(threading.Thread):
def __init__(self, myqueue):
threading.Thread.__init__(self)
self.myqueue = myqueue
def run(self):
conn = sqlite3.connect("test.db")
c = conn.cursor()
while True:
data = myqueue.get()
if data:
c.execute("INSERT INTO test (data) VALUES (?)", (data,))
conn.commit()
self.myqueue.task_done()
#conn.close() <--- never reaches this point
q = Queue.Queue()
# Create other threads
....
# Create DB thread
t = DBThread(q)
t.setDaemon(True)
t.start()
q.join()
我不能将 conn.close()
放在 while 循环中,因为我认为这会在第一个循环中关闭连接。我不能把它放在if data:
语句中,因为那样它不会保存稍后可能放入队列的数据。
在哪里关闭数据库连接?如果我不关闭它,这会不会导致内存泄漏?
如果您可以使用不会出现在您的正常数据中的标记值,例如None
,您可以在 finally
子句中向线程发出停止和关闭数据库连接的信号:
import threading
import Queue
import sqlite3
class DBThread(threading.Thread):
def __init__(self, myqueue, db_path):
threading.Thread.__init__(self)
self.myqueue = myqueue
self.db_path = db_path
def run(self):
conn = sqlite3.connect(self.db_path)
try:
while True:
data = self.myqueue.get()
if data is None: # check for sentinel value
break
with conn:
conn.execute("INSERT INTO test (data) VALUES (?)", (data,))
self.myqueue.task_done()
finally:
conn.close()
q = Queue.Queue()
for i in range(100):
q.put(str(i))
conn = sqlite3.connect('test.db')
conn.execute('create table if not exists test (data text)')
conn.close()
t = DBThread(q, 'test.db')
t.start()
q.join()
q.put(None) # tell database thread to terminate
如果您不能使用标记值,您可以在 while
循环中检查的 class 添加一个标志。还要在设置标志的线程class中添加一个stop()
方法。您将需要使用非阻塞 Queue.get()
:
class DBThread(threading.Thread):
def __init__(self, myqueue, db_path):
threading.Thread.__init__(self)
self.myqueue = myqueue
self.db_path = db_path
self._terminate = False
def terminate(self):
self._terminate = True
def run(self):
conn = sqlite3.connect(self.db_path)
try:
while not self._terminate:
try:
data = self.myqueue.get(timeout=1)
except Queue.Empty:
continue
with conn:
conn.execute("INSERT INTO test (data) VALUES (?)", (data,))
self.myqueue.task_done()
finally:
conn.close()
....
q.join()
t.terminate() # tell database thread to terminate
最后,值得一提的是,如果数据库线程设法耗尽队列,即如果 q.join()
returns,您的程序可能会终止。这是因为 db 线程是守护线程,不会阻止主线程退出。您需要确保您的工作线程产生足够的数据来保持数据库线程忙碌,否则 q.join()
将 return 并且主线程将退出。
我有多个线程处理数据并将其放入队列,还有一个线程从队列中获取数据,然后将其保存到数据库中。
我认为以下会导致内存泄漏:
class DBThread(threading.Thread):
def __init__(self, myqueue):
threading.Thread.__init__(self)
self.myqueue = myqueue
def run(self):
conn = sqlite3.connect("test.db")
c = conn.cursor()
while True:
data = myqueue.get()
if data:
c.execute("INSERT INTO test (data) VALUES (?)", (data,))
conn.commit()
self.myqueue.task_done()
#conn.close() <--- never reaches this point
q = Queue.Queue()
# Create other threads
....
# Create DB thread
t = DBThread(q)
t.setDaemon(True)
t.start()
q.join()
我不能将 conn.close()
放在 while 循环中,因为我认为这会在第一个循环中关闭连接。我不能把它放在if data:
语句中,因为那样它不会保存稍后可能放入队列的数据。
在哪里关闭数据库连接?如果我不关闭它,这会不会导致内存泄漏?
如果您可以使用不会出现在您的正常数据中的标记值,例如None
,您可以在 finally
子句中向线程发出停止和关闭数据库连接的信号:
import threading
import Queue
import sqlite3
class DBThread(threading.Thread):
def __init__(self, myqueue, db_path):
threading.Thread.__init__(self)
self.myqueue = myqueue
self.db_path = db_path
def run(self):
conn = sqlite3.connect(self.db_path)
try:
while True:
data = self.myqueue.get()
if data is None: # check for sentinel value
break
with conn:
conn.execute("INSERT INTO test (data) VALUES (?)", (data,))
self.myqueue.task_done()
finally:
conn.close()
q = Queue.Queue()
for i in range(100):
q.put(str(i))
conn = sqlite3.connect('test.db')
conn.execute('create table if not exists test (data text)')
conn.close()
t = DBThread(q, 'test.db')
t.start()
q.join()
q.put(None) # tell database thread to terminate
如果您不能使用标记值,您可以在 while
循环中检查的 class 添加一个标志。还要在设置标志的线程class中添加一个stop()
方法。您将需要使用非阻塞 Queue.get()
:
class DBThread(threading.Thread):
def __init__(self, myqueue, db_path):
threading.Thread.__init__(self)
self.myqueue = myqueue
self.db_path = db_path
self._terminate = False
def terminate(self):
self._terminate = True
def run(self):
conn = sqlite3.connect(self.db_path)
try:
while not self._terminate:
try:
data = self.myqueue.get(timeout=1)
except Queue.Empty:
continue
with conn:
conn.execute("INSERT INTO test (data) VALUES (?)", (data,))
self.myqueue.task_done()
finally:
conn.close()
....
q.join()
t.terminate() # tell database thread to terminate
最后,值得一提的是,如果数据库线程设法耗尽队列,即如果 q.join()
returns,您的程序可能会终止。这是因为 db 线程是守护线程,不会阻止主线程退出。您需要确保您的工作线程产生足够的数据来保持数据库线程忙碌,否则 q.join()
将 return 并且主线程将退出。