在单独的线程中进行数据采集和绘图

Data acquisition and plotting in separate threads

我有一个类,它在单独的线程中进行数据采集(即 PySession.scope)。我想在基于 PyQt5 的 ScopeGUI 中绘制每个 scope。显然,GUI 需要在主线程中运行,但是我希望能够随时调用另一个 PySession.scope,也就是说,控制台必须保持空闲,可以随时提交新命令。

总而言之:假设我同时启动两个 scope,在采集数据的同时,应该在两个独立的 ScopeGUI 实例中将数据可视化。同时,控制台应该保持空闲,可以启动更多的 scope。在 Python 中实现此行为的最佳做法是什么?

这是我目前拥有的一个可重现的例子:

.
├── main.py
├── scope.py
├── alosaclient.py

main.py

import os
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from alosaclient import PySession
import matplotlib.pyplot as plt

if __name__ == "__main__":
    # Open the session with the link-command configuration file.
    session = PySession(
        "C:/ProjTMS/alosa1-client/test/config/linkcommand.toml"
    )

    # Start two acquisitions; each one also requests a live plot window.
    variables = "speedcpu U_Z"
    future1 = session.scope(
        varnames=variables, nsamples=20, ndiv=1, realtime=True
    )
    future2 = session.scope(
        "m_1 m_u", nsamples=20, ndiv=1, realtime=True
    )
    print("----------------------------------------------MAIN THREAD----------------------------------------------")

    # Run the queued GUI update tasks until they have all finished.
    session.work()

    # Report the second signal of each finished scope, first scope first.
    result1 = future1.result()
    second_signal = result1.data[1]
    print(second_signal.time)
    print(second_signal.values)

    result2 = future2.result()
    second_signal = result2.data[1]
    print(second_signal.time)
    print(second_signal.values)

scope.py

import numpy as np
import sys
from PyQt5 import QtWidgets
from matplotlib.backends.backend_qt5agg import (FigureCanvas,
                                                NavigationToolbar2QT as
                                                NavigationToolbar)
from matplotlib.figure import Figure


class ScopeGUI(QtWidgets.QMainWindow):
    """Main window that displays the signals of one scope in a matplotlib plot.

    One line artist is created for every whitespace-separated name in
    ``varnames``. The window shows itself at the end of construction.
    """

    def __init__(self, varnames, nsamples):
        """Build the window, the embedded figure and one empty line per variable.

        Args:
            varnames: Whitespace-separated variable names.
            nsamples: Number of samples per variable. It is only stored on
                the instance; nothing in this class reads it.
        """
        super().__init__()
        self.varnames = varnames
        self.nsamples = nsamples
        self.setEnabled(True)
        self.setGeometry(0, 0, 800, 600)
        self.setMinimumSize(800, 600)

        self.main_widget = QtWidgets.QWidget()
        self.setCentralWidget(self.main_widget)

        self.main_layout = QtWidgets.QVBoxLayout()
        self.main_widget.setLayout(self.main_layout)

        self._fig = Figure(figsize=(8, 6))
        self._canvas = FigureCanvas(self._fig)
        self.main_layout.addWidget(self._canvas)

        # Fixed axis limits: the view does not rescale to the plotted data.
        self._axes = self._fig.subplots()
        self._axes.grid(True, which="both")
        self._axes.set_xlabel('Time (s)')
        self._axes.set_ylabel('Data (DSPu)')
        self._axes.set_xlim(left=0, right=6.3)
        self._axes.set_ylim(bottom=-1.5, top=1.5)
        self.lines = []
        self.initialize_lines()

        self.addToolBar(NavigationToolbar(self._canvas, self))
        self.show()

    def initialize_lines(self):
        """Append one empty, dot-marked line to ``self.lines`` per variable."""
        for _ in self.varnames.split():
            line, = self._axes.plot([], [])
            line.set_marker('.')
            self.lines.append(line)

    def plot(self, scope_data):
        """Copy the current samples into the line artists and redraw once.

        Args:
            scope_data: Iterable of objects exposing ``time`` and ``values``
                sequences. Items are paired with ``self.lines`` in order;
                surplus items or lines are ignored.
        """
        print("plotting")
        for signal, line in zip(scope_data, self.lines):
            x = signal.time
            y = signal.values
            # The two sequences may differ in length while they are still
            # being filled, so only the common prefix is drawn.
            common_length = min(len(x), len(y))
            line.set_xdata(x[:common_length])
            line.set_ydata(y[:common_length])
        # Redraw once after every line has been updated instead of once per
        # line; the intermediate redraws showed a partly updated figure.
        self._canvas.draw()
        self._canvas.flush_events()

alosaclient.py

import sys
import time
from concurrent.futures import ThreadPoolExecutor
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import random
from PyQt5 import QtWidgets
from scope import ScopeGUI
import asyncio

# Qt application object, created as a side effect of importing this module,
# i.e. before any ScopeGUI window is built in PySession.scope. Nothing in
# this module calls app.exec_().
app = QtWidgets.QApplication(sys.argv)


class ScopeDataElement:
    """Samples recorded for a single variable of a scope.

    Attributes are stored exactly as passed in:
    ``variable_index`` identifies the variable, ``time`` and ``values``
    hold the sample time stamps and sample values.
    """

    def __init__(self, variable_index, time, values):
        self.variable_index, self.time, self.values = (
            variable_index,
            time,
            values,
        )


class Scope:
    """Container for the data of one scope run."""

    def __init__(self, data):
        # Stored as passed in; PySession.dummy_scope supplies a list with
        # one ScopeDataElement per requested variable.
        self.data = data


class PySession:
    """Dummy acquisition session: scopes run in a thread pool, plots in asyncio.

    ``scope`` submits a dummy acquisition to the pool and, when ``realtime``
    is requested, opens a ``ScopeGUI`` and queues an ``update`` task on the
    event loop. ``work`` then runs the loop until those tasks are done.

    Limitation: a buffered scope is identified only by its list of variable
    indices, so ``update`` cannot tell apart two scopes that record exactly
    the same variables; it follows the first one.
    """

    def __init__(self, lcmdfile):
        """Create the worker pool, the event loop handle and empty buffers.

        Args:
            lcmdfile: Path of the link-command file. This dummy
                implementation does not read it.
        """
        self.pool = ThreadPoolExecutor(8)
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.tasks = list()
        self.scope_buffer = list()

    def work(self):
        """Run the event loop until every queued GUI update task is done."""
        if not self.tasks:
            # asyncio.wait() rejects an empty collection; nothing to do.
            return
        self.loop.run_until_complete(asyncio.wait(self.tasks))

    def scope(self, varnames, nsamples=1, ndiv=1, realtime=False):
        """Start a dummy acquisition and optionally a live plot for it.

        Args:
            varnames: Whitespace-separated variable names.
            nsamples: Number of samples to record per variable.
            ndiv: Forwarded to ``dummy_scope``, which does not use it.
            realtime: When true, open a ``ScopeGUI`` and queue an update
                task; the task only runs once ``work`` is called.

        Returns:
            The ``concurrent.futures.Future`` of the acquisition; its result
            is the filled ``Scope``.
        """
        future = self.pool.submit(
            self.dummy_scope, varnames, nsamples, ndiv)
        if realtime:
            scope_gui = ScopeGUI(varnames, nsamples)
            task = self.loop.create_task(self.update(scope_gui, varnames))
            self.tasks.append(task)
        return future

    async def update(self, scope_gui, varnames):
        """Feed the growing scope data to ``scope_gui`` until it is complete.

        Args:
            scope_gui: Object with ``nsamples`` and ``plot(data)``.
            varnames: Whitespace-separated variable names of the scope.

        Raises:
            ValueError: If a name in ``varnames`` is unknown.
        """
        variable_indices = [self.name_to_index(var)
                            for var in varnames.split()]
        # The acquisition thread may not have registered its scope yet, so
        # poll until a matching entry shows up instead of failing on lookup.
        scope_index = self._match_scope_index(variable_indices)
        while scope_index is None:
            await asyncio.sleep(25e-3)
            scope_index = self._match_scope_index(variable_indices)
        # The buffer is append-only, so this list stays the one being filled.
        signals = self.scope_buffer[scope_index].data
        expected = scope_gui.nsamples
        while not all(len(signal.time) == expected
                      and len(signal.values) == expected
                      for signal in signals):
            scope_gui.plot(signals)
            await asyncio.sleep(25e-3)
        # Draw the complete data once more; the loop above stops before the
        # last samples have been shown.
        scope_gui.plot(signals)

    @staticmethod
    def name_to_index(varname):
        """Return the index of ``varname`` in the dummy cross-reference table.

        Raises:
            ValueError: If ``varname`` is not in the table.
        """
        varnames = ["speedcpu", "U_Z", "m_1", "m_u"]
        return varnames.index(varname)

    def _match_scope_index(self, variable_indices):
        """Return the index of the first matching buffered scope, or None.

        A scope matches when its signals carry exactly the given variable
        indices, in the same order and with the same count.
        """
        wanted = list(variable_indices)
        for index, scope in enumerate(self.scope_buffer):
            if [signal.variable_index for signal in scope.data] == wanted:
                return index
        return None

    def find_scope_index(self, variable_indices):
        """Return the buffer index of the scope recording ``variable_indices``.

        Raises:
            IndexError: If no buffered scope records exactly these variables.
        """
        index = self._match_scope_index(variable_indices)
        if index is None:
            raise IndexError(
                f"no scope found for variable indices {list(variable_indices)}")
        return index

    def find_data_index(self, scope, varname):
        """Return the position of ``varname``'s signal inside ``scope.data``.

        Raises:
            ValueError: If ``varname`` is not a known variable name.
            IndexError: If ``scope`` holds no signal for ``varname``.
        """
        wanted = self.name_to_index(varname)
        for index, signal in enumerate(scope.data):
            if signal.variable_index == wanted:
                return index
        raise IndexError(f"scope holds no data for variable {varname!r}")

    def dummy_scope(self, varnames, nsamples, ndiv):
        """Record noisy sine samples for each variable, one variable at a time.

        A new ``Scope`` is appended to ``scope_buffer`` and then filled in
        place, with a 10 ms pause after every sample.

        Args:
            varnames: Whitespace-separated variable names.
            nsamples: Number of samples per variable.
            ndiv: Unused.

        Returns:
            The ``Scope`` created by this call.
        """
        variables = varnames.split()
        content = [ScopeDataElement(self.name_to_index(var), list(), list())
                   for var in variables]
        scope = Scope(content)
        self.scope_buffer.append(scope)
        # Work on the object created here rather than looking it up again by
        # variable list: an earlier scope with the same variables would
        # otherwise receive this run's samples.
        scope_index = self.scope_buffer.index(scope)
        linspace = np.linspace(0, 2 * np.pi, nsamples)
        for var in variables:
            data_index = self.find_data_index(scope, var)
            signal = scope.data[data_index]
            for point in linspace:
                print(f"scope index is {scope_index}")
                print(f"data index is {data_index}")
                signal.time.append(point)
                signal.values.append(
                    np.sin(point) + random.uniform(0, 0.2))
                time.sleep(10e-3)
        return scope


基本上,此代码执行以下操作:

  1. 启动 scope(PySession.scope)

    • 触发两个虚拟 scope 函数来收集一些虚拟数据
    • 将相应的 GUI 更新任务加入事件循环
  2. 处理事件循环(PySession.work)

    • 一旦所有需要的 scope 都已初始化,事件循环就开始处理,即更新 GUI

问题:

重要:我使用的是 Python 交互式控制台,这就是没有调用 app.exec_() 的原因。可重现的示例请用 python3 -i main.py 启动,这样就能与我处于相同的环境。

我完全知道我尝试这样做的方式可能是完全错误的,这就是为什么我向你们寻求帮助。

提前致谢!

下面是实现方式,不再需要 asyncio。

update 函数需要按如下方式修改。将 canvas.draw() 从 GUI 类移到 update 例程中,以避免图形闪烁。

    def update(self, scope_gui, varnames):
        """Plot the growing scope data in ``scope_gui`` until it is complete.

        Blocking variant meant to be submitted to the thread pool.

        Args:
            scope_gui: Window object exposing ``nsamples``, ``plot(data)`` and
                ``_canvas.draw()``.
            varnames: Whitespace-separated variable names of the scope.

        Raises:
            Exception: Any error is printed and then re-raised unchanged.
        """
        # NOTE(review): when run from the pool, plot() and draw() execute
        # outside the main thread -- confirm this is safe for the Qt backend.
        try:
            variable_indices = [self.name_to_index(var)
                                for var in varnames.split()]
            # The acquisition thread may not have registered its scope yet;
            # retry the lookup instead of failing with IndexError.
            while True:
                try:
                    scope_index = self.find_scope_index(variable_indices)
                except IndexError:
                    time.sleep(10e-3)
                else:
                    break
            # The buffer may grow, but this entry keeps being filled in place.
            signals = self.scope_buffer[scope_index].data
            # while the data is not complete, update the GUI
            while not all(len(signal.time) == scope_gui.nsamples
                          for signal in signals):
                scope_gui.plot(signals)
                scope_gui._canvas.draw()
                time.sleep(25e-3)
            # Draw the complete data once more; the loop above stops before
            # the last samples have been shown.
            scope_gui.plot(signals)
            scope_gui._canvas.draw()

        except Exception as e:
            print(f"EXCEPTION OCURRED: {e}")
            raise

相应地,scope 方法也做了调整。

    def scope(self, varnames, nsamples=1, ndiv=1, realtime=False):
        """Submit an acquisition to the pool, optionally with a live plot.

        Returns the future of the acquisition. With ``realtime`` set, a
        ``ScopeGUI`` is created and its update routine is submitted to the
        same pool; that second future is not kept.
        """
        acquisition = self.pool.submit(
            self.dummy_scope, varnames, nsamples, ndiv
        )
        if not realtime:
            return acquisition

        window = ScopeGUI(varnames, nsamples)
        self.pool.submit(self.update, window, varnames)
        return acquisition

这样可以确保 GUI 在主线程中运行,而更新任务在单独的线程中运行,因此不会阻塞控制台。