如何在一段时间内(例如从上午 10 点到 12:30pm)start/stop 一个 Python 函数?
How to start/stop a Python function within a time period (ex. from 10 am to 12:30pm)?
我正在尝试创建一个函数(例如 def startTime()
)来执行另一个函数,例如 def runFunc()
,每天上午 10 点使用 python 脚本执行并自动停止(或脚本结束)在 12:30 下午。
示例:startTime(start_time, stop_time,runFunc)
有人可以帮我吗?
我正在尝试安排 startTime
从上午 10 点到 12:30 下午。
import threading
import schedule
import time
def runFunc(interval, innerFunc, iterations = 0):
if iterations != 1:
threading.Timer (interval,runFunc, [interval, innerFunc , 0 ]).start ()
innerFunc ()
def A():
print "Hello World- A"
def B():
print "Hello World- B"
我试过了但是没用:
def startTime(job):
schedule.every().day.at("10:00").do(job)
while True:
schedule.run_pending()
startTime(runFunc(60,A))
startTime(runFunc(300,B))
runFunc(60,A
) 运行良好,但无法将 runFunc 安排在上午 10 点到 12:30 下午。
另一种方式
from datetime import datetime, time
now = datetime.now()
now_time = now.time()
now_time
if time(5,27) <= now.time() <= time(5,28):
runFunc(10,A)
runFunc
确实停止了,但在结束时间后继续执行。
整个故事有点复杂,很大程度上取决于您对脚本的真正需求。
例如,这段代码可以正常工作:
import threading
import schedule
import time
import datetime
import sys
def test():
print('{} This is a test'.format(datetime.datetime.now())) #this works ok
def exit():
print('{} Now the system will exit '.format(datetime.datetime.now())) #this works ok
sys.exit()
schedule.every().day.at("09:57").do(test)
schedule.every().day.at('09:58').do(exit)
while True:
schedule.run_pending()
time.sleep(1)
您将在终端中看到 "test message",一分钟后您将看到 "exit message",这实际上终止了脚本。
但是如果你在上面的函数测试中应用一些循环,比如:
def test():
while True:
print "This is a test"
time.sleep(5)
那么脚本将不会退出。
实际上,函数 exit 甚至不会被调用,因为 Python 被函数测试中的 while 循环困住,并且会一直继续下去。
计划文档指出计划作业是按顺序调用的,因此如果前一个作业未完成,则下一个作业实际上不会开始。
我怀疑你的目的是让一种功能运行在10:00持续不断,你想在12:30强制停止这个功能。如果不是这样,您的主要功能将在他完成工作后立即退出,您不需要时间框架。
在这种情况下,为了解决 Python & Schedule 的序列化方式,您需要使用线程。
结合 "how to execute jobs in parallel" section and info from other answers in Overflow like how to stop a running thread 上的计划文档中的信息,此示例在我的 Python 2.7:
电脑上运行良好
import threading
import schedule
import time
import datetime
import sys
def doit(stop_event, arg):
while not stop_event.wait(1):
#By wait(1) you repeat the loop every 1 sec.
#Applying wait(0) , loops run immediatelly until to be stopped by stop_event
print ("working on %s" % arg)
print("Stopping as you wish.")
def startit():
global pill2kill
global t
pill2kill = threading.Event()
t = threading.Thread(target=doit, args=(pill2kill, "task"))
t.start()
def stopit():
global pill2kill
global t
pill2kill.set()
t.join()
#startit() #Manual call for Testing
#time.sleep(5) #Wait 5 seconds
#stopit() #Manual call for Testing
schedule.every().day.at("12:48").do(startit)
schedule.every().day.at('12:49').do(stopit)
#schedule.every().day.at("12:50").do(startit) #Uncomment this to recall it for testing
#schedule.every().day.at('12:51').do(stopit) #Unocmment this to recall it for testing
while 1:
schedule.run_pending()
time.sleep(1)
您还可以查看 Python Crontab library 以符合您的需要。
PS: 顺便说一句,快速查看Python Schedule Lib 的源代码,似乎整个故事都是通过捕获整个脚本并不断比较date.now() 日期设置为 运行 一份工作。这个逻辑可以用几个默认命令和一个无限的主循环来重构,以连续比较日期(就像 Schedule Lib 所做的那样)。
This post 有一些很好的片段可以让你自己做 cron 作业,但只是为了测试这个简化的脚本在没有外部库的情况下也能正常工作,当 datetime.now 在所需的 start/stop 框架内时调用函数测试.
from datetime import datetime
import time
def test():
global hasrun
print('{} This is a test'.format(datetime.now()))
time.sleep(5)
hasrun=True
year,month,day,hour,minute=2016,12,23,15,55
hasrun=False
now=datetime.now()
print "Now the time is :", now
jobstart=datetime(year,month,day,hour,minute)
jobstop=datetime(year,month, day,hour,minute+1)
print "Job will run at: ", jobstart
print "Job will finish at: ", jobstop
#print datetime.now() - jobstart
while True:
while ((datetime.now() > jobstart) and (datetime.now() < jobstop )):
test()
else:
print('{} Please Wait...'.format(datetime.now()))
if hasrun:
# day=day+1
minute=minute+2 #Just for Testing
jobstart=datetime(year,month,day,hour,minute)
jobstop=datetime(year,month, day,hour,minute+1)
print "the job will run again ", jobstart
print "and will finish at ", jobstop
hasrun=False
time.sleep(5)
我正在尝试创建一个函数(例如 def startTime()
)来执行另一个函数,例如 def runFunc()
,每天上午 10 点使用 python 脚本执行并自动停止(或脚本结束)在 12:30 下午。
示例:startTime(start_time, stop_time,runFunc)
有人可以帮我吗?
我正在尝试安排 startTime
从上午 10 点到 12:30 下午。
import threading
import schedule
import time
def runFunc(interval, innerFunc, iterations = 0):
if iterations != 1:
threading.Timer (interval,runFunc, [interval, innerFunc , 0 ]).start ()
innerFunc ()
def A():
print "Hello World- A"
def B():
print "Hello World- B"
我试过了但是没用:
def startTime(job):
schedule.every().day.at("10:00").do(job)
while True:
schedule.run_pending()
startTime(runFunc(60,A))
startTime(runFunc(300,B))
runFunc(60,A
) 运行良好,但无法将 runFunc 安排在上午 10 点到 12:30 下午。
另一种方式
from datetime import datetime, time
now = datetime.now()
now_time = now.time()
now_time
if time(5,27) <= now.time() <= time(5,28):
runFunc(10,A)
runFunc
确实停止了,但在结束时间后继续执行。
整个故事有点复杂,很大程度上取决于您对脚本的真正需求。 例如,这段代码可以正常工作:
import threading
import schedule
import time
import datetime
import sys
def test():
print('{} This is a test'.format(datetime.datetime.now())) #this works ok
def exit():
print('{} Now the system will exit '.format(datetime.datetime.now())) #this works ok
sys.exit()
schedule.every().day.at("09:57").do(test)
schedule.every().day.at('09:58').do(exit)
while True:
schedule.run_pending()
time.sleep(1)
您将在终端中看到 "test message",一分钟后您将看到 "exit message",这实际上终止了脚本。
但是如果你在上面的函数测试中应用一些循环,比如:
def test():
while True:
print "This is a test"
time.sleep(5)
那么脚本将不会退出。 实际上,函数 exit 甚至不会被调用,因为 Python 被函数测试中的 while 循环困住,并且会一直继续下去。
计划文档指出计划作业是按顺序调用的,因此如果前一个作业未完成,则下一个作业实际上不会开始。
我怀疑你的目的是让一种功能运行在10:00持续不断,你想在12:30强制停止这个功能。如果不是这样,您的主要功能将在他完成工作后立即退出,您不需要时间框架。
在这种情况下,为了解决 Python & Schedule 的序列化方式,您需要使用线程。
结合 "how to execute jobs in parallel" section and info from other answers in Overflow like how to stop a running thread 上的计划文档中的信息,此示例在我的 Python 2.7:
电脑上运行良好import threading
import schedule
import time
import datetime
import sys
def doit(stop_event, arg):
while not stop_event.wait(1):
#By wait(1) you repeat the loop every 1 sec.
#Applying wait(0) , loops run immediatelly until to be stopped by stop_event
print ("working on %s" % arg)
print("Stopping as you wish.")
def startit():
global pill2kill
global t
pill2kill = threading.Event()
t = threading.Thread(target=doit, args=(pill2kill, "task"))
t.start()
def stopit():
global pill2kill
global t
pill2kill.set()
t.join()
#startit() #Manual call for Testing
#time.sleep(5) #Wait 5 seconds
#stopit() #Manual call for Testing
schedule.every().day.at("12:48").do(startit)
schedule.every().day.at('12:49').do(stopit)
#schedule.every().day.at("12:50").do(startit) #Uncomment this to recall it for testing
#schedule.every().day.at('12:51').do(stopit) #Unocmment this to recall it for testing
while 1:
schedule.run_pending()
time.sleep(1)
您还可以查看 Python Crontab library 以符合您的需要。
PS: 顺便说一句,快速查看Python Schedule Lib 的源代码,似乎整个故事都是通过捕获整个脚本并不断比较date.now() 日期设置为 运行 一份工作。这个逻辑可以用几个默认命令和一个无限的主循环来重构,以连续比较日期(就像 Schedule Lib 所做的那样)。
This post 有一些很好的片段可以让你自己做 cron 作业,但只是为了测试这个简化的脚本在没有外部库的情况下也能正常工作,当 datetime.now 在所需的 start/stop 框架内时调用函数测试.
from datetime import datetime
import time
def test():
global hasrun
print('{} This is a test'.format(datetime.now()))
time.sleep(5)
hasrun=True
year,month,day,hour,minute=2016,12,23,15,55
hasrun=False
now=datetime.now()
print "Now the time is :", now
jobstart=datetime(year,month,day,hour,minute)
jobstop=datetime(year,month, day,hour,minute+1)
print "Job will run at: ", jobstart
print "Job will finish at: ", jobstop
#print datetime.now() - jobstart
while True:
while ((datetime.now() > jobstart) and (datetime.now() < jobstop )):
test()
else:
print('{} Please Wait...'.format(datetime.now()))
if hasrun:
# day=day+1
minute=minute+2 #Just for Testing
jobstart=datetime(year,month,day,hour,minute)
jobstop=datetime(year,month, day,hour,minute+1)
print "the job will run again ", jobstart
print "and will finish at ", jobstop
hasrun=False
time.sleep(5)