multiprocessing / psycopg2 TypeError: can't pickle _thread.RLock objects
multiprocessing / psycopg2 TypeError: can't pickle _thread.RLock objects
我按照以下代码在 postgres 数据库上实现并行 select 查询:
https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/
我的基本问题是我有 ~6k 查询需要执行,我正在尝试优化这些 select 查询的执行。最初它是一个包含所有 6k 谓词 ID 的 where id in (...)
查询,但我 运行 遇到问题,查询在它 运行 所在的机器上用完 > 4GB RAM,所以我决定将其拆分为 6k 个单独的查询,这些查询在同步时保持稳定的内存使用。然而, 运行 需要更长的时间,这对我的用例来说不是问题。尽管如此,我还是尽量减少时间。
我的代码是这样的:
class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.engine = self.init_connection()
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS)
def init_connection(self):
LOGGER.info('Creating Postgres engine')
return create_engine(self.db_url)
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
self.pool.close()
self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
con = psycopg2.connect(self.db_url)
cur = con.cursor()
cur.execute(query)
records = cur.fetchall()
con.close()
return list(records)
然而,每当这个 运行s 时,我都会收到以下错误:
TypeError: can't pickle _thread.RLock objects
我读过很多关于使用多处理和可腌制对象的类似问题,但我终究无法弄清楚我做错了什么。
池通常是每个进程一个(我认为这是最佳实践)但每个连接器实例共享 class 这样它就不会为每次使用 parallel_query 创建一个池方法。
类似问题的最佳答案:
Accessing a MySQL connection pool from Python multiprocessing
除了使用 MySql 而不是 Postgres 之外,显示了与我自己的几乎相同的实现。
我是不是做错了什么?
谢谢!
编辑:
我找到了这个答案:
非常详细,看起来我误解了 multiprocessing.Pool
与 ThreadedConnectionPool
等连接池的区别。然而,在第一个 link 中,它没有提到需要任何连接池等。这个解决方案看起来不错,但对于我认为相当简单的问题,似乎有很多代码?
编辑 2:
所以上面的 link 解决了另一个问题,无论如何我可能 运行 都会遇到这个问题,所以我很高兴我发现了它,但是它并没有解决最初无法解决的问题使用 imap_unordered
直到酸洗错误。非常令人沮丧。
最后,我认为可能值得注意的是,这个 运行s 在 Heroku 中,在 worker dyno 上,使用 Redis rq 进行调度、后台任务等,并使用 Postgres 的托管实例作为数据库。
简单来说,postgres连接和sqlalchemy连接池是线程安全的,但是它们不是fork安全的。
如果要使用多处理,应该在fork之后的每个子进程中初始化引擎。
如果你想共享引擎,你应该使用多线程。
参考Thread and process safety in psycopg2 documentation:
libpq connections
shouldn’t be used by a forked processes, so when using a module such
as multiprocessing or a forking web deploy method such as FastCGI make
sure to create the connections after the fork.
如果您使用 multiprocessing.Pool,则有一个关键字参数初始值设定项,可用于 运行 在每个子进程上编码一次。试试这个:
class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))
@classmethod
def init_connection(cls, db_url):
def _init_connection():
LOGGER.info('Creating Postgres engine')
cls.engine = create_engine(db_url)
return _init_connection
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
pass
#self.pool.close()
#self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
with self.engine.connect() as conn:
with conn.begin():
result = conn.execute(query)
return result.fetchall()
def __getstate__(self):
# this is a hack, if you want to remove this method, you should
# remove self.pool and just pass pool explicitly
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict
现在,解决 XY 问题。
Initially it was a single query with the where id in (...) contained
all 6k predicate IDs but I ran into issues with the query using up >
4GB of RAM on the machine it ran on, so I decided to split it out into
6k individual queries which when synchronously keeps a steady memory
usage.
您可能想要做的是以下选项之一:
- 编写一个生成所有 6000 个 ID 的子查询,并在您的原始批量查询中使用该子查询。
- 同上,但是将子查询写成 CTE
- 如果您的 ID 列表来自外部来源(即不是来自数据库),那么您可以创建一个包含 6000 个 ID 的临时 table,然后 运行 您的原始批量查询临时 table
但是,如果您坚持 运行通过 python 处理 6000 个 ID,那么最快的查询可能不会一次完成所有 6000 个 ID(这将 运行内存)也不是 运行 6000 个单独的查询。相反,您可能想尝试对查询进行分块。例如一次发送 500 个 ID。您将不得不试验块大小以确定一次可以发送的最大 ID 数,同时仍然在您的内存预算范围内。
我按照以下代码在 postgres 数据库上实现并行 select 查询:
https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/
我的基本问题是我有 ~6k 查询需要执行,我正在尝试优化这些 select 查询的执行。最初它是一个包含所有 6k 谓词 ID 的 where id in (...)
查询,但我 运行 遇到问题,查询在它 运行 所在的机器上用完 > 4GB RAM,所以我决定将其拆分为 6k 个单独的查询,这些查询在同步时保持稳定的内存使用。然而, 运行 需要更长的时间,这对我的用例来说不是问题。尽管如此,我还是尽量减少时间。
我的代码是这样的:
class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.engine = self.init_connection()
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS)
def init_connection(self):
LOGGER.info('Creating Postgres engine')
return create_engine(self.db_url)
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
self.pool.close()
self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
con = psycopg2.connect(self.db_url)
cur = con.cursor()
cur.execute(query)
records = cur.fetchall()
con.close()
return list(records)
然而,每当这个 运行s 时,我都会收到以下错误:
TypeError: can't pickle _thread.RLock objects
我读过很多关于使用多处理和可腌制对象的类似问题,但我终究无法弄清楚我做错了什么。
池通常是每个进程一个(我认为这是最佳实践)但每个连接器实例共享 class 这样它就不会为每次使用 parallel_query 创建一个池方法。
类似问题的最佳答案:
Accessing a MySQL connection pool from Python multiprocessing
除了使用 MySql 而不是 Postgres 之外,显示了与我自己的几乎相同的实现。
我是不是做错了什么?
谢谢!
编辑:
我找到了这个答案:
非常详细,看起来我误解了 multiprocessing.Pool
与 ThreadedConnectionPool
等连接池的区别。然而,在第一个 link 中,它没有提到需要任何连接池等。这个解决方案看起来不错,但对于我认为相当简单的问题,似乎有很多代码?
编辑 2:
所以上面的 link 解决了另一个问题,无论如何我可能 运行 都会遇到这个问题,所以我很高兴我发现了它,但是它并没有解决最初无法解决的问题使用 imap_unordered
直到酸洗错误。非常令人沮丧。
最后,我认为可能值得注意的是,这个 运行s 在 Heroku 中,在 worker dyno 上,使用 Redis rq 进行调度、后台任务等,并使用 Postgres 的托管实例作为数据库。
简单来说,postgres连接和sqlalchemy连接池是线程安全的,但是它们不是fork安全的。
如果要使用多处理,应该在fork之后的每个子进程中初始化引擎。
如果你想共享引擎,你应该使用多线程。
参考Thread and process safety in psycopg2 documentation:
libpq connections shouldn’t be used by a forked processes, so when using a module such as multiprocessing or a forking web deploy method such as FastCGI make sure to create the connections after the fork.
如果您使用 multiprocessing.Pool,则有一个关键字参数初始值设定项,可用于 运行 在每个子进程上编码一次。试试这个:
class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))
@classmethod
def init_connection(cls, db_url):
def _init_connection():
LOGGER.info('Creating Postgres engine')
cls.engine = create_engine(db_url)
return _init_connection
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
pass
#self.pool.close()
#self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
with self.engine.connect() as conn:
with conn.begin():
result = conn.execute(query)
return result.fetchall()
def __getstate__(self):
# this is a hack, if you want to remove this method, you should
# remove self.pool and just pass pool explicitly
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict
现在,解决 XY 问题。
Initially it was a single query with the where id in (...) contained all 6k predicate IDs but I ran into issues with the query using up > 4GB of RAM on the machine it ran on, so I decided to split it out into 6k individual queries which when synchronously keeps a steady memory usage.
您可能想要做的是以下选项之一:
- 编写一个生成所有 6000 个 ID 的子查询,并在您的原始批量查询中使用该子查询。
- 同上,但是将子查询写成 CTE
- 如果您的 ID 列表来自外部来源(即不是来自数据库),那么您可以创建一个包含 6000 个 ID 的临时 table,然后 运行 您的原始批量查询临时 table
但是,如果您坚持 运行通过 python 处理 6000 个 ID,那么最快的查询可能不会一次完成所有 6000 个 ID(这将 运行内存)也不是 运行 6000 个单独的查询。相反,您可能想尝试对查询进行分块。例如一次发送 500 个 ID。您将不得不试验块大小以确定一次可以发送的最大 ID 数,同时仍然在您的内存预算范围内。