如何 python 转换机器人应用程序/嵌入式系统
How to python transitions for robotic applications / embedded systems
我正在考虑使用转换为机器人应用程序构建状态机。我以前使用 simulink stateflow 来完成这种任务。在这里,可以定期调用状态机并评估当前活动状态,以查看是否可以退出转换到新状态,否则会保持当前状态(每次轮询可以进行一次转换)。带着这种心态,我正在尝试一个带有过渡的初始玩具示例,看看我是否能掌握它,但我觉得我需要以不同的方式考虑这个解决方案。
我想到的玩具示例是那些机场自动护照门之一,其中相机在 Z 方向平移以匹配用户的面部高度。这是我的(希望是自我描述的)尝试:
from transitions import Machine
from time import sleep, time
from random import choice
class Passport_cam_sm(object):
def __init__(self):
self.face_ok = False
self.height_ok = False
states = ['move2face', 'validate_face', 'open_gate']
transitions = [
{'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face', 'conditions': 'height_ok'},
{'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate', 'conditions': 'face_valid'},
{'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]
class Passport_cam(object):
def __init__(self, terminate_time_s=10, time_step_s=1):
self.model = Passport_cam_sm()
self.machine = Machine(model=self.model, states=states, transitions=transitions, initial='move2face', send_event=True)
self.running = True
self.init_time = time()
self.terminate_time_s = terminate_time_s
self.time_step_s = time_step_s
self.face_pos_z = 5
self.camera_pos_z = 0
self.error_z = 888
def run(self):
'''
main program loop
:return:
'''
while self.running and not self.main_timeout():
self.machine.face_ok = self.face_ok()
self.machine.height_ok = self.height_ok()
print ('At state ' + self.model.state) #, ' camera height (', self.error_z ,') ok is ', self.machine.height_ok, ' face check is ', self.machine.face_ok)
# todo - poll latest state here? (self.model.state)
self.camera_dynamics()
sleep(1)
def face_ok(self):
'''
very robust method for determining if the face is valid...
:return:
'''
return choice([True, False])
def height_ok(self, tol=0.5):
'''
Checks if the face height is OK to do comparison.
:return:
'''
if abs(self.error_z) < tol:
return True
else:
return False
def camera_dynamics(self, max_displacement=1):
'''
Moves camera height towards face height at a maximum of "max_displacement" per function call
:return:
'''
self.error_z = self.camera_pos_z - self.face_pos_z
threshold_error = (min(max(self.error_z, -max_displacement), max_displacement))
self.camera_pos_z = self.camera_pos_z - threshold_error
print ('Camera height error is: {0}'.format(self.error_z))
def main_timeout(self):
if time() > self.init_time + self.terminate_time_s:
return True
else:
return False
pc = Passport_cam()
pc.run()
我希望找到的是某种轮询代码中 'todo' 部分的最后状态的方法,以检查现在是否有任何退出条件有效。有没有办法做到这一点?重新进入当前状态应该没问题,但我想某种 "during" 方法会更理想。
否则有没有更好的方法来构建这种项目and/or有这样的示例项目吗?
欢迎来到 Stack Overflow。我建议宁愿 'poll' 过渡并等待它成功进行。成功执行后,转换将 return True
,如果 a) 准备失败,b) 条件未满足或 c) 进入状态期间出现问题或 d) 问题,则 False
在处理 after
事件期间发生。
我缩小了您的示例代码的范围,以说明如何使用它来检查 face
和 height
是否已成功检索:
from transitions import Machine
from time import sleep
from random import choice
class Passport_cam_sm(object):
def __init__(self):
self._face_okay = False
self._height_okay = False
def face_ok(self, even_data):
self._face_okay = choice([True, False])
return self._face_okay
def height_ok(self, even_data):
# tol = even_data.kwargs.pop('tol', 0.5)
self._height_okay = choice([True, False])
return self._height_okay
states = ['move2face', 'validate_face', 'open_gate']
transitions = [
{'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face',
'conditions': ['height_ok', 'face_ok']},
{'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate',
'conditions': 'face_valid'},
{'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]
class Passport_cam(object):
def __init__(self):
self.model = Passport_cam_sm()
self.machine = Machine(model=self.model, states=states, transitions=transitions,
initial='move2face', send_event=True)
self.running = True
def run(self):
while self.running:
print('At state ' + self.model.state)
# model.at_face will only return True when both conditions are met
while not self.model.at_face():
print('Checking ...')
sleep(1)
print('Face and height are okay!')
self.camera_dynamics()
sleep(1)
def camera_dynamics(self):
print("Processing...")
self.running = False
pc = Passport_cam()
pc.run()
print('Done')
我添加了两个检查 (face/height_ok) 作为有效转换的条件。
如果您想先分配它们并且只检查它们在 conditions
中的值,您可以使用转换关键字 prepare
。
prepare 中的 Functions/methods 将在 conditions
之前执行,并且不需要布尔值 return。
当您指定 send_event=True
时,所有回调都应该期待该事件。这就是 face/height_ok
需要上面使用的签名的原因。
传递给触发事件的参数(例如 model.at_face(tol=0.5)
)将分配给 event_data.args
或 event_data.kwargs
.
请注意,我已将您的条件检查分配给模型。字符串总是被假定为模型方法的名称。如果您想从其他地方分配 functions/methods ,您可以传递对这些函数的引用而不是字符串。另请注意,这仅在立即处理事件时才有效。 transitions
支持排队事件处理(将 queued=True
传递给 Machine
构造函数),这在事件可能触发其他事件时派上用场。当 queued=True
事件将 ALWAYS return 为真。
如何以不可知的方式进行轮询?
状态机的一个核心特征是能够根据其当前状态调整其行为。同样的事件可能会导致不同的结果。如果您希望您的机器不断轮询而不是对事件做出反应,您可以定义所有转换由同一事件触发:
transitions = [
{'trigger': 'check', 'source': 'move2face', 'dest': 'validate_face',
'conditions': ['height_ok', 'face_ok']},
{'trigger': 'check', 'source': 'validate_face', 'dest': 'open_gate',
'conditions': 'face_valid'}, # (1)
{'trigger': 'check', 'source': 'validate_face', 'dest': 'move2face'}, # (2)
]
...
# class Passport_cam
def run(self):
while self.running:
print('At state ' + self.model.state)
while not self.model.check():
sleep(1)
循环可以简化为只调用 model.check
。这样就可以在不需要更改轮询循环的情况下引入转换、检查和状态。
转换按照添加的顺序执行。这意味着 (1) 和 (2) 形成规则 In state 'validate_face' go to 'open_gate' if 'face_valid', otherwise go to 'move2face'
。不必检查源状态与当前状态不同的转换,也不会产生太多开销。
最重要的是独立于框架的机器状态和转换的总体设计。如果条件检查变得臃肿,请考虑将状态拆分为更小的功能更具体的状态。
您还可以拆分转换配置 and/or 将您的模型组合成几个专用模型。
我正在考虑使用转换为机器人应用程序构建状态机。我以前使用 simulink stateflow 来完成这种任务。在这里,可以定期调用状态机并评估当前活动状态,以查看是否可以退出转换到新状态,否则会保持当前状态(每次轮询可以进行一次转换)。带着这种心态,我正在尝试一个带有过渡的初始玩具示例,看看我是否能掌握它,但我觉得我需要以不同的方式考虑这个解决方案。
我想到的玩具示例是那些机场自动护照门之一,其中相机在 Z 方向平移以匹配用户的面部高度。这是我的(希望是自我描述的)尝试:
from transitions import Machine
from time import sleep, time
from random import choice
class Passport_cam_sm(object):
def __init__(self):
self.face_ok = False
self.height_ok = False
states = ['move2face', 'validate_face', 'open_gate']
transitions = [
{'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face', 'conditions': 'height_ok'},
{'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate', 'conditions': 'face_valid'},
{'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]
class Passport_cam(object):
def __init__(self, terminate_time_s=10, time_step_s=1):
self.model = Passport_cam_sm()
self.machine = Machine(model=self.model, states=states, transitions=transitions, initial='move2face', send_event=True)
self.running = True
self.init_time = time()
self.terminate_time_s = terminate_time_s
self.time_step_s = time_step_s
self.face_pos_z = 5
self.camera_pos_z = 0
self.error_z = 888
def run(self):
'''
main program loop
:return:
'''
while self.running and not self.main_timeout():
self.machine.face_ok = self.face_ok()
self.machine.height_ok = self.height_ok()
print ('At state ' + self.model.state) #, ' camera height (', self.error_z ,') ok is ', self.machine.height_ok, ' face check is ', self.machine.face_ok)
# todo - poll latest state here? (self.model.state)
self.camera_dynamics()
sleep(1)
def face_ok(self):
'''
very robust method for determining if the face is valid...
:return:
'''
return choice([True, False])
def height_ok(self, tol=0.5):
'''
Checks if the face height is OK to do comparison.
:return:
'''
if abs(self.error_z) < tol:
return True
else:
return False
def camera_dynamics(self, max_displacement=1):
'''
Moves camera height towards face height at a maximum of "max_displacement" per function call
:return:
'''
self.error_z = self.camera_pos_z - self.face_pos_z
threshold_error = (min(max(self.error_z, -max_displacement), max_displacement))
self.camera_pos_z = self.camera_pos_z - threshold_error
print ('Camera height error is: {0}'.format(self.error_z))
def main_timeout(self):
if time() > self.init_time + self.terminate_time_s:
return True
else:
return False
pc = Passport_cam()
pc.run()
我希望找到的是某种轮询代码中 'todo' 部分的最后状态的方法,以检查现在是否有任何退出条件有效。有没有办法做到这一点?重新进入当前状态应该没问题,但我想某种 "during" 方法会更理想。
否则有没有更好的方法来构建这种项目and/or有这样的示例项目吗?
欢迎来到 Stack Overflow。我建议宁愿 'poll' 过渡并等待它成功进行。成功执行后,转换将 return True
,如果 a) 准备失败,b) 条件未满足或 c) 进入状态期间出现问题或 d) 问题,则 False
在处理 after
事件期间发生。
我缩小了您的示例代码的范围,以说明如何使用它来检查 face
和 height
是否已成功检索:
from transitions import Machine
from time import sleep
from random import choice
class Passport_cam_sm(object):
def __init__(self):
self._face_okay = False
self._height_okay = False
def face_ok(self, even_data):
self._face_okay = choice([True, False])
return self._face_okay
def height_ok(self, even_data):
# tol = even_data.kwargs.pop('tol', 0.5)
self._height_okay = choice([True, False])
return self._height_okay
states = ['move2face', 'validate_face', 'open_gate']
transitions = [
{'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face',
'conditions': ['height_ok', 'face_ok']},
{'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate',
'conditions': 'face_valid'},
{'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]
class Passport_cam(object):
def __init__(self):
self.model = Passport_cam_sm()
self.machine = Machine(model=self.model, states=states, transitions=transitions,
initial='move2face', send_event=True)
self.running = True
def run(self):
while self.running:
print('At state ' + self.model.state)
# model.at_face will only return True when both conditions are met
while not self.model.at_face():
print('Checking ...')
sleep(1)
print('Face and height are okay!')
self.camera_dynamics()
sleep(1)
def camera_dynamics(self):
print("Processing...")
self.running = False
pc = Passport_cam()
pc.run()
print('Done')
我添加了两个检查 (face/height_ok) 作为有效转换的条件。
如果您想先分配它们并且只检查它们在 conditions
中的值,您可以使用转换关键字 prepare
。
prepare 中的 Functions/methods 将在 conditions
之前执行,并且不需要布尔值 return。
当您指定 send_event=True
时,所有回调都应该期待该事件。这就是 face/height_ok
需要上面使用的签名的原因。
传递给触发事件的参数(例如 model.at_face(tol=0.5)
)将分配给 event_data.args
或 event_data.kwargs
.
请注意,我已将您的条件检查分配给模型。字符串总是被假定为模型方法的名称。如果您想从其他地方分配 functions/methods ,您可以传递对这些函数的引用而不是字符串。另请注意,这仅在立即处理事件时才有效。 transitions
支持排队事件处理(将 queued=True
传递给 Machine
构造函数),这在事件可能触发其他事件时派上用场。当 queued=True
事件将 ALWAYS return 为真。
如何以不可知的方式进行轮询?
状态机的一个核心特征是能够根据其当前状态调整其行为。同样的事件可能会导致不同的结果。如果您希望您的机器不断轮询而不是对事件做出反应,您可以定义所有转换由同一事件触发:
transitions = [
{'trigger': 'check', 'source': 'move2face', 'dest': 'validate_face',
'conditions': ['height_ok', 'face_ok']},
{'trigger': 'check', 'source': 'validate_face', 'dest': 'open_gate',
'conditions': 'face_valid'}, # (1)
{'trigger': 'check', 'source': 'validate_face', 'dest': 'move2face'}, # (2)
]
...
# class Passport_cam
def run(self):
while self.running:
print('At state ' + self.model.state)
while not self.model.check():
sleep(1)
循环可以简化为只调用 model.check
。这样就可以在不需要更改轮询循环的情况下引入转换、检查和状态。
转换按照添加的顺序执行。这意味着 (1) 和 (2) 形成规则 In state 'validate_face' go to 'open_gate' if 'face_valid', otherwise go to 'move2face'
。不必检查源状态与当前状态不同的转换,也不会产生太多开销。
最重要的是独立于框架的机器状态和转换的总体设计。如果条件检查变得臃肿,请考虑将状态拆分为更小的功能更具体的状态。 您还可以拆分转换配置 and/or 将您的模型组合成几个专用模型。