我可以从一个线程中循环调用一个线程吗?
Can I call a thread recurrently from within a thread?
我正在尝试使用 queue
将样本从线程 A(“采集”)传输到线程 B(“P300”),但我无法读取线程 B 中的任何数据,尽管样本 are 在线程 A 中分配。从我的输出来看,我认为我的线程 B 在我的线程 A 开始放入数据之前正在快速测试一些东西。
在下面查看我的代码结构的近似值:
import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")
class AcqThread(threading.Thread):
def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
threading.Thread.__init__(self)
self.stopQ2 = stopQ2
self.stopQ1 = stopQ1
self.dataOutQ2 = dataOutQ2
self.dataOutQ1 = dataOutQ1
self.saveQ = saveQ
def run(self):
Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)
class P300Thread(threading.Thread):
def __init__(self, dataInQ, featureQ, stopQ):
threading.Thread.__init__(self)
self.dataInQ = dataInQ
self.featureQ = featureQ
self.stopQ = stopQ
def run(self):
P300fun(self.dataInQ, self.featureQ, self.stopQ)
threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)
def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
i = 0
print('Starting...')
while i<1250: #i is the number of samples
sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
##Normalization, filtering##
threadLock.acquire()
dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
threadLock.release()
i += 1
def P300fun(dataInQ, featureQ, stopQ):
p300sample = []
p300timestamp = []
print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
print("Is dataInQ emtpy?", DataOutQ1.empty())
while dataInQ.qsize(): #or while not dataqueue.empty():
try:
print("DataInQ has data")
ss, ts = dataInQ.get(0)
print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
except Empty:
return
print('Thread Finished')
if __name__ == '__main__':
print('Looking for an EEG stream...')
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
print('Connected!\n')
AcqTh.start()
P300Th.start()
AcqTh.join()
P300Th.join()
print("\n\n>>>DONE<<<\n\n")
并输出:
Looking for an EEG stream...
Connected!
Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished
>>>DONE<<<
在我的研究中, appeared to present a similar problem, but it seems that the problem was in the image processing part (and they use the multiprocessing
package). seems to have a concurrency problem, which might be my problem, but I'm not sure how to translate it to my problemlet me know if I'm wrong, tho). 只是参数的顺序有问题,所以我认为这里不适用。
我该怎么办?我应该从线程 A 中循环调用线程 B 吗?我需要线程 B 上的循环或延迟吗? .join()
部分可能有问题吗?我需要在不久的将来添加更多线程,所以最好首先弄清楚如何只使用两个线程...
感谢所有帮助!
作为菜鸟可能很棘手...所以我将回答我自己的问题,以帮助其他可能也遇到此问题的初学者。
好吧,首先要做的是:不,不可能从一个线程中循环调用一个线程,因为每个线程只能调用一次。
但是有一种方法可以防止线程结束,让它们等待允许它们继续的触发器。经过更多研究,我遇到了 this question that showed me there is a way to create events for threads. The documentation can be found here。它非常简单:事件对象的行为类似于标志,可以是 set()
(表示 True)或 clear()
(表示 False,这是原始值)。要测试事件,可以对布尔问题使用 is_set()
方法,或者使用 wait()
方法代替计时器。就我而言,它为我节省了一些我将要使用的队列:
import threading
import queue
from queue import Empty
import numpy as np
class AcqThread(threading.Thread):
def __init__(self, dataOutQ1, dataOutQ2, saveQ):
threading.Thread.__init__(self)
self.dataOutQ2 = dataOutQ2
self.dataOutQ1 = dataOutQ1
self.saveQ = saveQ
def run(self):
Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)
class P300Thread(threading.Thread):
def __init__(self, dataInQ, featureQ):
threading.Thread.__init__(self)
self.dataInQ = dataInQ
self.featureQ = featureQ
def run(self):
P300fun(self.dataInQ, self.featureQ)
threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()
#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)
它允许我“循环”“调用”线程 B,因为它保持我的第一个线程处于活动状态(因为事件 E)并且仅在设置事件 EP300 时才进入处理部分。然后,完成后EP300被清除:
def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
i = 0
print('Starting...')
while i<1250:
sample, timestamp = inlet.pull_sample()
##Normalization, filtering##
if _condition_:
threadLock.acquire()
dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
threadLock.release()
EP300.set() #NEW:: allows the P300 function to collect data from queue
i += 1
E.set() #NEW:: flaggs end data collection
def P300fun(dataInQ, featureQ):
p300sample = []
p300timestamp = []
while not E.is_set(): #NEW:: loop until collection is ended
if EP300.is_set(): #NEW:: activated when Event is triggered
while dataInQ.qsize():
try:
print("DataInQ has data")
ss, ts = dataInQ.get(0)
print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
except Empty:
return
if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
EP300.clear()
print('Thread Finished')
if __name__ == '__main__':
print('Looking for an EEG stream...')
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
print('Connected!\n')
AcqTh.start()
P300Th.start()
AcqTh.join()
P300Th.join()
print("\n\n>>>DONE<<<\n\n")
我正在尝试使用 queue
将样本从线程 A(“采集”)传输到线程 B(“P300”),但我无法读取线程 B 中的任何数据,尽管样本 are 在线程 A 中分配。从我的输出来看,我认为我的线程 B 在我的线程 A 开始放入数据之前正在快速测试一些东西。
在下面查看我的代码结构的近似值:
import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")
class AcqThread(threading.Thread):
def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
threading.Thread.__init__(self)
self.stopQ2 = stopQ2
self.stopQ1 = stopQ1
self.dataOutQ2 = dataOutQ2
self.dataOutQ1 = dataOutQ1
self.saveQ = saveQ
def run(self):
Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)
class P300Thread(threading.Thread):
def __init__(self, dataInQ, featureQ, stopQ):
threading.Thread.__init__(self)
self.dataInQ = dataInQ
self.featureQ = featureQ
self.stopQ = stopQ
def run(self):
P300fun(self.dataInQ, self.featureQ, self.stopQ)
threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)
def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
i = 0
print('Starting...')
while i<1250: #i is the number of samples
sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
##Normalization, filtering##
threadLock.acquire()
dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
threadLock.release()
i += 1
def P300fun(dataInQ, featureQ, stopQ):
p300sample = []
p300timestamp = []
print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
print("Is dataInQ emtpy?", DataOutQ1.empty())
while dataInQ.qsize(): #or while not dataqueue.empty():
try:
print("DataInQ has data")
ss, ts = dataInQ.get(0)
print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
except Empty:
return
print('Thread Finished')
if __name__ == '__main__':
print('Looking for an EEG stream...')
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
print('Connected!\n')
AcqTh.start()
P300Th.start()
AcqTh.join()
P300Th.join()
print("\n\n>>>DONE<<<\n\n")
并输出:
Looking for an EEG stream...
Connected!
Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished
>>>DONE<<<
在我的研究中,multiprocessing
package).
我该怎么办?我应该从线程 A 中循环调用线程 B 吗?我需要线程 B 上的循环或延迟吗? .join()
部分可能有问题吗?我需要在不久的将来添加更多线程,所以最好首先弄清楚如何只使用两个线程...
感谢所有帮助!
作为菜鸟可能很棘手...所以我将回答我自己的问题,以帮助其他可能也遇到此问题的初学者。
好吧,首先要做的是:不,不可能从一个线程中循环调用一个线程,因为每个线程只能调用一次。
但是有一种方法可以防止线程结束,让它们等待允许它们继续的触发器。经过更多研究,我遇到了 this question that showed me there is a way to create events for threads. The documentation can be found here。它非常简单:事件对象的行为类似于标志,可以是 set()
(表示 True)或 clear()
(表示 False,这是原始值)。要测试事件,可以对布尔问题使用 is_set()
方法,或者使用 wait()
方法代替计时器。就我而言,它为我节省了一些我将要使用的队列:
import threading
import queue
from queue import Empty
import numpy as np
class AcqThread(threading.Thread):
def __init__(self, dataOutQ1, dataOutQ2, saveQ):
threading.Thread.__init__(self)
self.dataOutQ2 = dataOutQ2
self.dataOutQ1 = dataOutQ1
self.saveQ = saveQ
def run(self):
Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)
class P300Thread(threading.Thread):
def __init__(self, dataInQ, featureQ):
threading.Thread.__init__(self)
self.dataInQ = dataInQ
self.featureQ = featureQ
def run(self):
P300fun(self.dataInQ, self.featureQ)
threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()
#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)
它允许我“循环”“调用”线程 B,因为它保持我的第一个线程处于活动状态(因为事件 E)并且仅在设置事件 EP300 时才进入处理部分。然后,完成后EP300被清除:
def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
i = 0
print('Starting...')
while i<1250:
sample, timestamp = inlet.pull_sample()
##Normalization, filtering##
if _condition_:
threadLock.acquire()
dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
threadLock.release()
EP300.set() #NEW:: allows the P300 function to collect data from queue
i += 1
E.set() #NEW:: flaggs end data collection
def P300fun(dataInQ, featureQ):
p300sample = []
p300timestamp = []
while not E.is_set(): #NEW:: loop until collection is ended
if EP300.is_set(): #NEW:: activated when Event is triggered
while dataInQ.qsize():
try:
print("DataInQ has data")
ss, ts = dataInQ.get(0)
print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
except Empty:
return
if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
EP300.clear()
print('Thread Finished')
if __name__ == '__main__':
print('Looking for an EEG stream...')
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
print('Connected!\n')
AcqTh.start()
P300Th.start()
AcqTh.join()
P300Th.join()
print("\n\n>>>DONE<<<\n\n")