Problem with Multiprocessing and Deadlocking in Python3
I'm having a problem with multiprocessing, and I'm afraid it's a fairly simple fix that I'm just not implementing correctly. I've been researching what could be causing the problem, but all I've really found is people recommending the use of a queue to prevent this, and that doesn't seem to be stopping it (then again, I may just be implementing the queue incorrectly). I've been at this for a few days now and was hoping I could get some help.
Thanks in advance!
import csv
import multiprocessing as mp
import os
import queue
import sys
import time

import connections
import packages
import profiles


def execute_extract(package, profiles, q):
    # This is the package execution for the extract
    # It fires fine and will print the starting message below
    started_at = time.monotonic()
    print(f"Starting {package.packageName}")
    try:
        oracle_connection = connections.getOracleConnection(profiles['oracle'], 1)
        engine = connections.getSQLConnection(profiles['system'], 1)
        path = os.path.join(os.getcwd(), 'csv_data', package.packageName + '.csv')
        cursor = oracle_connection.cursor()
        if os.path.exists(path):
            os.remove(path)
        f = open(path, 'w')
        chunksize = 100000
        offset = 0
        row_total = 0
        csv_writer = csv.writer(f, delimiter='^', lineterminator='\n')
        # I am having to do some data cleansing. I know this is not the most efficient way to do this, but currently
        # it is what I am limited to
        while True:
            cursor.execute(package.query + f'\r\n OFFSET {offset} ROWS\r\n FETCH NEXT {chunksize} ROWS ONLY')
            test = cursor.fetchone()
            if test is None:
                break
            else:
                while True:
                    row = cursor.fetchone()
                    if row is None:
                        break
                    else:
                        new_row = list(row)
                        new_row.append(package.sourceId[0])
                        new_row.append('')
                        i = 0
                        for item in new_row:
                            if type(item) == float:
                                new_row[i] = int(item)
                            elif type(item) == str:
                                new_row[i] = item.encode('ascii', 'replace')
                            i += 1
                        row = tuple(new_row)
                        csv_writer.writerow(row)
                        row_total += 1
            offset += chunksize
        f.close()
        # I know that execution is at least reaching this point. I can watch the CSV files grow as more and more
        # rows are added for all the packages. What I never get is either the success message or the error message
        # below, and there are never any entries placed in the tables
        query = f"BULK INSERT {profiles['system'].database.split('_')[0]}_{profiles['system'].database.split('_')[1]}_test_{profiles['system'].database.split('_')[2]}.{package.destTable} FROM \"{path}\" WITH (FIELDTERMINATOR='^', ROWTERMINATOR='\n');"
        engine.cursor().execute(query)
        engine.commit()
        end_time = time.monotonic() - started_at
        print(
            f"{package.packageName} has completed. Total rows inserted: {row_total}. Total execution time: {end_time} seconds\n")
        os.remove(path)
    except Exception as e:
        print(f'An error has occurred for package {package.packageName}.\r\n {repr(e)}')
    finally:
        # Here is where I am trying to add an item to the queue so the get method in the main def will pick it up and
        # remove it from the queue
        q.put(f'{package.packageName} has completed')
        if oracle_connection:
            oracle_connection.close()
        if engine:
            engine.cursor().close()
            engine.close()
if __name__ == '__main__':
    # Setting mp creation type
    ctx = mp.get_context('spawn')
    q = ctx.Queue()

    # For the ETL I generate a list of class objects that hold relevant information. profs contains a list of
    # connection objects (credentials, connection strings, etc); packages contains the information to run the extract
    # (destination tables, query string, package name for logging, etc)
    profs = profiles.get_conn_vars(sys.argv[1])
    packages = packages.get_etl_packages(profs)
    processes = []

    # I'm trying to track both individual package execution time and overall time so I can get an estimate on rows
    # per second
    start_time = time.monotonic()
    sqlConn = connections.getSQLConnection(profs['system'])

    # Here I'm executing a SQL command to truncate all my staging tables to ensure they are empty and will not
    # generate any key violations
    sqlConn.execute(
        f"USE[{profs['system'].database.split('_')[0]}_{profs['system'].database.split('_')[1]}_test_{profs['system'].database.split('_')[2]}]\r\nExec Sp_msforeachtable @command1='Truncate Table ?',@whereand='and Schema_Id=Schema_id(''my_schema'')'")

    # Here is where I start generating a process per package to try and get all packages to run simultaneously
    for package in packages:
        p = ctx.Process(target=execute_extract, args=(package, profs, q,))
        processes.append(p)
        p.start()

    # Here is my attempt at managing the queue. This is a monstrosity of fixes I've tried to get this to work
    results = []
    while True:
        try:
            result = q.get(False, 0.01)
            results.append(result)
        except queue.Empty:
            pass
        allExited = True
        for t in processes:
            if t.exitcode is None:
                allExited = False
                break
        if allExited & q.empty():
            break

    for p in processes:
        p.join()

    # Closing out the end time and writing the overall execution time in minutes.
    end_time = time.monotonic() - start_time
    print(f'Total execution time of {end_time / 60} minutes.')
I can't determine why you are experiencing a deadlock (and I am not at all convinced it has anything to do with your queue management), but I can say for sure that you can simplify your queue-management logic if you do one of two things:

Method 1

Ensure that your worker function, execute_extract, puts something on the results queue even in the case of an exception (I would recommend putting the Exception object itself). Then you can replace the entire main-process loop beginning with while True: that tries to get the results with just:

results = [q.get() for _ in range(len(processes))]

You are guaranteed a fixed number of messages on the queue, equal to the number of processes created.
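For illustration, here is a minimal, self-contained sketch of Method 1; do_work and the pkg names are hypothetical stand-ins for execute_extract and your package list, not your actual code:

import multiprocessing as mp

def do_work(name):
    # Stand-in for the real extract logic; replace with your own work.
    return f'{name} has completed'

def worker(name, q):
    try:
        msg = do_work(name)
    except Exception as e:
        msg = e  # ship the Exception object itself back to the parent
    q.put(msg)   # exactly one message per process, success or failure

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    processes = [ctx.Process(target=worker, args=(f'pkg{i}', q)) for i in range(4)]
    for p in processes:
        p.start()
    # One blocking get per process; the count is guaranteed, so this cannot hang.
    results = [q.get() for _ in range(len(processes))]
    for p in processes:
        p.join()
    print(results)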
Method 2 (simpler)

Just reverse the order in which you wait for the subprocesses to complete and process the results queue. You don't know how many messages will be on the queue, but you aren't processing the queue until all the processes have returned, so however many messages end up on the queue is all there will ever be. Just retrieve them until the queue is empty:

for p in processes:
    p.join()
results = []
while not q.empty():
    results.append(q.get())
At this point I would normally suggest that you use a multiprocessing pool class such as multiprocessing.Pool, which would not require an explicit queue to retrieve results. But make either of these changes (I would suggest Method 2, since I cannot see how it could result in a deadlock given that only the main process is running at that point) and see if your problem goes away. I am not, however, guaranteeing that your issue is not elsewhere in your code. While your code is overly complicated and inefficient, it is not obviously "wrong." At the very least you will know whether your problem lies elsewhere.
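For comparison, here is a minimal sketch of that Pool-based alternative; the extract function and package names are hypothetical simplifications, since execute_extract would need to return its status rather than put it on a queue:

import multiprocessing as mp

def extract(package_name):
    # Stand-in worker: return a status (or raise) instead of using a queue.
    return f'{package_name} has completed'

if __name__ == '__main__':
    package_names = ['pkg_a', 'pkg_b', 'pkg_c']
    with mp.Pool(processes=len(package_names)) as pool:
        # map blocks until every worker has returned; results arrive in order,
        # and an exception raised in a worker is re-raised here in the parent.
        results = pool.map(extract, package_names)
    print(results)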
The question I would ask you is: what does it buy you to do everything using a context obtained with ctx = mp.get_context('spawn') instead of just calling the methods on the multiprocessing module itself? If your platform supports the fork call, that would be the default context; wouldn't you want to use it?
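For what it's worth, the difference being asked about is just the following (a sketch; the default start method is fork on most POSIX systems, and spawn on Windows, and on macOS since Python 3.8):

import multiprocessing as mp

def hello():
    print('hello from the child')

if __name__ == '__main__':
    # Explicit spawn context, as in the question's code:
    ctx = mp.get_context('spawn')
    p = ctx.Process(target=hello)
    p.start()
    p.join()

    # Module-level API: uses the platform's default start method instead.
    p = mp.Process(target=hello)
    p.start()
    p.join()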