multiprocessing.Pool 执行并发子进程以从 mysqldump 传输数据

multiprocessing.Pool to execute concurrent subprocesses to pipe data from mysqldump

我正在使用 Python 将数据从一个 mysql 数据库传输到另一个数据库。这是我已经使用了几个月的代码的简单抽象版本,它运行良好:

def copy_table(mytable):
    raw_mysqldump = "mysqldump -h source_host -u source_user --password='secret' --lock-tables=FALSE myschema mytable"
    raw_mysql = "mysql -h destination_host -u destination_user --password='secret' myschema"

    mysqldump = shlex.split(raw_mysqldump)
    mysql = shlex.split(raw_mysql)

    ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
    subprocess.check_output(mysql, stdin=ps.stdout)
    ps.stdout.close()
    retcode = ps.wait()
    if retcode == 0:
        return mytable, 1
    else:
        return mytable, 0

数据量变大了,目前复制30张表大概需要一个小时。为了加快速度,我想利用多处理。我试图在 Ubuntu 服务器上执行以下代码,这是一个 t2.micro (AWS EC2)。

def copy_tables(tables):
    with multiprocessing.Pool(processes=4) as pool:
        params = [(arg, table) for table in sorted(tables)]
        results = pool.starmap(copy_table, params)
    failed_tables = [table for table, success in results if success == 0]
    all_tables_processed = False if failed_tables else True
    return all_tables_processed

问题是:几乎所有的表都会被复制,但总是有几个子进程遗留下来无法完成——它们只是挂起,我从监控数据库中可以看出没有数据正在被复制转入。感觉好像他们以某种方式与父进程断开连接,或者数据没有正确返回。

这是我的第一个问题,我尽量做到既具体又简洁 - 在此先感谢您的帮助,如果我能提供更多信息,请告诉我。

我认为下面的代码

ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
subprocess.check_output(mysql, stdin=ps.stdout)
ps.stdout.close()
retcode = ps.wait()

应该是

ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE)
sps = subprocess.Popen(mysql, stdin=ps.stdout)
retcode = ps.wait()
ps.stdout.close()
sps.wait()

在 mysqldump 进程完成之前,您不应该关闭管道。而check_output是阻塞的,它会挂起直到stdin到达末尾。