Python MQTT callbacks not called

I am developing an mqtt-subscriber that forwards messages to a beanstalk-tube or an api-endpoint. I came up with the following code:

#!/usr/bin/python
import pymysql.cursors
import sys
import time
import paho.mqtt.client as mqtt
from threading import Thread


class Process(Thread):
    def __init__(self, sid=None, host=None, username=None, password=None, topic=None, topic_qos=None, destination_type=None, destination=None):
        Thread.__init__(self)
        self.sid = sid
        self.host = host
        self.username = username
        self.password = password
        self.topic = topic
        self.topic_qos = topic_qos
        self.destination_type = destination_type
        self.destination = destination
        self.client = None

    def on_connect(self, client, obj, flags, rc):
        print("connected")
        self.client.subscribe(self.topic, qos=self.topic_qos)
        self.client.loop_forever()

    def on_message(self, client, obj, msg):
        print(str(msg.payload))

    def run(self):
        self.client = mqtt.Client(str(self.sid) + "_subscriber")
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.host, 1883, 60)


def main(argv):
    db_connection = pymysql.connect(host=argv[0],
                                    user=argv[1],
                                    password=argv[2],
                                    db=argv[3],
                                    charset='utf8mb4',
                                    cursorclass=pymysql.cursors.DictCursor)
    processes = []
    try:
        with db_connection.cursor() as cursor:
            cursor.execute("SELECT `id`,`topic`,`topic_qos`,`target_type`,`target` FROM mqtt_subscriptions;")
            result = cursor.fetchall()
            for subscription in result:
                process = Process(subscription['id'], argv[4], argv[5], argv[6], subscription['topic'],
                                         subscription['topic_qos'], subscription['target_type'],
                                         subscription['target'])
                process.start()
                processes.append(process)
    finally:
        db_connection.close()
        while True:
            #print("check for new imports")
            time.sleep(4)


if __name__ == "__main__":
    main(sys.argv[1:])

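The problem is that the methods in the Process class are never triggered. This may be because I am using OOP and none of the examples do. But it must be possible, right?

I would love to hear your ideas or suggestions.

Gino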

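Move the call to self.client.loop_forever() out of the on_connect() callback and place it immediately after the call to self.client.connect(self.host, 1883, 60).

Callbacks should return quickly; by calling loop_forever() inside on_connect() you have made sure the callback never returns. In addition, connect() on its own does not process network traffic, so without a running network loop on_connect() is never invoked in the first place.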
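The change described above could look roughly like the following. This is a minimal sketch, not a drop-in replacement: the class name `Subscriber`, the `make_paho_client` helper and the `client_factory` parameter are hypothetical names introduced here so the class can be exercised without a broker, and the client is created with the paho-mqtt 1.x constructor used in the question (paho-mqtt 2.x additionally expects a `callback_api_version` argument).

```python
from threading import Thread


def make_paho_client(client_id):
    # Hypothetical helper: imported lazily so the class below can be
    # tried out even where paho-mqtt is not installed.
    import paho.mqtt.client as mqtt
    # paho-mqtt 1.x style constructor, as in the question.
    return mqtt.Client(client_id=client_id)


class Subscriber(Thread):
    def __init__(self, sid, host, username, password, topic, topic_qos,
                 client_factory=make_paho_client):
        Thread.__init__(self)
        self.sid = sid
        self.host = host
        self.username = username
        self.password = password
        self.topic = topic
        self.topic_qos = topic_qos
        # Assumption for illustration: a callable that returns an MQTT client.
        self.client_factory = client_factory
        self.client = None

    def on_connect(self, client, obj, flags, rc):
        # Keep the callback short: subscribe and return.
        print("connected")
        client.subscribe(self.topic, qos=self.topic_qos)

    def on_message(self, client, obj, msg):
        print(str(msg.payload))

    def run(self):
        self.client = self.client_factory(str(self.sid) + "_subscriber")
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.host, 1883, 60)
        # Blocks this thread and dispatches on_connect / on_message.
        self.client.loop_forever()
```

Each instance would be started with start() exactly as in the question; because loop_forever() now runs inside run(), every thread keeps servicing its own client until the client disconnects.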