Python 运行 具有多处理功能的 ffmpeg
Python running ffmpeg with multiprocessing
我正在尝试在并行任务中使用 multiprocessing
运行 ffmpeg
命令。
我要并行化的 python ffmpeg
调用如下:
def load_audio(args, kwargs):
url = args
start = kwargs["start"]
end = kwargs["end"]
sr = kwargs["sr"]
n_channels = kwargs["n_channels"]
mono = kwargs["mono"]
cmd = ["ffmpeg", "-i", url, "-acodec", "pcm_s16le", "-ac", str(n_channels), "-ar", str(sr), "-ss", _to_ffmpeg_time(start), "-t", _to_ffmpeg_time(end - start), "-sn", "-vn", "-y", "-f", "wav", "pipe:1"]
process = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10 ** 8
)
buffer = process.stdout
waveform = np.frombuffer(buffer=process.stdout, dtype=np.uint16, offset=8 * 44)
waveform = waveform.astype(dtype)
return waveform
然后我通过偏移读取音频 60 seconds
:
_THREAD_POOL = BoundedThreadPoolExecutor(max_workers=NTHREADS)
tasks = []
cur_start = start
for i in range(NTHREADS):
msg = {'start':cur_start,
'end':min(cur_start+60,end)}
t = execute_callback(load_audio,
'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3',
msg)
cur_start+=60
tasks.append(t)
其中 execute_callback
会将线程提交到池中:
def execute_callback(fn, args, kwargs):
try:
futures_thread = _THREAD_POOL.submit(fn, args, kwargs)
return futures_thread
except Exception as e:
return None
我终于检索到结果,并连接到一个 numpy
数组(将转到 soundfile
以供读取)
futures_results = get_results_as_completed(tasks, return_when=ALL_COMPLETED)
waveform = []
for i,r in enumerate(futures_results):
if not i:
waveform = r
print(type(r))
else:
waveform = np.append(waveform,r)
其中 get_results_as_completed
是
def get_results_as_completed(futures, return_when=ALL_COMPLETED):
finished = as_completed(futures)
for f in finished:
try:
yield f.result()
except Exception as e:
pass
我使用的是有界池执行器 class here and here。
我正在使用 as_completed
检索已完成状态的期货,这会导致输出不保留在输入顺序中,而是“完成”顺序,这会导致音频输出错误。我的问题是
ffmpeg
期货实际上是并行执行的吗?在我的测试中,下载整个音频如下:
args = 'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3'
kwargs = {'start':start,
'end':end}
waveform = load_audio(args,kwargs)
是否可以在不使用信号量的情况下保留结果的输入顺序,但只有multiprocessing
函数(map
可能是?)。如果是,怎么做?
我想如果您使用的是 BoundedThreadPoolExecutor
,那么您在技术上是多线程的,那么每个线程都是 运行 一个进程(稍后会详细介绍)。
无论如何,你的函数 execute_callback
,其名称让我有点困惑,实际上提交了一个任务到池和 returns 一个 Future
实例,然后附加到列表tasks
。然后,您将 tasks
传递给 get_results_as_completed
,这会按完成顺序从您的任务中生成 return 值。但这不是你想要的。
所以首先回答你的第二个问题:如果你不希望结果按完成顺序排列,就不要使用函数 as_completed
。而是调用:
def get_results(futures):
for f in futures:
try:
yield f.result()
except Exception as e:
pass
回答您的第一个问题:对于 CPU 密集型 Python 代码,多线程通常不是您想要的,因为存在对全局解释器锁的争用。但是由于每个线程都在启动一个进程并等待进程结束,我认为你应该实现并行化,我认为没有理由因为我之前的回答而不能保持输入顺序。
现在是我的问题和评论:
- 为什么
BoundedThreadPoolExecutor
而不是仅使用标准 ThreadPoolExecutor
?
- 在函数定义中,您的参数签名以
args
和 kwargs
作为参数,例如load_audio
,将这些参数指定为 *args
和 **kwargs
会更“正常”,然后您可以这样调用此函数:load_audio('https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', start=start, end=end)
.
我正在尝试在并行任务中使用 multiprocessing
运行 ffmpeg
命令。
我要并行化的 python ffmpeg
调用如下:
def load_audio(args, kwargs):
url = args
start = kwargs["start"]
end = kwargs["end"]
sr = kwargs["sr"]
n_channels = kwargs["n_channels"]
mono = kwargs["mono"]
cmd = ["ffmpeg", "-i", url, "-acodec", "pcm_s16le", "-ac", str(n_channels), "-ar", str(sr), "-ss", _to_ffmpeg_time(start), "-t", _to_ffmpeg_time(end - start), "-sn", "-vn", "-y", "-f", "wav", "pipe:1"]
process = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10 ** 8
)
buffer = process.stdout
waveform = np.frombuffer(buffer=process.stdout, dtype=np.uint16, offset=8 * 44)
waveform = waveform.astype(dtype)
return waveform
然后我通过偏移读取音频 60 seconds
:
_THREAD_POOL = BoundedThreadPoolExecutor(max_workers=NTHREADS)
tasks = []
cur_start = start
for i in range(NTHREADS):
msg = {'start':cur_start,
'end':min(cur_start+60,end)}
t = execute_callback(load_audio,
'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3',
msg)
cur_start+=60
tasks.append(t)
其中 execute_callback
会将线程提交到池中:
def execute_callback(fn, args, kwargs):
try:
futures_thread = _THREAD_POOL.submit(fn, args, kwargs)
return futures_thread
except Exception as e:
return None
我终于检索到结果,并连接到一个 numpy
数组(将转到 soundfile
以供读取)
futures_results = get_results_as_completed(tasks, return_when=ALL_COMPLETED)
waveform = []
for i,r in enumerate(futures_results):
if not i:
waveform = r
print(type(r))
else:
waveform = np.append(waveform,r)
其中 get_results_as_completed
是
def get_results_as_completed(futures, return_when=ALL_COMPLETED):
finished = as_completed(futures)
for f in finished:
try:
yield f.result()
except Exception as e:
pass
我使用的是有界池执行器 class here and here。
我正在使用 as_completed
检索已完成状态的期货,这会导致输出不保留在输入顺序中,而是“完成”顺序,这会导致音频输出错误。我的问题是
ffmpeg
期货实际上是并行执行的吗?在我的测试中,下载整个音频如下:args = 'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3' kwargs = {'start':start, 'end':end} waveform = load_audio(args,kwargs)
是否可以在不使用信号量的情况下保留结果的输入顺序,但只有
multiprocessing
函数(map
可能是?)。如果是,怎么做?
我想如果您使用的是 BoundedThreadPoolExecutor
,那么您在技术上是多线程的,那么每个线程都是 运行 一个进程(稍后会详细介绍)。
无论如何,你的函数 execute_callback
,其名称让我有点困惑,实际上提交了一个任务到池和 returns 一个 Future
实例,然后附加到列表tasks
。然后,您将 tasks
传递给 get_results_as_completed
,这会按完成顺序从您的任务中生成 return 值。但这不是你想要的。
所以首先回答你的第二个问题:如果你不希望结果按完成顺序排列,就不要使用函数 as_completed
。而是调用:
def get_results(futures):
for f in futures:
try:
yield f.result()
except Exception as e:
pass
回答您的第一个问题:对于 CPU 密集型 Python 代码,多线程通常不是您想要的,因为存在对全局解释器锁的争用。但是由于每个线程都在启动一个进程并等待进程结束,我认为你应该实现并行化,我认为没有理由因为我之前的回答而不能保持输入顺序。
现在是我的问题和评论:
- 为什么
BoundedThreadPoolExecutor
而不是仅使用标准ThreadPoolExecutor
? - 在函数定义中,您的参数签名以
args
和kwargs
作为参数,例如load_audio
,将这些参数指定为*args
和**kwargs
会更“正常”,然后您可以这样调用此函数:load_audio('https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', start=start, end=end)
.