Copying one file to multiple remote hosts in parallel over SFTP
I'd like to use Python to copy a local file to multiple remote hosts in parallel. I'm trying to do that with asyncio and Paramiko, since I'm already using those libraries for other purposes in my program.

I'm using BaseEventLoop.run_in_executor() with the default ThreadPoolExecutor, which is effectively a new interface to the old threading library, together with Paramiko's SFTP functionality to do the copying.

Here's a simplified example.
import sys
import asyncio
import paramiko
import functools


def copy_file_node(
        *,
        user: str,
        host: str,
        identity_file: str,
        local_path: str,
        remote_path: str):
    ssh_client = paramiko.client.SSHClient()
    ssh_client.load_system_host_keys()
    ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
    ssh_client.connect(
        username=user,
        hostname=host,
        key_filename=identity_file,
        timeout=3)

    with ssh_client:
        with ssh_client.open_sftp() as sftp:
            print("[{h}] Copying file...".format(h=host))
            sftp.put(localpath=local_path, remotepath=remote_path)
            print("[{h}] Copy complete.".format(h=host))


loop = asyncio.get_event_loop()

tasks = []

# NOTE: You'll have to update the values being passed in to
#       `functools.partial(copy_file_node, ...)`
#       to get this working on your machine.
for host in ['10.0.0.1', '10.0.0.2']:
    task = loop.run_in_executor(
        None,
        functools.partial(
            copy_file_node,
            user='user',
            host=host,
            identity_file='/path/to/identity_file',
            local_path='/path/to/local/file',
            remote_path='/path/to/remote/file'))
    tasks.append(task)

try:
    loop.run_until_complete(asyncio.gather(*tasks))
except Exception as e:
    print("At least one node raised an error:", e, file=sys.stderr)
    sys.exit(1)

loop.close()
The problem I'm seeing is that the files are copied to the hosts serially instead of in parallel. So if the copy to a single host takes 5 seconds, the copy to two hosts takes 10 seconds, and so on.

I've tried various other approaches, including ditching SFTP and piping the file to dd on each remote host via exec_command(), but the copies always proceed serially.
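The dd variant looked roughly like this (a sketch of the approach rather than my exact code; the paths are placeholders):

# Stream the local file into dd running on the remote host.
stdin, stdout, stderr = ssh_client.exec_command('dd of=/path/to/remote/file bs=1M')
with open('/path/to/local/file', 'rb') as f:
    for chunk in iter(lambda: f.read(1024 * 1024), b''):
        stdin.write(chunk)
stdin.channel.shutdown_write()       # send EOF so dd exits
stdout.channel.recv_exit_status()    # wait for dd to finish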
I'm probably misunderstanding something basic here. What's keeping the different threads from copying the files in parallel?

From my testing, the blocking appears to happen on the remote write, not on reading the local file. But why would that be, given that we're attempting network I/O against independent remote hosts?
I'm not sure it's the best way to handle it, but it worked for me:
#start
from multiprocessing import Process

#omitted

tasks = []
for host in hosts:
    p = Process(
        None,
        functools.partial(
            copy_file_node,
            user=user,
            host=host,
            identity_file=identity_file,
            local_path=local_path,
            remote_path=remote_path))
    tasks.append(p)

[t.start() for t in tasks]
[t.join() for t in tasks]
Per the comments, I added datestamps, captured the output of the multiprocessing runs, and got this:
2015-10-24 03:06:08.749683[vagrant1] Copying file...
2015-10-24 03:06:08.751826[basement] Copying file...
2015-10-24 03:06:08.757040[upstairs] Copying file...
2015-10-24 03:06:16.222416[vagrant1] Copy complete.
2015-10-24 03:06:18.094373[upstairs] Copy complete.
2015-10-24 03:06:22.478711[basement] Copy complete.
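(The datestamps come from prefixing each print call in copy_file_node with the current time, along these lines:)

import datetime

print("{t}[{h}] Copying file...".format(t=datetime.datetime.now(), h=host))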
There is nothing wrong with your usage of asyncio.

To prove it, let's try a simplified version of your script - no paramiko, just pure Python.
import asyncio, functools, sys, time

START_TIME = time.monotonic()

def log(msg):
    print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))

def dummy(thread_id):
    log('Thread {} started'.format(thread_id))
    time.sleep(1)
    log('Thread {} finished'.format(thread_id))

loop = asyncio.get_event_loop()

tasks = []
for i in range(0, int(sys.argv[1])):
    task = loop.run_in_executor(None, functools.partial(dummy, thread_id=i))
    tasks.append(task)

loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
With two threads, this prints:
$ python3 async.py 2
0.001 Thread 0 started
0.002 Thread 1 started <-- 2 tasks are executed concurrently
1.003 Thread 0 finished
1.003 Thread 1 finished <-- Total time is 1 second
This concurrency scales up to 5 threads:
$ python3 async.py 5
0.001 Thread 0 started
...
0.003 Thread 4 started <-- 5 tasks are executed concurrently
1.002 Thread 0 finished
...
1.005 Thread 4 finished <-- Total time is still 1 second
If you add one more thread, you hit the thread pool limit:
$ python3 async.py 6
0.001 Thread 0 started
0.001 Thread 1 started
0.002 Thread 2 started
0.003 Thread 3 started
0.003 Thread 4 started <-- 5 tasks are executed concurrently
1.002 Thread 0 finished
1.003 Thread 5 started <-- 6th task is executed after 1 second
1.003 Thread 1 finished
1.004 Thread 2 finished
1.004 Thread 3 finished
1.004 Thread 4 finished <-- 5 tasks are completed after 1 second
2.005 Thread 5 finished <-- 6th task is completed after 2 seconds
Everything works as expected: the total time grows by 1 second per every 5 items. The magic number 5 is documented in the ThreadPoolExecutor docs:
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
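If the default limit is too small for your workload, you don't have to accept it: the first argument of run_in_executor() accepts an explicit executor instead of None. A minimal sketch, reusing dummy from the script above (the pool size here is illustrative):

import concurrent.futures

# Size the pool to the number of tasks instead of relying on the default
# (CPU count * 5 on Python 3.5-3.7; later versions use a different formula).
executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)

tasks = []
for i in range(6):
    tasks.append(loop.run_in_executor(executor, functools.partial(dummy, thread_id=i)))
loop.run_until_complete(asyncio.gather(*tasks))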
How can a third-party library block my ThreadPoolExecutor?
The library uses some kind of global lock. That means the library doesn't support multithreading. Try ProcessPoolExecutor, but with caution: the library may contain other anti-patterns, such as using the same hardcoded temporary file name. (A toy illustration of this failure mode follows below.)

The function executes for a long time and doesn't release the GIL. That may indicate a bug in C extension code, but the most common reason for holding the GIL is doing CPU-intensive computations. Again, ProcessPoolExecutor is an option, since it isn't affected by the GIL. (A sketch of that swap appears at the end of this answer.)

Neither of these is expected to happen with a library like paramiko.
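To make the first failure mode concrete, here is a toy, entirely hypothetical library function whose module-level lock serializes callers even when they run in separate threads:

import threading
import time

_LOCK = threading.Lock()  # hypothetical module-level lock inside a library

def library_call():
    # Independent threads all queue up on the same lock, so calls that
    # could overlap end up running one after another.
    with _LOCK:
        time.sleep(1)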
How can a third-party library block my ProcessPoolExecutor?
It typically can't. Your tasks are executed in separate processes. If you see two tasks in a ProcessPoolExecutor taking twice the time, suspect a resource bottleneck (such as consuming 100% of the network bandwidth).
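If you do suspect one of the thread-level problems above, switching the test script from the default thread pool to a ProcessPoolExecutor is a small change. A sketch (the worker must be picklable, i.e. a module-level function, and on platforms that spawn rather than fork you also need an if __name__ == '__main__' guard):

import asyncio
import concurrent.futures
import functools

# Reuses dummy() from the test script above; each task now runs in its
# own process, so the GIL and any in-process locks no longer apply.
pool = concurrent.futures.ProcessPoolExecutor()
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(pool, functools.partial(dummy, thread_id=i))
         for i in range(6)]
loop.run_until_complete(asyncio.gather(*tasks))
pool.shutdown()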