在 Python 中实现实时信号处理 - 如何连续捕获音频?
Implement realtime signal processing in Python - how to capture audio continuously?
我计划在 Python 中实现 "DSP-like" 信号处理器。它应该通过 ALSA 捕获小的音频片段,处理它们,然后通过 ALSA 播放它们。
为了开始,我编写了以下(非常简单的)代码。
import alsaaudio
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
inp.setchannels(1)
inp.setrate(96000)
inp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
inp.setperiodsize(1920)
outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
outp.setchannels(1)
outp.setrate(96000)
outp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
outp.setperiodsize(1920)
while True:
l, data = inp.read()
# TODO: Perform some processing.
outp.write(data)
问题是,音频 "stutters" 不是无缝的。我尝试使用 PCM 模式进行试验,将其设置为 PCM_ASYNC 或 PCM_NONBLOCK,但问题仍然存在。我认为问题是样本 "between" 两次后续调用 "inp.read()" 都丢失了。
有没有办法在 Python 中捕获音频 "continuously"(最好不需要太 "specific"/"non-standard" 库)?我希望信号总是被捕获 "in the background" 到某个缓冲区,我可以从中读取一些 "momentary state",而即使在我执行读取的时候,音频也会被进一步捕获到缓冲区中操作。我怎样才能做到这一点?
即使我使用专用的 process/thread 来捕获音频,这个 process/thread 总是至少必须 (1) 从源读取音频,(2) 然后将其放入一些缓冲区("signal processing" process/thread 然后从中读取)。因此,这两个操作在时间上仍然是连续的,因此样本会丢失。我该如何避免这种情况?
非常感谢您的建议!
编辑 2: 现在我有了 运行。
import alsaaudio
from multiprocessing import Process, Queue
import numpy as np
import struct
"""
A class implementing buffered audio I/O.
"""
class Audio:
"""
Initialize the audio buffer.
"""
def __init__(self):
#self.__rate = 96000
self.__rate = 8000
self.__stride = 4
self.__pre_post = 4
self.__read_queue = Queue()
self.__write_queue = Queue()
"""
Reads audio from an ALSA audio device into the read queue.
Supposed to run in its own process.
"""
def __read(self):
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
inp.setchannels(1)
inp.setrate(self.__rate)
inp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
inp.setperiodsize(self.__rate / 50)
while True:
_, data = inp.read()
self.__read_queue.put(data)
"""
Writes audio to an ALSA audio device from the write queue.
Supposed to run in its own process.
"""
def __write(self):
outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
outp.setchannels(1)
outp.setrate(self.__rate)
outp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
outp.setperiodsize(self.__rate / 50)
while True:
data = self.__write_queue.get()
outp.write(data)
"""
Pre-post data into the output buffer to avoid buffer underrun.
"""
def __pre_post_data(self):
zeros = np.zeros(self.__rate / 50, dtype = np.uint32)
for i in range(0, self.__pre_post):
self.__write_queue.put(zeros)
"""
Runs the read and write processes.
"""
def run(self):
self.__pre_post_data()
read_process = Process(target = self.__read)
write_process = Process(target = self.__write)
read_process.start()
write_process.start()
"""
Reads audio samples from the queue captured from the reading thread.
"""
def read(self):
return self.__read_queue.get()
"""
Writes audio samples to the queue to be played by the writing thread.
"""
def write(self, data):
self.__write_queue.put(data)
"""
Pseudonymize the audio samples from a binary string into an array of integers.
"""
def pseudonymize(self, s):
return struct.unpack(">" + ("I" * (len(s) / self.__stride)), s)
"""
Depseudonymize the audio samples from an array of integers into a binary string.
"""
def depseudonymize(self, a):
s = ""
for elem in a:
s += struct.pack(">I", elem)
return s
"""
Normalize the audio samples from an array of integers into an array of floats with unity level.
"""
def normalize(self, data, max_val):
data = np.array(data)
bias = int(0.5 * max_val)
fac = 1.0 / (0.5 * max_val)
data = fac * (data - bias)
return data
"""
Denormalize the data from an array of floats with unity level into an array of integers.
"""
def denormalize(self, data, max_val):
bias = int(0.5 * max_val)
fac = 0.5 * max_val
data = np.array(data)
data = (fac * data).astype(np.int64) + bias
return data
debug = True
audio = Audio()
audio.run()
while True:
data = audio.read()
pdata = audio.pseudonymize(data)
if debug:
print "[PRE-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata))
ndata = audio.normalize(pdata, 0xffffffff)
if debug:
print "[PRE-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata))
print "[PRE-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata)))))
#ndata += 0.01 # When I comment in this line, it wreaks complete havoc!
if debug:
print "[POST-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata)))))
print "[POST-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata))
pdata = audio.denormalize(ndata, 0xffffffff)
if debug:
print "[POST-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata))
print ""
data = audio.depseudonymize(pdata)
audio.write(data)
然而,即使我对音频数据进行了最细微的修改(例如评论那一行),我也会在输出端听到很多噪音和极度失真。好像我没有正确处理 PCM 数据。奇怪的是 "level meter" 等的输出似乎都有意义。然而,当我稍微偏移它时,输出完全失真(但连续)。
编辑 3:我刚刚发现我的算法(此处未包含)在将它们应用于 wave 文件时有效。所以问题似乎真的归结为 ALSA API.
编辑 4:我终于找到了问题所在。他们是以下。
1st - ALSA 在请求 PCM_FORMAT_U32_LE 时悄悄地 "fell back" 到 PCM_FORMAT_U8_LE,因此我错误地解释了数据,假设每个样本都是 4 字节宽。它在我请求 PCM_FORMAT_S32_LE.
时起作用
2nd - ALSA 输出似乎期望周期大小在 字节 中,即使他们明确指出它在 帧 中是预期的规范。因此,如果您使用 32 位采样深度,则必须将输出的周期大小设置为四倍。
3rd - 即使在 Python(有 "global interpreter lock" 的地方),与线程相比,进程也很慢。您可以通过更改为线程来大幅降低延迟,因为 I/O 线程基本上不会执行任何计算密集型操作。
当你
- 读取一大块数据,
- 写入一大块数据,
- 然后等待读取第二块数据,
如果第二个块不比第一个块短,则输出设备的缓冲区将变为空。
在开始实际处理之前,您应该用静默填满输出设备的缓冲区。那么输入或输出处理中的小延迟将无关紧要。
您可以手动完成所有操作,正如@CL 在 his/her 中推荐的那样,但我建议只使用
GNU Radio 改为:
这是一个负责完成所有工作的框架 "getting small chunks of samples in and out your algorithm";它的扩展性很好,您可以用 Python 或 C++ 编写信号处理。
事实上,它带有直接与 ALSA 对话的音频源和音频接收器,并且只是 give/take 连续样本。我建议通读 GNU Radio 的 Guided Tutorials;他们准确地解释了为音频应用程序进行信号处理所必需的内容。
真正最小的流程图如下所示:
您可以将高通滤波器替换为您自己的信号处理块,或使用现有块的任意组合。
有一些有用的东西,例如文件和 wav 文件接收器和源、过滤器、重采样器、放大器(好吧,乘法器)……
终于找到问题了。他们是以下。
1st - ALSA 在请求 PCM_FORMAT_U32_LE 时悄悄地 "fell back" 到 PCM_FORMAT_U8_LE,因此我错误地解释了数据,假设每个样本都是 4 字节宽。它在我请求 PCM_FORMAT_S32_LE.
时起作用
2nd - ALSA 输出似乎期望以字节为单位的周期大小,尽管它们在规范中明确声明以帧为单位。因此,如果您使用 32 位采样深度,则必须将输出的周期大小设置为四倍。
3rd - 即使在 Python(有 "global interpreter lock" 的地方),与线程相比,进程也很慢。您可以通过更改为线程来大幅降低延迟,因为 I/O 线程基本上不会执行任何计算密集型操作。
音频现在没有间隙且没有失真,但延迟太高了。
我计划在 Python 中实现 "DSP-like" 信号处理器。它应该通过 ALSA 捕获小的音频片段,处理它们,然后通过 ALSA 播放它们。
为了开始,我编写了以下(非常简单的)代码。
import alsaaudio
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
inp.setchannels(1)
inp.setrate(96000)
inp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
inp.setperiodsize(1920)
outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
outp.setchannels(1)
outp.setrate(96000)
outp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
outp.setperiodsize(1920)
while True:
l, data = inp.read()
# TODO: Perform some processing.
outp.write(data)
问题是,音频 "stutters" 不是无缝的。我尝试使用 PCM 模式进行试验,将其设置为 PCM_ASYNC 或 PCM_NONBLOCK,但问题仍然存在。我认为问题是样本 "between" 两次后续调用 "inp.read()" 都丢失了。
有没有办法在 Python 中捕获音频 "continuously"(最好不需要太 "specific"/"non-standard" 库)?我希望信号总是被捕获 "in the background" 到某个缓冲区,我可以从中读取一些 "momentary state",而即使在我执行读取的时候,音频也会被进一步捕获到缓冲区中操作。我怎样才能做到这一点?
即使我使用专用的 process/thread 来捕获音频,这个 process/thread 总是至少必须 (1) 从源读取音频,(2) 然后将其放入一些缓冲区("signal processing" process/thread 然后从中读取)。因此,这两个操作在时间上仍然是连续的,因此样本会丢失。我该如何避免这种情况?
非常感谢您的建议!
编辑 2: 现在我有了 运行。
import alsaaudio
from multiprocessing import Process, Queue
import numpy as np
import struct
"""
A class implementing buffered audio I/O.
"""
class Audio:
"""
Initialize the audio buffer.
"""
def __init__(self):
#self.__rate = 96000
self.__rate = 8000
self.__stride = 4
self.__pre_post = 4
self.__read_queue = Queue()
self.__write_queue = Queue()
"""
Reads audio from an ALSA audio device into the read queue.
Supposed to run in its own process.
"""
def __read(self):
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
inp.setchannels(1)
inp.setrate(self.__rate)
inp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
inp.setperiodsize(self.__rate / 50)
while True:
_, data = inp.read()
self.__read_queue.put(data)
"""
Writes audio to an ALSA audio device from the write queue.
Supposed to run in its own process.
"""
def __write(self):
outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
outp.setchannels(1)
outp.setrate(self.__rate)
outp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
outp.setperiodsize(self.__rate / 50)
while True:
data = self.__write_queue.get()
outp.write(data)
"""
Pre-post data into the output buffer to avoid buffer underrun.
"""
def __pre_post_data(self):
zeros = np.zeros(self.__rate / 50, dtype = np.uint32)
for i in range(0, self.__pre_post):
self.__write_queue.put(zeros)
"""
Runs the read and write processes.
"""
def run(self):
self.__pre_post_data()
read_process = Process(target = self.__read)
write_process = Process(target = self.__write)
read_process.start()
write_process.start()
"""
Reads audio samples from the queue captured from the reading thread.
"""
def read(self):
return self.__read_queue.get()
"""
Writes audio samples to the queue to be played by the writing thread.
"""
def write(self, data):
self.__write_queue.put(data)
"""
Pseudonymize the audio samples from a binary string into an array of integers.
"""
def pseudonymize(self, s):
return struct.unpack(">" + ("I" * (len(s) / self.__stride)), s)
"""
Depseudonymize the audio samples from an array of integers into a binary string.
"""
def depseudonymize(self, a):
s = ""
for elem in a:
s += struct.pack(">I", elem)
return s
"""
Normalize the audio samples from an array of integers into an array of floats with unity level.
"""
def normalize(self, data, max_val):
data = np.array(data)
bias = int(0.5 * max_val)
fac = 1.0 / (0.5 * max_val)
data = fac * (data - bias)
return data
"""
Denormalize the data from an array of floats with unity level into an array of integers.
"""
def denormalize(self, data, max_val):
bias = int(0.5 * max_val)
fac = 0.5 * max_val
data = np.array(data)
data = (fac * data).astype(np.int64) + bias
return data
debug = True
audio = Audio()
audio.run()
while True:
data = audio.read()
pdata = audio.pseudonymize(data)
if debug:
print "[PRE-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata))
ndata = audio.normalize(pdata, 0xffffffff)
if debug:
print "[PRE-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata))
print "[PRE-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata)))))
#ndata += 0.01 # When I comment in this line, it wreaks complete havoc!
if debug:
print "[POST-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata)))))
print "[POST-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata))
pdata = audio.denormalize(ndata, 0xffffffff)
if debug:
print "[POST-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata))
print ""
data = audio.depseudonymize(pdata)
audio.write(data)
然而,即使我对音频数据进行了最细微的修改(例如评论那一行),我也会在输出端听到很多噪音和极度失真。好像我没有正确处理 PCM 数据。奇怪的是 "level meter" 等的输出似乎都有意义。然而,当我稍微偏移它时,输出完全失真(但连续)。
编辑 3:我刚刚发现我的算法(此处未包含)在将它们应用于 wave 文件时有效。所以问题似乎真的归结为 ALSA API.
编辑 4:我终于找到了问题所在。他们是以下。
1st - ALSA 在请求 PCM_FORMAT_U32_LE 时悄悄地 "fell back" 到 PCM_FORMAT_U8_LE,因此我错误地解释了数据,假设每个样本都是 4 字节宽。它在我请求 PCM_FORMAT_S32_LE.
时起作用2nd - ALSA 输出似乎期望周期大小在 字节 中,即使他们明确指出它在 帧 中是预期的规范。因此,如果您使用 32 位采样深度,则必须将输出的周期大小设置为四倍。
3rd - 即使在 Python(有 "global interpreter lock" 的地方),与线程相比,进程也很慢。您可以通过更改为线程来大幅降低延迟,因为 I/O 线程基本上不会执行任何计算密集型操作。
当你
- 读取一大块数据,
- 写入一大块数据,
- 然后等待读取第二块数据,
如果第二个块不比第一个块短,则输出设备的缓冲区将变为空。
在开始实际处理之前,您应该用静默填满输出设备的缓冲区。那么输入或输出处理中的小延迟将无关紧要。
您可以手动完成所有操作,正如@CL 在 his/her
这是一个负责完成所有工作的框架 "getting small chunks of samples in and out your algorithm";它的扩展性很好,您可以用 Python 或 C++ 编写信号处理。
事实上,它带有直接与 ALSA 对话的音频源和音频接收器,并且只是 give/take 连续样本。我建议通读 GNU Radio 的 Guided Tutorials;他们准确地解释了为音频应用程序进行信号处理所必需的内容。
真正最小的流程图如下所示:
您可以将高通滤波器替换为您自己的信号处理块,或使用现有块的任意组合。
有一些有用的东西,例如文件和 wav 文件接收器和源、过滤器、重采样器、放大器(好吧,乘法器)……
终于找到问题了。他们是以下。
1st - ALSA 在请求 PCM_FORMAT_U32_LE 时悄悄地 "fell back" 到 PCM_FORMAT_U8_LE,因此我错误地解释了数据,假设每个样本都是 4 字节宽。它在我请求 PCM_FORMAT_S32_LE.
时起作用2nd - ALSA 输出似乎期望以字节为单位的周期大小,尽管它们在规范中明确声明以帧为单位。因此,如果您使用 32 位采样深度,则必须将输出的周期大小设置为四倍。
3rd - 即使在 Python(有 "global interpreter lock" 的地方),与线程相比,进程也很慢。您可以通过更改为线程来大幅降低延迟,因为 I/O 线程基本上不会执行任何计算密集型操作。
音频现在没有间隙且没有失真,但延迟太高了。