Run python script every hour
I want to schedule my Python script to run every hour and save the data into an Elasticsearch index. I wrote a set_interval function for this, and the script uses the tweepy library. But it does not work the way I need it to: it runs every minute and saves data into the index, even though I pass 3600 as the number of seconds. I want it configured to run hourly.
How can I fix this? Here is my Python script:
from datetime import datetime
from threading import Thread, Timer

import tweepy
from elasticsearch import helpers

def call_at_interval(time, callback, args):
    while True:
        timer = Timer(time, callback, args=args)
        timer.start()
        timer.join()

def set_interval(time, callback, *args):
    Thread(target=call_at_interval, args=(time, callback, args)).start()

def get_all_tweets(screen_name):
    # authorize twitter, initialize tweepy
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_key, access_secret)
    api = tweepy.API(auth)
    screen_name = ""

    # initialize a list to hold all the tweets
    alltweets = []

    # make initial request for most recent tweets (200 is the maximum allowed count)
    new_tweets = api.user_timeline(screen_name=screen_name, count=200)

    # save most recent tweets
    alltweets.extend(new_tweets)

    # save the id of the oldest tweet less one
    oldest = alltweets[-1].id - 1

    # keep grabbing tweets until there are no tweets left to grab
    while len(new_tweets) > 0:
        # all subsequent requests use the max_id param to prevent duplicates
        new_tweets = api.user_timeline(screen_name=screen_name, count=200, max_id=oldest)

        # save most recent tweets
        alltweets.extend(new_tweets)

        # update the id of the oldest tweet less one
        oldest = alltweets[-1].id - 1

    outtweets = [{'ID': tweet.id_str, 'Text': tweet.text, 'Date': tweet.created_at, 'author': tweet.user.screen_name} for tweet in alltweets]

    def save_es(outtweets, es):  # PEP 8 convention
        data = [
            {
                "_index": "index name",
                "_type": "type name",
                "_id": index,
                "_source": ID
            }
            for index, ID in enumerate(outtweets)
        ]
        helpers.bulk(es, data)

    save_es(outtweets, es)

    print('Run at:')
    print(datetime.now())
    print("\n")

set_interval(3600, get_all_tweets(screen_name))
Get rid of all the timer code and keep only the logic; cron will do the scheduling for you. Run crontab -e and add this at the end of the file:
0 * * * * /path/to/python /path/to/script.py
0 * * * *
means "run at minute zero of every hour"; you can find more explanation here.
I also noticed you are invoking get_all_tweets(screen_name) directly in the set_interval call, so it runs immediately instead of being scheduled; I think you have to call it from outside instead.
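As a minimal illustration of that last point (the names schedule and work below are hypothetical stand-ins, not part of the original script): a call written inline is evaluated before the scheduler ever sees it, so the function runs immediately and the scheduler receives only its return value.

```python
calls = []

def schedule(interval, callback, *args):
    # stand-in scheduler: it only records what it was handed
    calls.append((interval, callback, args))

def work(name):
    calls.append(("ran", name))

# Wrong: work("alice") runs right here; schedule receives its
# return value (None) instead of a callable.
schedule(3600, work("alice"))

# Right: pass the callable and its argument separately, so the
# scheduler can invoke it later.
schedule(3600, work, "bob")
```

The same distinction applies to set_interval: pass the function object and its arguments, not the result of calling it.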
Your script then becomes just this:
from datetime import datetime

import tweepy
from elasticsearch import helpers

def get_all_tweets(screen_name):
    # authorize twitter, initialize tweepy
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_key, access_secret)
    api = tweepy.API(auth)
    screen_name = ""

    # initialize a list to hold all the tweets
    alltweets = []

    # make initial request for most recent tweets (200 is the maximum allowed count)
    new_tweets = api.user_timeline(screen_name=screen_name, count=200)

    # save most recent tweets
    alltweets.extend(new_tweets)

    # save the id of the oldest tweet less one
    oldest = alltweets[-1].id - 1

    # keep grabbing tweets until there are no tweets left to grab
    while len(new_tweets) > 0:
        # all subsequent requests use the max_id param to prevent duplicates
        new_tweets = api.user_timeline(screen_name=screen_name, count=200, max_id=oldest)

        # save most recent tweets
        alltweets.extend(new_tweets)

        # update the id of the oldest tweet less one
        oldest = alltweets[-1].id - 1

    outtweets = [{'ID': tweet.id_str, 'Text': tweet.text, 'Date': tweet.created_at, 'author': tweet.user.screen_name} for tweet in alltweets]

    def save_es(outtweets, es):  # PEP 8 convention
        data = [
            {
                "_index": "index name",
                "_type": "type name",
                "_id": index,
                "_source": ID
            }
            for index, ID in enumerate(outtweets)
        ]
        helpers.bulk(es, data)

    save_es(outtweets, es)

get_all_tweets("")  # your screen name here
Why do you need such a complicated setup for an hourly task? You can simply run the script in a loop as below; note that each run then starts one hour plus the duration of the work after the previous one:
import time

def do_some_work():
    print("Do some work")
    time.sleep(1)
    print("Some work is done!")

if __name__ == "__main__":
    time.sleep(60)  # imagine you would like to start work in 1 minute the first time
    while True:
        do_some_work()
        time.sleep(3600)  # do work every hour
If instead you want the script to start exactly every hour, regardless of how long the work takes, run each job in a thread:
import time
import threading

def do_some_work():
    print("Do some work")
    time.sleep(4)
    print("Some work is done!")

if __name__ == "__main__":
    time.sleep(60)  # imagine you would like to start work in 1 minute the first time
    while True:
        thr = threading.Thread(target=do_some_work)
        thr.start()
        time.sleep(3600)  # do work every hour
In this case thr should finish its work in under 3600 seconds. If it does not, you will still get results, but they will come from a different, overlapping attempt; see the example below:
import time
import threading

class AttemptCount:
    def __init__(self, attempt_number):
        self.attempt_number = attempt_number

def do_some_work(_attempt_number):
    print(f"Do some work {_attempt_number.attempt_number}")
    time.sleep(4)
    print(f"Some work is done! {_attempt_number.attempt_number}")
    _attempt_number.attempt_number += 1

if __name__ == "__main__":
    attempt_number = AttemptCount(1)
    time.sleep(1)  # the 1-minute initial delay is shortened to 1 second for the demo
    while True:
        thr = threading.Thread(target=do_some_work, args=(attempt_number,))
        thr.start()
        time.sleep(1)  # 1 second instead of 3600 so the overlap shows quickly
The result you get is:
Do some work 1
Do some work 1
Do some work 1
Do some work 1
Some work is done! 1
Do some work 2
Some work is done! 2
Do some work 3
Some work is done! 3
Do some work 4
Some work is done! 4
Do some work 5
Some work is done! 5
Do some work 6
Some work is done! 6
Do some work 7
Some work is done! 7
Do some work 8
Some work is done! 8
Do some work 9
I like to use subprocess.Popen for tasks like this: if the child process does not finish its work within an hour for any reason, you just terminate it and start a new one.
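That approach can be sketched like this (a minimal sketch; the "script.py" path and the one-hour budget are illustrative assumptions, not part of the original answer):

```python
import subprocess
import time

INTERVAL = 3600  # launch the worker once per hour

def run_once(cmd, timeout=INTERVAL):
    """Run cmd as a child process; terminate it if it exceeds timeout seconds."""
    proc = subprocess.Popen(cmd)
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.terminate()  # the child overran its budget: stop it and move on
        proc.wait()
    return proc.returncode

if __name__ == "__main__":
    while True:
        started = time.monotonic()
        run_once(["python", "script.py"])  # hypothetical worker script
        # sleep away whatever remains of the hour before the next launch
        remaining = INTERVAL - (time.monotonic() - started)
        if remaining > 0:
            time.sleep(remaining)
```

Because the worker runs in a separate process rather than a thread, killing an overrunning attempt is safe and the next launch always starts from a clean state.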
You can also use cron to schedule a process to run every hour.