Linux 和 Windows 的多线程键盘检测器

Multithreaded Keyboard Detector for Linux and Windows

我想在现有的 Windows 键盘检测器(Keyboard Detector)中加入对 Linux 的键盘检测支持,因此我用 pyudev 创建了一个 LinuxKeyboardDetector。

脚本可以启动,出现图形用户界面,可惜键盘检测什么都认不出来,也不报错。

我怀疑使用QRunnable的多线程有问题。

代码

import sys
from datetime import datetime
import platform

from PyQt5. QtCore import QObject, QRunnable, QThreadPool, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow, QTableWidget, QTableWidgetItem, QHeaderView


# Detect the host operating system once, at import time. The value selects
# the platform-specific imports below and is read again by
# create_keyboard_detector() to choose the detector class.
current_platform = platform.system()
if current_platform == "Windows":
    # Windows backend: COM initialisation (pythoncom) and WMI queries (wmi).
    import pythoncom
    import wmi
elif current_platform == "Linux":
    # Linux backend: udev monitoring (pyudev) and its PyQt5 observer.
    import pyudev
    from pyudev.pyqt5 import MonitorObserver


def create_keyboard_detector():
    """Build the keyboard detector that matches the host platform.

    Returns:
        A ``WindowsKeyboardDetector`` on Windows, a ``LinuxKeyboardDetector``
        on Linux, and ``None`` on any other platform.
    """
    if current_platform == "Windows":
        return WindowsKeyboardDetector()
    if current_platform == "Linux":
        return LinuxKeyboardDetector()
    # No detector is available for this platform.
    return None


class KeyboardDetectorSignals(QObject):
    """Signal container used by the keyboard detector classes.

    Each detector creates one instance of this class as ``self.signals`` and
    emits ``keyboard_changed`` through it.
    """

    # Emitted with a human-readable message (str), e.g. "Keyboard connected."
    # or "Keyboard disconnected.".
    keyboard_changed = pyqtSignal(str)


class WindowsKeyboardDetector(QRunnable):
    """Thread-pool task that watches WMI for keyboards being added or removed.

    For every ``Win32_Keyboard`` instance creation or deletion event reported
    by WMI, ``signals.keyboard_changed`` is emitted with a short message.
    """

    # Time each watcher may block per poll. It is kept short so that both
    # watchers are serviced in turn and a stop request is noticed quickly.
    POLL_TIMEOUT_MS = 10

    def __init__(self):
        super().__init__()

        self.signals = KeyboardDetectorSignals()
        # Set by stop(); checked once per pass of the polling loop in run().
        self._stop_requested = False

    def stop(self):
        """Ask the polling loop in :meth:`run` to end after its current pass."""
        self._stop_requested = True

    def run(self):
        """Poll the two WMI watchers until :meth:`stop` is called.

        COM is initialised for the worker thread before WMI is used and is
        uninitialised again when the loop ends, whether normally or because
        of an exception.
        """
        pythoncom.CoInitialize()
        try:
            device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"
            device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"

            c = wmi.WMI()
            connected_watcher = c.watch_for(raw_wql=device_connected_wql)
            disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)

            while not self._stop_requested:
                if self._poll(connected_watcher):
                    self.signals.keyboard_changed.emit("Keyboard connected.")
                if self._poll(disconnected_watcher):
                    self.signals.keyboard_changed.emit("Keyboard disconnected.")
        finally:
            # Balance the CoInitialize() call made for this thread.
            pythoncom.CoUninitialize()

    def _poll(self, watcher):
        """Return the next event from *watcher*, or ``None`` if it timed out."""
        try:
            return watcher(timeout_ms=self.POLL_TIMEOUT_MS)
        except wmi.x_wmi_timed_out:
            return None


class LinuxKeyboardDetector(QRunnable):
    """Thread-pool task that reports keyboard hot-plug events via pyudev.

    Uses the PyQt5 ``MonitorObserver`` from ``pyudev.pyqt5`` and forwards
    matching device events through ``signals.keyboard_changed``.
    """

    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        # NOTE(review): the observer is owned by this object, so the creator
        # of the detector presumably has to keep a reference to it for events
        # to keep arriving -- confirm against the caller.
        self.observer = MonitorObserver(self.monitor)

    def run(self):
        """Install the device filter, hook up the handler and start the monitor."""
        # NOTE(review): ID_INPUT_KEYBOARD may only be set on "input" subsystem
        # devices -- confirm that usb_device events carry it.
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        self.observer.deviceEvent.connect(self.process_device_event)
        self.monitor.start()

    def process_device_event(self, device):
        """Emit ``keyboard_changed`` when *device* is a keyboard being added or removed.

        Args:
            device: The udev device the event refers to.
        """
        # Use .get(): devices that lack the property must be ignored instead
        # of raising KeyError inside the event handler.
        if device.get('ID_INPUT_KEYBOARD') != '1':
            return
        if device.action == "add":
            self.signals.keyboard_changed.emit("Keyboard connected.")
        elif device.action == "remove":
            self.signals.keyboard_changed.emit("Keyboard disconnected.")


class MainWindow(QMainWindow):
    """Main window showing a two-column log of keyboard events."""

    def __init__(self):
        super().__init__()

        self.setGeometry(100, 100, 500, 500)
        self.setWindowTitle("Keyboard Logger")

        self.log_table = QTableWidget()
        self.log_table.setColumnCount(2)
        self.log_table.setShowGrid(True)
        self.log_table.setHorizontalHeaderLabels(["Time", "Event"])
        self.log_table.horizontalHeader().setStretchLastSection(True)
        self.log_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
        self.setCentralWidget(self.log_table)
        self.show()

        self.threadpool = QThreadPool()
        # Hold the detector on the instance so that it, its signals object and
        # any observer it owns stay alive as long as the window does, instead
        # of relying on the thread pool to keep the only reference.
        self.keyboard_detector = create_keyboard_detector()
        if self.keyboard_detector is None:
            # create_keyboard_detector() returns None on unsupported
            # platforms; say so in the log instead of failing with
            # AttributeError.
            self.add_row("Keyboard detection is not supported on this platform.")
            return
        self.keyboard_detector.signals.keyboard_changed.connect(self.add_row)
        self.threadpool.start(self.keyboard_detector)

    def add_row(self, event: str):
        """Append a row holding the current local time and *event* to the log table.

        Args:
            event: Text shown in the "Event" column.
        """
        now = datetime.now()
        datetime_string = now.strftime("%Y-%m-%d %H:%M:%S")

        row_count = self.log_table.rowCount()
        self.log_table.insertRow(row_count)
        self.log_table.setItem(row_count, 0, QTableWidgetItem(datetime_string))
        self.log_table.setItem(row_count, 1, QTableWidgetItem(event))


def main():
    """Create the Qt application and main window, then run the event loop.

    The process exits with the status returned by the event loop.
    """
    application = QApplication(sys.argv)
    # The window must stay referenced while the event loop is running.
    main_window = MainWindow()
    exit_code = application.exec_()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()

编辑 1:将 LinuxKeyboardDetector 类改为使用基础的 pyudev.MonitorObserver,而不是 PyQt 专用版本。

class LinuxKeyboardDetector(QRunnable):
    """Thread-pool task that reports keyboard hot-plug events via pyudev.

    Uses the basic, thread-based ``pyudev.MonitorObserver`` and forwards
    matching device events through ``signals.keyboard_changed``.
    """

    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        # Pass the handler as ``callback=``: it is then called with the device
        # only. The second positional parameter is the deprecated
        # ``event_handler``, which is called with (action, device) and does
        # not match process_device_event(device).
        self.observer = pyudev.MonitorObserver(
            self.monitor, callback=self.process_device_event
        )

    def run(self):
        """Install the device filter and start the observer."""
        # NOTE(review): ID_INPUT_KEYBOARD may only be set on "input" subsystem
        # devices -- confirm that usb_device events carry it.
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        self.observer.start()

    def process_device_event(self, device):
        """Emit ``keyboard_changed`` when *device* is a keyboard being added or removed.

        Args:
            device: The udev device the event refers to.
        """
        # Use .get(): devices that lack the property must be ignored instead
        # of raising KeyError inside the observer callback.
        if device.get('ID_INPUT_KEYBOARD') != '1':
            return
        if device.action == "add":
            self.signals.keyboard_changed.emit("Keyboard connected.")
        elif device.action == "remove":
            self.signals.keyboard_changed.emit("Keyboard disconnected.")

结果 1:插拔 USB 键盘时出现如下错误信息:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 532, in run
    self._callback(device)
  File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 508, in <lambda>
    callback = lambda d: event_handler(d.action, d)
TypeError: process_device_event() takes 2 positional arguments but 3 were given

根据官方文档,您必须在 Qt 应用程序的事件循环启动之前启动监视器。在这种情况下,您不能使用 QRunnable,因为监视器会作为标准的 QObject 异步工作,并在需要时发出信号。

如果您仍想保持相同的接口(即继续使用 QRunnable),我认为唯一的解决办法是使用基础的 pyudev.MonitorObserver,而不是 PyQt 专用版本。

class LinuxKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.observer = pyudev.MonitorObserver(self.monitor, self.process_device_event)

    def run(self):
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        self.observer.start()