Python 运行 具有多处理功能的 ffmpeg

Python running ffmpeg with multiprocessing

我正在尝试在并行任务中使用 multiprocessing 运行 ffmpeg 命令。 我要并行化的 python ffmpeg 调用如下:

def load_audio(args, kwargs):
    url = args

    start = kwargs["start"]
    end = kwargs["end"]
    sr = kwargs["sr"]
    n_channels = kwargs["n_channels"]
    mono = kwargs["mono"]

    cmd = ["ffmpeg",  "-i", url, "-acodec", "pcm_s16le", "-ac", str(n_channels), "-ar", str(sr), "-ss", _to_ffmpeg_time(start), "-t", _to_ffmpeg_time(end - start), "-sn", "-vn", "-y", "-f", "wav", "pipe:1"]

    process = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10 ** 8
    )
    buffer = process.stdout
    waveform = np.frombuffer(buffer=process.stdout, dtype=np.uint16, offset=8 * 44)
    waveform = waveform.astype(dtype)
    return waveform

然后我通过偏移读取音频 60 seconds:

_THREAD_POOL = BoundedThreadPoolExecutor(max_workers=NTHREADS)
tasks = []
cur_start = start
for i in range(NTHREADS):
    msg = {'start':cur_start,
           'end':min(cur_start+60,end)}
    t = execute_callback(load_audio, 
                         'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', 
                         msg)
    cur_start+=60
    tasks.append(t)

其中 execute_callback 会将线程提交到池中:

def execute_callback(fn, args, kwargs):
    try:
        futures_thread = _THREAD_POOL.submit(fn, args, kwargs)
        return futures_thread
    except Exception as e:
        return None

我终于检索到结果,并连接到一个 numpy 数组(将转到 soundfile 以供读取)

futures_results = get_results_as_completed(tasks, return_when=ALL_COMPLETED)
waveform = []
for i,r in enumerate(futures_results):
    if not i:
        waveform = r
        print(type(r))
    else:
        waveform = np.append(waveform,r)

其中 get_results_as_completed

def get_results_as_completed(futures, return_when=ALL_COMPLETED):
        finished = as_completed(futures)
        for f in finished:
            try:
                yield f.result()
            except Exception as e:
                pass

我使用的是有界池执行器 class here and here。 我正在使用 as_completed 检索已完成状态的期货,这会导致输出不保留在输入顺序中,而是“完成”顺序,这会导致音频输出错误。我的问题是

我想如果您使用的是 BoundedThreadPoolExecutor,那么您在技术上是多线程的,那么每个线程都是 运行 一个进程(稍后会详细介绍)。

无论如何,你的函数 execute_callback,其名称让我有点困惑,实际上提交了一个任务到池和 returns 一个 Future 实例,然后附加到列表tasks。然后,您将 tasks 传递给 get_results_as_completed,这会按完成顺序从您的任务中生成 return 值。但这不是你想要的。

所以首先回答你的第二个问题:如果你不希望结果按完成顺序排列,就不要使用函数 as_completed。而是调用:

def get_results(futures):
    for f in futures:
        try:
            yield f.result()
        except Exception as e:
            pass

回答您的第一个问题:对于 CPU 密集型 Python 代码,多线程通常不是您想要的,因为存在对全局解释器锁的争用。但是由于每个线程都在启动一个进程并等待进程结束,我认为你应该实现并行化,我认为没有理由因为我之前的回答而不能保持输入顺序。

现在是我的问题和评论:

  1. 为什么 BoundedThreadPoolExecutor 而不是仅使用标准 ThreadPoolExecutor
  2. 在函数定义中,您的参数签名以 argskwargs 作为参数,例如load_audio,将这些参数指定为 *args**kwargs 会更“正常”,然后您可以这样调用此函数:load_audio('https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', start=start, end=end).