如何减少 'readline()' 从串行数据中花费的时间
How to reduce time spent by 'readline()' from serial data
我正在尝试创建一个函数来从传感器获取陀螺仪组件 X、Y、Z。
函数如下:
def bimu_get_gyroscope_raw():
#ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=15)
ser = serial.Serial('/dev/tty.usbserial-00002014', 115200, timeout=15)
ser_io = io.TextIOWrapper(io.BufferedRWPair(ser, ser, 1),
newline = '\r',
line_buffering = True)
try:
ser.isOpen()
print('serial is open')
except:
print('error_1')
exit()
#--------------------
i = 0
gyro_dict = dict()
if (ser.isOpen()):
ser.flushInput()
# write the function to get
while (i==0):
try:
print('serial is open_1')
line = ser_io.readline()
print('serial is open_2')
print(line)
except serial.SerialException as err:
print("Error ocurred while reading data: {}".format(err))
if not line.endswith('\r'):
print("Attempt to read from serial port timed out ... Exiting.")
break # terminate the loop and let the program exit
if line.startswith('S,'):
i += 1
line = line.split(',')
print(line)
if len(line)==12:
gyro_dict = {'x':float(line[1]), 'y': float(line[2]), 'z': float(line[3]) }
else:
print('Cannot open serial port')
return gyro_dict
我得到以下输出:
raw = bimu_get_gyroscope_raw()
print(raw)
serial is open
serial is open_1
-43,-122,-2833,83,65
serial is open_2
serial is open_1
S,2,0,0,-20,19,1014,-146,184,-158,168,99
serial is open_2
['S', '2', '0', '0', '-20', '19', '1014', '-146', '184', '-158', '168', '99\r']
{'z': 0.0, 'y': 0.0, 'x': 2.0}
我遇到的问题是,在我第一次调用线路 line = ser_io.readline()
之间,用手计时表在屏幕上书写需要大约 2.25 秒 serial is open_2
。
如果该函数需要再次调用 ser_io.readline()
则没有延迟并且行
serial is open_1
和serial is open_2
几乎同时出现
我认为第一次调用 readline()
在内部对端口或数据缓冲区做了一些已经完成的事情使得对 readline()
到 运行 的连续调用很多更快。
有什么方法可以解决这个问题并使函数 运行 始终快速。
编辑
我用 time
python 模块测试了很多次并修改了 readline 部分,像这样:
while (i<=5):
try:
print('before readline')
start_time = time.time()
line = ser_io.readline()
#print(line)
print("--- %s seconds ---" % (time.time() - start_time))
#print(line)
print('after readline')
except serial.SerialException as err:
print("Error ocurred while reading data: {}".format(err))
if not line.endswith('\r'):
print("Attempt to read from serial port timed out ... Exiting.")
break # terminate the loop and let the program exit
if line.startswith('S,'):
i += 1
line = line.split(',')
print(line)
if len(line)==12:
gyro_dict = {'x':float(line[1]), 'y': float(line[2]), 'z': float(line[3]) }
结果如下:
serial is open
before readline
--- 2.1859400272369385 seconds ---
after readline
before readline
--- 5.9604644775390625e-06 seconds ---
after readline
['S', '0', '0', '0', '380', '0', '-902', '-497', '-228', '200', '63', '103\r']
before readline
--- 2.86102294921875e-06 seconds ---
after readline
before readline
--- 3.814697265625e-06 seconds ---
after readline
['S', '-1', '0', '1', '375', '-8', '-918', '-497', '-223', '194', '64', '108\r']
before readline
--- 3.0994415283203125e-06 seconds ---
after readline
['S', '1', '0', '2', '380', '-10', '-909', '-500', '-223', '200', '65', '113\r']
before readline
--- 2.1457672119140625e-06 seconds ---
after readline
before readline
--- 1.9073486328125e-06 seconds ---
after readline
['S', '0', '0', '0', '379', '-1', '-914', '-500', '-220', '197', '66', '69\r']
before readline
--- 2.1457672119140625e-06 seconds ---
after readline
['S', '0', '0', '-1', '374', '-5', '-902', '-500', '-225', '1\r']
before readline
--- 3.0994415283203125e-06 seconds ---
after readline
['S', '1', '1', '1', '376', '-2', '-915', '-500', '-223', '192', '37', '75\r']
函数第一次迭代需要两秒多,其余迭代非常快。
我认为您使用 c 中的实现可能会有所帮助。 python 代码在内部调用 c 函数来执行其任务。如果你直接使用底层函数,它会节省时间。
我还没有尝试过。但这可能会有所帮助。
https://github.com/EveryTimeIWill18/Cython_Repo/blob/master/FastFileProcessingWithCython.ipynb
我有几个建议给你。我编写 Windows 使用串口的应用程序并且我使用不同的方法 - 我假设所有 OS 的原则都是相同的。我首先在程序开始时创建并打开端口,然后保持打开状态。在你的程序存在之前关闭端口是一个很好的做法,但这并不是真正必要的,因为 OS 会在之后清理。
但是您的代码将在您每次调用函数时创建和初始化端口。完成后您没有明确关闭它;也许你可以摆脱它,因为端口对象被垃圾收集了。在您尝试再次打开它之前,您信任串行库在 OS 级别正确关闭端口。在任何情况下,如果创建端口对象有开销,为什么不招致一次并完成呢?
您根本不需要创建 TextIOWrapper,更不用说双向的了。您想知道这是否是导致性能问题的原因,那么为什么不摆脱它呢?串行端口库具有您需要的所有功能:查看 read_until
函数。
我认为您应该从类似这样的框架开始。我无法 运行 测试这个程序,所以它只是一个示意图。我已经删除了所有的错误处理代码。一个小问题是串行端口以字节为单位运行,您必须将其转换为字符串。
ser = serial.Serial('/dev/tty.usbserial-00002014', 115200, timeout=15)
def bimu_get_gyroscope_raw():
while True:
ser.flushInput()
b = ser.read_until('\r')
s = str(b, encoding='latin1') # convert to str
if a.startswith('S,'):
line = s.split(',')
if len(line)==12:
return dict(x = float(line[1]),
y = float(line[2]),
z = float(line[3]))
我已将 ser
设为全局变量,但您也可以将其作为参数传递给函数。
记住串行端口在现代 OS 上的工作方式。您永远不会直接从硬件读取字符 - OS 正在为您执行此操作,并将字符放入输入缓冲区。当您从端口 "read" 时,您实际上是在从缓冲区中检索任何字符,或者等待它们的到来。您观察到的情况 - 长时间延迟之后是快速连续的数据行 - 可以通过陀螺仪硬件在几秒钟内不做任何事情来解释,然后产生一连串超过一行的数据。我不知道你的陀螺仪是怎么工作的所以我不能说真的是这样。
PySerial 实现实际上是一组操作系统调用的包装器。 Python 开销非常小,其中大部分是错误处理代码。我相信您可以使用 Python 每秒接收数千个字符 - 我一直这样做。在现代 PC 上,三秒接近永恒。必须有另一种解释。暂时不要认为 Python 是您的瓶颈。
通过查看屏幕并单击秒表来为事件计时很笨拙。查看Python时间包。您可以简单地在每个打印语句中打印 time.time() 的值,然后收起您的计时器。
您可以独立测试实施的数据收集部分。只需剥离解析数据的逻辑,并永远留在 while 循环中。打印数据以及每个接收行的时间戳。如果您有另一台与串行端口通信的仪器,您可以将仪器的性能与软件的性能隔离开来。
最后,什么事件导致陀螺仪进行数据传输?它是那些只是定期广播其数据的仪器之一,还是您必须向它发送一些命令来请求数据?如果前者和每三秒播放一次,谜团就解开了;同样,如果是后者并且响应延迟为三秒。我可以想象可能会出现这样的情况,因为仪器必须读取一些传感器并将结果转换为字符串。您没有向我们展示整个程序或告诉我们这些仪器是如何工作的,所以这只是猜测。
注意一些你应该注意的事情。
首先,当您启动您的应用程序时,您应该将所有资源实例化并留在那里以供使用。
串口有输入缓冲区,可以异步方式访问数据,不需要一直监听客户端,但是如果关闭端口,打开之前收到的所有数据都会被丢弃并没有添加到缓冲区。这就是为什么您必须始终打开端口。
我在应用程序中通常做的是将串口抽象出来,创建一个对象来处理设备。
该对象将打开端口并有一个 Rx 线程(不需要 Tx 线程,因为 python 由于 GIL 以同步方式运行)并继续监视新数据。
要发送数据,您只需调用“__send_command”即可。
获取接收到的数据,从对象的Rx队列中取出。
这是我经常用来与串口设备通信的代码(我从事工业自动化工作,其中串口是一个通用接口)。它一直在满足我的需求。我做了一些更改以从我的设备(工业规模)中删除设备特定的辅助功能,但留下一个作为 .只需添加您的!
不要忘记检查你的模块发送数据的频率,通常这个时间基础可以在模块的特定寄存器上配置。通常从几毫秒到几秒的间隔。
import serial # pip install pyserial
import io
import time
import threading
class SerialDevice():
def __init__(self, baudrate=9600, port='COM1', timeout=0.1):
self.ser = serial.Serial()
self.ser.baudrate = baudrate
self.ser.port = port
self.ser.timeout = timeout
self.sio = io.TextIOWrapper(io.BufferedRWPair(self.ser, self.ser), newline='\r\n')
self.received_data = []
def connect(self):
try:
self.ser.open()
time.sleep(0.1)
self.clear_data()
self.__control_thread = threading.Thread(target=self.__receive_handler, args=())
self.__control_thread.start()
except Exception as e:
print(e)
return False
return self.ser.is_open
def disconnect(self):
try:
if(self.ser.is_open):
self.ser.close()
return True
else:
return False
except Exception as e:
print(e)
return False
def connected(self):
return self.ser.is_open
def data_available(self):
return len(self.received_data)
def get_data(self):
'''Pop the first item from the received data list'''
if(len(self.received_data)):
return self.received_data.pop(0)
return None
def peek_data(self):
'''Pop the first item from the received data list'''
if(len(self.received_data)):
return self.received_data[0]
return None
def clear_data(self):
'''Clear the received data list'''
self.received_data.clear()
def __receive_handler(self):
while(not self.ser.is_open): # Waits for the port to open
time.sleep(0.1)
# Clear serial input buffer
self.ser.read(self.ser.in_waiting)
while(self.ser.is_open):
try:
if(self.ser.in_waiting):
data = self.ser.readline()
self.received_data.append(self.__unpack_data(data))
print('received! {}'.format(data))
except Exception as e:
print(e)
time.sleep(0.001)
def __unpack_data(self, data=''):
'''Unpacks the received data to the measurement format
Receives the binary array and returns a Measurement object'''
# Decode the received data here and return it processed as an object to the received_data queue
# in this case I'll just return the same daata
return data
def __send_command(self, command):
# send the command using serial port
# Return 1 if success, 0 if error
try:
if(self.ser.is_open):
self.ser.write((command + '\r\n').encode('ascii'))
return 1
except Exception as e:
print(e)
return 0
# this is a helper function to send commands to your device
def send_global_reset(self):
'''Global reset to reset all menu settings to the original factory defaults'''
return self.__send_command('Esc R')
我正在尝试创建一个函数来从传感器获取陀螺仪组件 X、Y、Z。
函数如下:
def bimu_get_gyroscope_raw():
#ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=15)
ser = serial.Serial('/dev/tty.usbserial-00002014', 115200, timeout=15)
ser_io = io.TextIOWrapper(io.BufferedRWPair(ser, ser, 1),
newline = '\r',
line_buffering = True)
try:
ser.isOpen()
print('serial is open')
except:
print('error_1')
exit()
#--------------------
i = 0
gyro_dict = dict()
if (ser.isOpen()):
ser.flushInput()
# write the function to get
while (i==0):
try:
print('serial is open_1')
line = ser_io.readline()
print('serial is open_2')
print(line)
except serial.SerialException as err:
print("Error ocurred while reading data: {}".format(err))
if not line.endswith('\r'):
print("Attempt to read from serial port timed out ... Exiting.")
break # terminate the loop and let the program exit
if line.startswith('S,'):
i += 1
line = line.split(',')
print(line)
if len(line)==12:
gyro_dict = {'x':float(line[1]), 'y': float(line[2]), 'z': float(line[3]) }
else:
print('Cannot open serial port')
return gyro_dict
我得到以下输出:
raw = bimu_get_gyroscope_raw()
print(raw)
serial is open
serial is open_1
-43,-122,-2833,83,65
serial is open_2
serial is open_1
S,2,0,0,-20,19,1014,-146,184,-158,168,99
serial is open_2
['S', '2', '0', '0', '-20', '19', '1014', '-146', '184', '-158', '168', '99\r']
{'z': 0.0, 'y': 0.0, 'x': 2.0}
我遇到的问题是,在我第一次调用线路 line = ser_io.readline()
之间,用手计时表在屏幕上书写需要大约 2.25 秒 serial is open_2
。
如果该函数需要再次调用 ser_io.readline()
则没有延迟并且行
serial is open_1
和serial is open_2
几乎同时出现
我认为第一次调用 readline()
在内部对端口或数据缓冲区做了一些已经完成的事情使得对 readline()
到 运行 的连续调用很多更快。
有什么方法可以解决这个问题并使函数 运行 始终快速。
编辑
我用 time
python 模块测试了很多次并修改了 readline 部分,像这样:
while (i<=5):
try:
print('before readline')
start_time = time.time()
line = ser_io.readline()
#print(line)
print("--- %s seconds ---" % (time.time() - start_time))
#print(line)
print('after readline')
except serial.SerialException as err:
print("Error ocurred while reading data: {}".format(err))
if not line.endswith('\r'):
print("Attempt to read from serial port timed out ... Exiting.")
break # terminate the loop and let the program exit
if line.startswith('S,'):
i += 1
line = line.split(',')
print(line)
if len(line)==12:
gyro_dict = {'x':float(line[1]), 'y': float(line[2]), 'z': float(line[3]) }
结果如下:
serial is open
before readline
--- 2.1859400272369385 seconds ---
after readline
before readline
--- 5.9604644775390625e-06 seconds ---
after readline
['S', '0', '0', '0', '380', '0', '-902', '-497', '-228', '200', '63', '103\r']
before readline
--- 2.86102294921875e-06 seconds ---
after readline
before readline
--- 3.814697265625e-06 seconds ---
after readline
['S', '-1', '0', '1', '375', '-8', '-918', '-497', '-223', '194', '64', '108\r']
before readline
--- 3.0994415283203125e-06 seconds ---
after readline
['S', '1', '0', '2', '380', '-10', '-909', '-500', '-223', '200', '65', '113\r']
before readline
--- 2.1457672119140625e-06 seconds ---
after readline
before readline
--- 1.9073486328125e-06 seconds ---
after readline
['S', '0', '0', '0', '379', '-1', '-914', '-500', '-220', '197', '66', '69\r']
before readline
--- 2.1457672119140625e-06 seconds ---
after readline
['S', '0', '0', '-1', '374', '-5', '-902', '-500', '-225', '1\r']
before readline
--- 3.0994415283203125e-06 seconds ---
after readline
['S', '1', '1', '1', '376', '-2', '-915', '-500', '-223', '192', '37', '75\r']
函数第一次迭代需要两秒多,其余迭代非常快。
我认为您使用 c 中的实现可能会有所帮助。 python 代码在内部调用 c 函数来执行其任务。如果你直接使用底层函数,它会节省时间。 我还没有尝试过。但这可能会有所帮助。
https://github.com/EveryTimeIWill18/Cython_Repo/blob/master/FastFileProcessingWithCython.ipynb
我有几个建议给你。我编写 Windows 使用串口的应用程序并且我使用不同的方法 - 我假设所有 OS 的原则都是相同的。我首先在程序开始时创建并打开端口,然后保持打开状态。在你的程序存在之前关闭端口是一个很好的做法,但这并不是真正必要的,因为 OS 会在之后清理。
但是您的代码将在您每次调用函数时创建和初始化端口。完成后您没有明确关闭它;也许你可以摆脱它,因为端口对象被垃圾收集了。在您尝试再次打开它之前,您信任串行库在 OS 级别正确关闭端口。在任何情况下,如果创建端口对象有开销,为什么不招致一次并完成呢?
您根本不需要创建 TextIOWrapper,更不用说双向的了。您想知道这是否是导致性能问题的原因,那么为什么不摆脱它呢?串行端口库具有您需要的所有功能:查看 read_until
函数。
我认为您应该从类似这样的框架开始。我无法 运行 测试这个程序,所以它只是一个示意图。我已经删除了所有的错误处理代码。一个小问题是串行端口以字节为单位运行,您必须将其转换为字符串。
ser = serial.Serial('/dev/tty.usbserial-00002014', 115200, timeout=15)
def bimu_get_gyroscope_raw():
while True:
ser.flushInput()
b = ser.read_until('\r')
s = str(b, encoding='latin1') # convert to str
if a.startswith('S,'):
line = s.split(',')
if len(line)==12:
return dict(x = float(line[1]),
y = float(line[2]),
z = float(line[3]))
我已将 ser
设为全局变量,但您也可以将其作为参数传递给函数。
记住串行端口在现代 OS 上的工作方式。您永远不会直接从硬件读取字符 - OS 正在为您执行此操作,并将字符放入输入缓冲区。当您从端口 "read" 时,您实际上是在从缓冲区中检索任何字符,或者等待它们的到来。您观察到的情况 - 长时间延迟之后是快速连续的数据行 - 可以通过陀螺仪硬件在几秒钟内不做任何事情来解释,然后产生一连串超过一行的数据。我不知道你的陀螺仪是怎么工作的所以我不能说真的是这样。
PySerial 实现实际上是一组操作系统调用的包装器。 Python 开销非常小,其中大部分是错误处理代码。我相信您可以使用 Python 每秒接收数千个字符 - 我一直这样做。在现代 PC 上,三秒接近永恒。必须有另一种解释。暂时不要认为 Python 是您的瓶颈。
通过查看屏幕并单击秒表来为事件计时很笨拙。查看Python时间包。您可以简单地在每个打印语句中打印 time.time() 的值,然后收起您的计时器。
您可以独立测试实施的数据收集部分。只需剥离解析数据的逻辑,并永远留在 while 循环中。打印数据以及每个接收行的时间戳。如果您有另一台与串行端口通信的仪器,您可以将仪器的性能与软件的性能隔离开来。
最后,什么事件导致陀螺仪进行数据传输?它是那些只是定期广播其数据的仪器之一,还是您必须向它发送一些命令来请求数据?如果前者和每三秒播放一次,谜团就解开了;同样,如果是后者并且响应延迟为三秒。我可以想象可能会出现这样的情况,因为仪器必须读取一些传感器并将结果转换为字符串。您没有向我们展示整个程序或告诉我们这些仪器是如何工作的,所以这只是猜测。
注意一些你应该注意的事情。
首先,当您启动您的应用程序时,您应该将所有资源实例化并留在那里以供使用。 串口有输入缓冲区,可以异步方式访问数据,不需要一直监听客户端,但是如果关闭端口,打开之前收到的所有数据都会被丢弃并没有添加到缓冲区。这就是为什么您必须始终打开端口。
我在应用程序中通常做的是将串口抽象出来,创建一个对象来处理设备。 该对象将打开端口并有一个 Rx 线程(不需要 Tx 线程,因为 python 由于 GIL 以同步方式运行)并继续监视新数据。 要发送数据,您只需调用“__send_command”即可。 获取接收到的数据,从对象的Rx队列中取出。
这是我经常用来与串口设备通信的代码(我从事工业自动化工作,其中串口是一个通用接口)。它一直在满足我的需求。我做了一些更改以从我的设备(工业规模)中删除设备特定的辅助功能,但留下一个作为 .只需添加您的!
不要忘记检查你的模块发送数据的频率,通常这个时间基础可以在模块的特定寄存器上配置。通常从几毫秒到几秒的间隔。
import serial # pip install pyserial
import io
import time
import threading
class SerialDevice():
def __init__(self, baudrate=9600, port='COM1', timeout=0.1):
self.ser = serial.Serial()
self.ser.baudrate = baudrate
self.ser.port = port
self.ser.timeout = timeout
self.sio = io.TextIOWrapper(io.BufferedRWPair(self.ser, self.ser), newline='\r\n')
self.received_data = []
def connect(self):
try:
self.ser.open()
time.sleep(0.1)
self.clear_data()
self.__control_thread = threading.Thread(target=self.__receive_handler, args=())
self.__control_thread.start()
except Exception as e:
print(e)
return False
return self.ser.is_open
def disconnect(self):
try:
if(self.ser.is_open):
self.ser.close()
return True
else:
return False
except Exception as e:
print(e)
return False
def connected(self):
return self.ser.is_open
def data_available(self):
return len(self.received_data)
def get_data(self):
'''Pop the first item from the received data list'''
if(len(self.received_data)):
return self.received_data.pop(0)
return None
def peek_data(self):
'''Pop the first item from the received data list'''
if(len(self.received_data)):
return self.received_data[0]
return None
def clear_data(self):
'''Clear the received data list'''
self.received_data.clear()
def __receive_handler(self):
while(not self.ser.is_open): # Waits for the port to open
time.sleep(0.1)
# Clear serial input buffer
self.ser.read(self.ser.in_waiting)
while(self.ser.is_open):
try:
if(self.ser.in_waiting):
data = self.ser.readline()
self.received_data.append(self.__unpack_data(data))
print('received! {}'.format(data))
except Exception as e:
print(e)
time.sleep(0.001)
def __unpack_data(self, data=''):
'''Unpacks the received data to the measurement format
Receives the binary array and returns a Measurement object'''
# Decode the received data here and return it processed as an object to the received_data queue
# in this case I'll just return the same daata
return data
def __send_command(self, command):
# send the command using serial port
# Return 1 if success, 0 if error
try:
if(self.ser.is_open):
self.ser.write((command + '\r\n').encode('ascii'))
return 1
except Exception as e:
print(e)
return 0
# this is a helper function to send commands to your device
def send_global_reset(self):
'''Global reset to reset all menu settings to the original factory defaults'''
return self.__send_command('Esc R')