如何 python 转换机器人应用程序/嵌入式系统

How to python transitions for robotic applications / embedded systems

我正在考虑使用转换为机器人应用程序构建状态机。我以前使用 simulink stateflow 来完成这种任务。在这里,可以定期调用状态机并评估当前活动状态,以查看是否可以退出转换到新状态,否则会保持当前状态(每次轮询可以进行一次转换)。带着这种心态,我正在尝试一个带有过渡的初始玩具示例,看看我是否能掌握它,但我觉得我需要以不同的方式考虑这个解决方案。

我想到的玩具示例是那些机场自动护照门之一,其中相机在 Z 方向平移以匹配用户的面部高度。这是我的(希望是自我描述的)尝试:

from transitions import Machine
from time import sleep, time
from random import choice

class Passport_cam_sm(object):

    def __init__(self):

        self.face_ok = False
        self.height_ok = False

states = ['move2face', 'validate_face', 'open_gate']

transitions = [
    {'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face', 'conditions': 'height_ok'},
    {'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate', 'conditions': 'face_valid'},
    {'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]

class Passport_cam(object):

    def __init__(self, terminate_time_s=10, time_step_s=1):

        self.model = Passport_cam_sm()
        self.machine = Machine(model=self.model, states=states, transitions=transitions, initial='move2face', send_event=True)
        self.running = True
        self.init_time = time()
        self.terminate_time_s = terminate_time_s
        self.time_step_s = time_step_s
        self.face_pos_z = 5
        self.camera_pos_z = 0
        self.error_z = 888

    def run(self):
        '''
        main program loop
        :return: 
        '''

        while self.running and not self.main_timeout():

            self.machine.face_ok = self.face_ok()
            self.machine.height_ok = self.height_ok()
            print ('At state ' + self.model.state) #, ' camera height (', self.error_z ,') ok is ', self.machine.height_ok, ' face check is ', self.machine.face_ok)
            # todo - poll latest state here? (self.model.state)
            self.camera_dynamics()
            sleep(1)

    def face_ok(self):
        '''
        very robust method for determining if the face is valid...
        :return: 
        '''
        return choice([True, False])

    def height_ok(self, tol=0.5):
        '''
        Checks if the face height is OK to do comparison.
        :return: 
        '''
        if abs(self.error_z) < tol:
            return True
        else:
            return False

    def camera_dynamics(self, max_displacement=1):
        '''
        Moves camera height towards face height at a maximum of "max_displacement" per function call 
        :return: 
        '''
        self.error_z = self.camera_pos_z - self.face_pos_z
        threshold_error = (min(max(self.error_z, -max_displacement), max_displacement))
        self.camera_pos_z = self.camera_pos_z - threshold_error
        print ('Camera height error is: {0}'.format(self.error_z))

    def main_timeout(self):
        if time() > self.init_time + self.terminate_time_s:
            return True
        else:
            return False

pc = Passport_cam()
pc.run()

我希望找到的是某种轮询代码中 'todo' 部分的最后状态的方法,以检查现在是否有任何退出条件有效。有没有办法做到这一点?重新进入当前状态应该没问题,但我想某种 "during" 方法会更理想。

否则有没有更好的方法来构建这种项目and/or有这样的示例项目吗?

欢迎来到 Stack Overflow。我建议宁愿 'poll' 过渡并等待它成功进行。成功执行后,转换将 return True,如果 a) 准备失败,b) 条件未满足或 c) 进入状态期间出现问题或 d) 问题,则 False在处理 after 事件期间发生。 我缩小了您的示例代码的范围,以说明如何使用它来检查 faceheight 是否已成功检索:

from transitions import Machine
from time import sleep
from random import choice

class Passport_cam_sm(object):

    def __init__(self):
        self._face_okay = False
        self._height_okay = False

    def face_ok(self, even_data):
        self._face_okay = choice([True, False])
        return self._face_okay

    def height_ok(self, even_data):
        # tol = even_data.kwargs.pop('tol', 0.5)
        self._height_okay =  choice([True, False])
        return self._height_okay

states = ['move2face', 'validate_face', 'open_gate']

transitions = [
    {'trigger': 'at_face', 'source': 'move2face', 'dest': 'validate_face',
     'conditions': ['height_ok', 'face_ok']},
    {'trigger': 'face_valid', 'source': 'validate_face', 'dest': 'open_gate', 
     'conditions': 'face_valid'},
    {'trigger': 'face_invalid', 'source': 'validate_face', 'dest': 'move2face'},
]


class Passport_cam(object):

    def __init__(self):

        self.model = Passport_cam_sm()
        self.machine = Machine(model=self.model, states=states, transitions=transitions,
                               initial='move2face', send_event=True)
        self.running = True

    def run(self):
        while self.running:
            print('At state ' + self.model.state)
            # model.at_face will only return True when both conditions are met
            while not self.model.at_face():
                print('Checking ...') 
                sleep(1)
            print('Face and height are okay!')
            self.camera_dynamics()
            sleep(1)

    def camera_dynamics(self):
        print("Processing...")
        self.running = False


pc = Passport_cam()
pc.run()
print('Done')

我添加了两个检查 (face/height_ok) 作为有效转换的条件。 如果您想先分配它们并且只检查它们在 conditions 中的值,您可以使用转换关键字 prepare。 prepare 中的 Functions/methods 将在 conditions 之前执行,并且不需要布尔值 return。 当您指定 send_event=True 时,所有回调都应该期待该事件。这就是 face/height_ok 需要上面使用的签名的原因。 传递给触发事件的参数(例如 model.at_face(tol=0.5))将分配给 event_data.argsevent_data.kwargs.

请注意,我已将您的条件检查分配给模型。字符串总是被假定为模型方法的名称。如果您想从其他地方分配 functions/methods ,您可以传递对这些函数的引用而不是字符串。另请注意,这仅在立即处理事件时才有效。 transitions 支持排队事件处理(将 queued=True 传递给 Machine 构造函数),这在事件可能触发其他事件时派上用场。当 queued=True 事件将 ALWAYS return 为真。

如何以不可知的方式进行轮询?

状态机的一个核心特征是能够根据其当前状态调整其行为。同样的事件可能会导致不同的结果。如果您希望您的机器不断轮询而不是对事件做出反应,您可以定义所有转换由同一事件触发:

  transitions = [ 
      {'trigger': 'check', 'source': 'move2face', 'dest': 'validate_face',
     'conditions': ['height_ok', 'face_ok']},
      {'trigger': 'check', 'source': 'validate_face', 'dest': 'open_gate', 
     'conditions': 'face_valid'}, # (1)
      {'trigger': 'check', 'source': 'validate_face', 'dest': 'move2face'}, # (2)
  ]
  ...
  # class Passport_cam
  def run(self):
        while self.running:
            print('At state ' + self.model.state)
            while not self.model.check():
                sleep(1)

循环可以简化为只调用 model.check。这样就可以在不需要更改轮询循环的情况下引入转换、检查和状态。

转换按照添加的顺序执行。这意味着 (1) 和 (2) 形成规则 In state 'validate_face' go to 'open_gate' if 'face_valid', otherwise go to 'move2face' 。不必检查源状态与当前状态不同的转换,也不会产生太多开销。

最重要的是独立于框架的机器状态和转换的总体设计。如果条件检查变得臃肿,请考虑将状态拆分为更小的功能更具体的状态。 您还可以拆分转换配置 and/or 将您的模型组合成几个专用模型。