multiprocessing.Pool 执行并发子进程以从 mysqldump 传输数据
multiprocessing.Pool to execute concurrent subprocesses to pipe data from mysqldump
我正在使用 Python 将数据从一个 mysql 数据库传输到另一个数据库。这是我已经使用了几个月的代码的简单抽象版本,它运行良好:
def copy_table(mytable):
raw_mysqldump = "mysqldump -h source_host -u source_user --password='secret' --lock-tables=FALSE myschema mytable"
raw_mysql = "mysql -h destination_host -u destination_user --password='secret' myschema"
mysqldump = shlex.split(raw_mysqldump)
mysql = shlex.split(raw_mysql)
ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
subprocess.check_output(mysql, stdin=ps.stdout)
ps.stdout.close()
retcode = ps.wait()
if retcode == 0:
return mytable, 1
else:
return mytable, 0
数据量变大了,目前复制30张表大概需要一个小时。为了加快速度,我想利用多处理。我试图在 Ubuntu 服务器上执行以下代码,这是一个 t2.micro (AWS EC2)。
def copy_tables(tables):
with multiprocessing.Pool(processes=4) as pool:
params = [(arg, table) for table in sorted(tables)]
results = pool.starmap(copy_table, params)
failed_tables = [table for table, success in results if success == 0]
all_tables_processed = False if failed_tables else True
return all_tables_processed
问题是:几乎所有的表都会被复制,但总是有几个子进程遗留下来无法完成——它们只是挂起,我从监控数据库中可以看出没有数据正在被复制转入。感觉好像他们以某种方式与父进程断开连接,或者数据没有正确返回。
这是我的第一个问题,我尽量做到既具体又简洁 - 在此先感谢您的帮助,如果我能提供更多信息,请告诉我。
我认为下面的代码
ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
subprocess.check_output(mysql, stdin=ps.stdout)
ps.stdout.close()
retcode = ps.wait()
应该是
ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
sps = subprocess.Popen(mysql, stdin=ps.stdout)
retcode = ps.wait()
ps.stdout.close()
sps.wait()
在 mysqldump 进程完成之前,您不应该关闭管道。而check_output是阻塞的,它会挂起直到stdin到达末尾。
我正在使用 Python 将数据从一个 mysql 数据库传输到另一个数据库。这是我已经使用了几个月的代码的简单抽象版本,它运行良好:
def copy_table(mytable):
raw_mysqldump = "mysqldump -h source_host -u source_user --password='secret' --lock-tables=FALSE myschema mytable"
raw_mysql = "mysql -h destination_host -u destination_user --password='secret' myschema"
mysqldump = shlex.split(raw_mysqldump)
mysql = shlex.split(raw_mysql)
ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
subprocess.check_output(mysql, stdin=ps.stdout)
ps.stdout.close()
retcode = ps.wait()
if retcode == 0:
return mytable, 1
else:
return mytable, 0
数据量变大了,目前复制30张表大概需要一个小时。为了加快速度,我想利用多处理。我试图在 Ubuntu 服务器上执行以下代码,这是一个 t2.micro (AWS EC2)。
def copy_tables(tables):
with multiprocessing.Pool(processes=4) as pool:
params = [(arg, table) for table in sorted(tables)]
results = pool.starmap(copy_table, params)
failed_tables = [table for table, success in results if success == 0]
all_tables_processed = False if failed_tables else True
return all_tables_processed
问题是:几乎所有的表都会被复制,但总是有几个子进程遗留下来无法完成——它们只是挂起,我从监控数据库中可以看出没有数据正在被复制转入。感觉好像他们以某种方式与父进程断开连接,或者数据没有正确返回。
这是我的第一个问题,我尽量做到既具体又简洁 - 在此先感谢您的帮助,如果我能提供更多信息,请告诉我。
我认为下面的代码
ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
subprocess.check_output(mysql, stdin=ps.stdout)
ps.stdout.close()
retcode = ps.wait()
应该是
ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
sps = subprocess.Popen(mysql, stdin=ps.stdout)
retcode = ps.wait()
ps.stdout.close()
sps.wait()
在 mysqldump 进程完成之前,您不应该关闭管道。而check_output是阻塞的,它会挂起直到stdin到达末尾。