Raspi python 多线程,我想一直引用一个全局变量
Raspi python MultiThreading, I want to keep referencing one global variable
这是我的代码
RPi.GPIO
是raspberrypi GPIO输入。
多线程
import threading
import time
import test_razer
# Shared flag: Mail() writes it and razer() reads it. A ``global`` statement
# has no effect at module level, so a plain assignment is all that is needed.
email_flag = 0
def Mail():
    """Toggle the module-level ``email_flag`` between 1 and 0 forever.

    Each new state is announced with ``print`` and then held for five
    seconds. The function never returns, so it is meant to be a thread target.
    """
    global email_flag
    steps = (
        (1, "Mail - flipping to 1"),
        (0, "Mail - flipping to 0"),
    )
    while True:
        for state, banner in steps:
            print(banner)
            email_flag = state
            time.sleep(5)
def razer():
    """Report ``email_flag`` once a second and pass it to ``test_razer.razer``.

    The flag is only read here, so no ``global`` declaration is required.
    The callee receives the flag's value as it is at call time.
    """
    while True:
        time.sleep(1)
        print("Razer reports flag as:", email_flag)
        test_razer.razer(email_flag)
# Build both worker threads first, then start them in the original order.
thread_Mail = threading.Thread(target=Mail)
thread_razer = threading.Thread(target=razer)
thread_Mail.start()
thread_razer.start()
test_razer
import time
def razer(A1):
    """Print ``A1`` once a second, forever.

    ``A1`` is an ordinary parameter that is never rebound inside the loop,
    so the same value is printed on every pass.
    """
    while True:
        print("result: %d" % A1)
        time.sleep(1)
结果
Mail - flipping to 1
Razer reports flag as: 1
result: 1
result: 1
result: 1
result: 1
Mail - flipping to 0
result: 1
result: 1
result: 1
result: 1
result: 1
Mail - flipping to 1
result: 1
result: 1
result: 1
result: 1
我想要的是实时引用email_flag
中的值。
但是,该值仅在第一次调用该函数时被引用。
while:razer(email_flag)
是不允许的。在razer()
中,它加载了raspberry pi的模块,还有一个count
和while
语句。
我似乎找不到解决的办法。
我该怎么办?
附:
原始的 test_razer
import RPi.GPIO as GPIO
import VL53L1X
import time
import sys
import signal
from subprocess import call
# Use Broadcom (BCM) pin numbering and silence RPi.GPIO warnings.
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

# The two push-button inputs.
push_red = 22
push_white = 23
GPIO.setup(push_red, GPIO.IN)
GPIO.setup(push_white, GPIO.IN)

# Time-of-flight distance sensor on I2C bus 1, address 0x29.
tof = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
tof.open()
# Fixed typo: the method is ``start_ranging`` (was ``start_rangin``, which
# raises AttributeError). NOTE(review): the argument 1 selects a ranging
# mode - confirm its meaning against the VL53L1X library documentation.
tof.start_ranging(1)

# Loop guard for runtof(); exit_handler() clears it.
running = True
def exit_handler(signal, frame):
    """Stop the sensor and terminate the process with status 0.

    Clears the module-level ``running`` flag, stops ranging on ``tof``,
    prints an empty line and then exits. The ``(signal, frame)`` signature
    matches a signal-handler callback; neither argument is used.
    NOTE(review): nothing in the visible code registers this function with
    ``signal.signal`` - confirm it is installed somewhere.
    """
    global running
    running = False
    tof.stop_ranging()
    print()
    # Equivalent to sys.exit(0), which raises SystemExit(0).
    raise SystemExit(0)
# Poll the two push-button inputs and the ToF sensor while `running` is true.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (in particular which `if` the `else` belongs to) cannot be
# confirmed from here.
def runtof(email_flag, main_flag):
# Number of readings counted so far; reset to 0 after reaching 3 and in the else branch.
count = 0
while running:
# Take a distance reading when either button input reads 1.
if GPIO.input(push_red) == 1 or GPIO.input(push_white) == 1:
distance_in_mm = tof.get_distance()
print("distance: {}mm".format(distance_in_mm)) #
# Only readings greater than 0 and at most 500 mm are counted.
if 0 < distance_in_mm <= 500:
count += 1
time.sleep(0.3)
# On the third counted reading, reset the counter and play Razer_voice.mp3 through mpg123.
if count == 3:
count = 0
call(["mpg123", "Razer_voice.mp3"])
time.sleep(1)
else:
count = 0
if GPIO.input(push_red) == 0 or GPIO.input(push_white) == 0:
# email_flag and main_flag are parameters that nothing in this function
# rebinds. If they are plain values (e.g. ints), these loops can never see
# a later change made by the caller - the behaviour this question is about.
while email_flag:
time.sleep(1)
while main_flag:
time.sleep(1)
time.sleep(0.1)
好的,完成重写答案。
您需要另外创建一个模块,用来定义一个“超级”(super)全局变量,即一个供所有模块共享的全局模块。
注意:您会发现,我没有在您原来调用的位置调用 test_razer.razer(),因为 test_razer.razer() 本身会执行一个无限的 while 循环。相反,我让它在自己的线程中运行。否则 main 的 razer 例程只会执行一次。
config_var.py
# Shared flag: main.py assigns config_var.email_flag and test_razer.py reads it.
email_flag = 0
test_razer.py
import time
import config_var
def razer():
    """Print the shared ``config_var.email_flag`` every half second, forever.

    The attribute is looked up on the ``config_var`` module on every pass,
    so each print shows the value stored there at that moment.
    """
    while True:
        print("Test Razer reports: %d" % config_var.email_flag)
        time.sleep(0.5)
main.py
import threading
import time
import config_var
import test_razer
def Mail():
    """Toggle ``config_var.email_flag`` between 1 and 0 forever.

    Each new state is announced with ``print`` and then held for five
    seconds. The function never returns, so it is meant to be a thread target.
    """
    steps = (
        (1, "Mail - flipping to 1"),
        (0, "Mail - flipping to 0"),
    )
    while True:
        for state, banner in steps:
            print(banner)
            config_var.email_flag = state
            time.sleep(5)
def razer():
    """Print the shared ``config_var.email_flag`` once a second, forever."""
    while True:
        time.sleep(1)
        print("Razer reports flag as:", config_var.email_flag)
        # test_razer.razer() is deliberately not called here: it loops forever
        # and would block this loop, so it runs in its own thread instead.
# Build all three worker threads first, then start them in the original order.
thread_Mail = threading.Thread(target=Mail)
thread_razer = threading.Thread(target=razer)
thread_test_razer = threading.Thread(target=test_razer.razer)
thread_Mail.start()
thread_razer.start()
thread_test_razer.start()
结果:
Mail - flipping to 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Mail - flipping to 0
Razer reports flag as: 0
Test Razer reports: 0
Test Razer reports: 0
Razer reports flag as: 0
Test Razer reports: 0
Test Razer reports: 0
Razer reports flag as: 0
Test Razer reports: 0
Test Razer reports: 0
Razer reports flag as: 0
这是我的代码
RPi.GPIO
是raspberrypi GPIO输入。
多线程
import threading
import time
import test_razer
# Shared flag: Mail() writes it and razer() reads it. A ``global`` statement
# has no effect at module level, so a plain assignment is all that is needed.
email_flag = 0
def Mail():
    """Toggle the module-level ``email_flag`` between 1 and 0 forever.

    Each new state is announced with ``print`` and then held for five
    seconds. The function never returns, so it is meant to be a thread target.
    """
    global email_flag
    steps = (
        (1, "Mail - flipping to 1"),
        (0, "Mail - flipping to 0"),
    )
    while True:
        for state, banner in steps:
            print(banner)
            email_flag = state
            time.sleep(5)
def razer():
    """Report ``email_flag`` once a second and pass it to ``test_razer.razer``.

    The flag is only read here, so no ``global`` declaration is required.
    The callee receives the flag's value as it is at call time.
    """
    while True:
        time.sleep(1)
        print("Razer reports flag as:", email_flag)
        test_razer.razer(email_flag)
# Build both worker threads first, then start them in the original order.
thread_Mail = threading.Thread(target=Mail)
thread_razer = threading.Thread(target=razer)
thread_Mail.start()
thread_razer.start()
test_razer
import time
def razer(A1):
    """Print ``A1`` once a second, forever.

    ``A1`` is an ordinary parameter that is never rebound inside the loop,
    so the same value is printed on every pass.
    """
    while True:
        print("result: %d" % A1)
        time.sleep(1)
结果
Mail - flipping to 1
Razer reports flag as: 1
result: 1
result: 1
result: 1
result: 1
Mail - flipping to 0
result: 1
result: 1
result: 1
result: 1
result: 1
Mail - flipping to 1
result: 1
result: 1
result: 1
result: 1
我想要的是实时引用email_flag
中的值。
但是,该值仅在第一次调用该函数时被引用。
while:razer(email_flag)
是不允许的。在razer()
中,它加载了raspberry pi的模块,还有一个count
和while
语句。
我似乎找不到解决的办法。
我该怎么办?
附:原始的 test_razer
import RPi.GPIO as GPIO
import VL53L1X
import time
import sys
import signal
from subprocess import call
# Use Broadcom (BCM) pin numbering and silence RPi.GPIO warnings.
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

# The two push-button inputs.
push_red = 22
push_white = 23
GPIO.setup(push_red, GPIO.IN)
GPIO.setup(push_white, GPIO.IN)

# Time-of-flight distance sensor on I2C bus 1, address 0x29.
tof = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
tof.open()
# Fixed typo: the method is ``start_ranging`` (was ``start_rangin``, which
# raises AttributeError). NOTE(review): the argument 1 selects a ranging
# mode - confirm its meaning against the VL53L1X library documentation.
tof.start_ranging(1)

# Loop guard for runtof(); exit_handler() clears it.
running = True
def exit_handler(signal, frame):
    """Stop the sensor and terminate the process with status 0.

    Clears the module-level ``running`` flag, stops ranging on ``tof``,
    prints an empty line and then exits. The ``(signal, frame)`` signature
    matches a signal-handler callback; neither argument is used.
    NOTE(review): nothing in the visible code registers this function with
    ``signal.signal`` - confirm it is installed somewhere.
    """
    global running
    running = False
    tof.stop_ranging()
    print()
    # Equivalent to sys.exit(0), which raises SystemExit(0).
    raise SystemExit(0)
# Poll the two push-button inputs and the ToF sensor while `running` is true.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (in particular which `if` the `else` belongs to) cannot be
# confirmed from here.
def runtof(email_flag, main_flag):
# Number of readings counted so far; reset to 0 after reaching 3 and in the else branch.
count = 0
while running:
# Take a distance reading when either button input reads 1.
if GPIO.input(push_red) == 1 or GPIO.input(push_white) == 1:
distance_in_mm = tof.get_distance()
print("distance: {}mm".format(distance_in_mm)) #
# Only readings greater than 0 and at most 500 mm are counted.
if 0 < distance_in_mm <= 500:
count += 1
time.sleep(0.3)
# On the third counted reading, reset the counter and play Razer_voice.mp3 through mpg123.
if count == 3:
count = 0
call(["mpg123", "Razer_voice.mp3"])
time.sleep(1)
else:
count = 0
if GPIO.input(push_red) == 0 or GPIO.input(push_white) == 0:
# email_flag and main_flag are parameters that nothing in this function
# rebinds. If they are plain values (e.g. ints), these loops can never see
# a later change made by the caller - the behaviour this question is about.
while email_flag:
time.sleep(1)
while main_flag:
time.sleep(1)
time.sleep(0.1)
好的,完成重写答案。
您需要另外创建一个模块,用来定义一个“超级”(super)全局变量,即一个供所有模块共享的全局模块。
注意:您会发现,我没有在您原来调用的位置调用 test_razer.razer(),因为 test_razer.razer() 本身会执行一个无限的 while 循环。相反,我让它在自己的线程中运行。否则 main 的 razer 例程只会执行一次。
config_var.py
# Shared flag: main.py assigns config_var.email_flag and test_razer.py reads it.
email_flag = 0
test_razer.py
import time
import config_var
def razer():
    """Print the shared ``config_var.email_flag`` every half second, forever.

    The attribute is looked up on the ``config_var`` module on every pass,
    so each print shows the value stored there at that moment.
    """
    while True:
        print("Test Razer reports: %d" % config_var.email_flag)
        time.sleep(0.5)
main.py
import threading
import time
import config_var
import test_razer
def Mail():
    """Toggle ``config_var.email_flag`` between 1 and 0 forever.

    Each new state is announced with ``print`` and then held for five
    seconds. The function never returns, so it is meant to be a thread target.
    """
    steps = (
        (1, "Mail - flipping to 1"),
        (0, "Mail - flipping to 0"),
    )
    while True:
        for state, banner in steps:
            print(banner)
            config_var.email_flag = state
            time.sleep(5)
def razer():
    """Print the shared ``config_var.email_flag`` once a second, forever."""
    while True:
        time.sleep(1)
        print("Razer reports flag as:", config_var.email_flag)
        # test_razer.razer() is deliberately not called here: it loops forever
        # and would block this loop, so it runs in its own thread instead.
# Build all three worker threads first, then start them in the original order.
thread_Mail = threading.Thread(target=Mail)
thread_razer = threading.Thread(target=razer)
thread_test_razer = threading.Thread(target=test_razer.razer)
thread_Mail.start()
thread_razer.start()
thread_test_razer.start()
结果:
Mail - flipping to 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Razer reports flag as: 1
Test Razer reports: 1
Test Razer reports: 1
Mail - flipping to 0
Razer reports flag as: 0
Test Razer reports: 0
Test Razer reports: 0
Razer reports flag as: 0
Test Razer reports: 0
Test Razer reports: 0
Razer reports flag as: 0
Test Razer reports: 0
Test Razer reports: 0
Razer reports flag as: 0