使用apscheduler时如何只登录一次?
How to login only once when using apscheduler?
我想在需要登录的网站上每隔设定的时间(使用 apscheduler)检查新帖子,并从 telegram bot 接收消息。
import requests
from bs4 import BeautifulSoup
import os
import telegram
import sys
from apscheduler.schedulers.blocking import BlockingScheduler
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
HEADERS = {'User-Agent': 'Mozilla/5.0'}
LOGIN_URL = 'Login page url'
LOGIN_DATA = {
    "user_id": "id",
    "password": "pw",
    "keep_signed": "Y",
}

# Log in ONCE at module scope and reuse this session for every scheduled
# run -- this is exactly what stops the login from repeating each interval.
# NOTE(review): if the site expires the session, a re-login step would be
# needed here; the "keep_signed" flag presumably keeps it alive -- confirm.
session = requests.Session()
session.post(LOGIN_URL, data=LOGIN_DATA, headers=HEADERS)


def scraping():
    """Scheduled job: fetch the board page with the shared logged-in
    session and send a Telegram message when the newest post title
    differs from the one recorded in latest.txt."""
    url = "address"
    req = session.get(url, headers=HEADERS)
    soup = BeautifulSoup(req.text, 'html.parser')
    title = soup.select('#css values')
    latest_title = title[0].text

    token = "certain value"
    bot = telegram.Bot(token=token)
    chat_id = 'id'

    latest_path = os.path.join(BASE_DIR, 'latest.txt')
    # 'with' closes the file automatically -- the original's explicit
    # close() calls inside the with-blocks were redundant.
    with open(latest_path, 'r') as f:
        before = f.readline()
    if before != latest_title:
        bot.sendMessage(chat_id=chat_id, text=latest_title)
        # Only rewrite the state file when the title actually changed.
        with open(latest_path, 'w') as f:
            f.write(latest_title)


scheduler = BlockingScheduler()
scheduler.add_job(scraping, 'interval', seconds=30)
scheduler.start()
使用此代码,登录过程也包含在每个间隔中,效率低下。
我怎样才能重复检查帖子,但只用一次登录就可以保持会话活跃?
我以前遇到过类似的问题,通过将会话存储为 redis 中的 pickled 对象解决了这个问题。
当您尝试登录时,获取 pickled 会话,unpickle,然后尝试使用它。如果它不再是有效会话(例如,他们在 api 上使您的登录会话超时),则创建一个新会话。
按照这些思路可能会起作用:
import pickle
import redis
import time  # needed below; the original snippet used time without importing it

redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Try to reuse a previously pickled session from redis; fall back to None
# so connect() performs a fresh login.
# NOTE(review): pickle.loads must only be fed trusted data -- this is safe
# only while the redis instance is private to this application.
cached = redis_client.get("connection")
conn = pickle.loads(cached) if cached else None


def connect():
    """Log in once and cache the pickled session in redis.

    Reuses the module-level ``conn`` when one already exists; otherwise
    performs the login and stores the new session under "connection".
    """
    # 'global' is required: the original assigned to a local 'conn' (and
    # took a spurious 'self'), so the module-level conn was never updated.
    global conn
    if conn is None:
        conn = ...  # your login code here, e.g. a logged-in requests.Session
        redis_client.set("connection", pickle.dumps(conn))


connect()

# Make sure the connection is actually usable, retrying for up to 3 minutes.
timeout = time.time() + 60 * 3  # 3 mins from now
while True:
    try:
        connected = ...  # code to check you are connected, e.g. GET a url
        if not connected:
            raise AssertionError()
        break
    except (AssertionError, ConnectionResetError) as e:
        if time.time() <= timeout:
            time.sleep(30)  # wait 30 sec before retrying
            # Drop the stale session first -- otherwise connect() is a
            # no-op (conn is not None) and the retry never re-logs-in.
            conn = None
            connect()
        else:
            # Original had an unreachable 'else: raise e' after this
            # branch; chaining preserves the triggering exception instead.
            raise ValueError("Connection failed after timeout.") from e
我想在需要登录的网站上每隔设定的时间(使用 apscheduler)检查新帖子,并从 telegram bot 接收消息。
import requests
from bs4 import BeautifulSoup
import os
import telegram
import sys
from apscheduler.schedulers.blocking import BlockingScheduler
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
def scraping():
    """Scheduled job: log in, fetch the target page, and push the newest
    post title to Telegram when it differs from the last recorded one.

    Note: a fresh session (and a fresh login) is created on every run.
    """
    request_headers = {'User-Agent': 'Mozilla/5.0'}
    LOGIN_URL = 'Login page url'
    LOGIN_DATA = {
        "user_id": "id",
        "password": "pw",
        "keep_signed": "Y"
    }
    with requests.Session() as web:
        web.post(LOGIN_URL, data=LOGIN_DATA, headers=request_headers)
        page = web.get("address", headers=request_headers)
        soup = BeautifulSoup(page.text, 'html.parser')
        latest_title = soup.select('#css values')[0].text

        bot = telegram.Bot(token="certain value")
        chat_id = 'id'

        state_file = os.path.join(BASE_DIR, 'latest.txt')
        # Compare against the previously stored title...
        with open(state_file, 'r+') as handle:
            previous = handle.readline()
        if previous != latest_title:
            bot.sendMessage(chat_id=chat_id, text=latest_title)
        # ...then record the current title for the next run.
        with open(state_file, 'w+') as handle:
            handle.write(latest_title)


scheduler = BlockingScheduler()
scheduler.add_job(scraping, 'interval', seconds=30)
scheduler.start()
使用此代码,登录过程也包含在每个间隔中,效率低下。
我怎样才能重复检查帖子,但只用一次登录就可以保持会话活跃?
我以前遇到过类似的问题,通过将会话存储为 redis 中的 pickled 对象解决了这个问题。
当您尝试登录时,获取 pickled 会话,unpickle,然后尝试使用它。如果它不再是有效会话(例如,他们在 api 上使您的登录会话超时),则创建一个新会话。
按照这些思路可能会起作用:
import pickle
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Cached session object; stays None until connect() performs a login.
conn = None
# NOTE(review): 'self' is a leftover -- this is a plain function, not a
# method. It also needs 'global conn'; without it the assignment below
# binds a local name and the module-level conn is never updated.
def connect(self):
if conn is None:
conn = # your login code here
# Persist the freshly logged-in session so later runs can reuse it.
redis_client.set(
"connection", pickle.dumps(# your session here)
)
# Pull a previously pickled session out of redis, if one exists.
# NOTE(review): pickle.loads must only see trusted data -- safe only while
# this redis instance is private to the application.
connection = redis_client.get("connection")
conn = pickle.loads(connection) if connection else None
connect()
# Make sure the connection is actually connected; retry loop below.
timeout = time.time() + 60 * 3 # 3 mins from now; NOTE(review): 'time' is never imported in this snippet
while True:
try:
connected = # code to check if you are connected.. for example get a url.
if not connected:
raise AssertionError()
break
except (AssertionError, ConnectionResetError) as e:
if time.time() <= timeout:
time.sleep(30) # wait 30 sec before retrying
# recreate login
# NOTE(review): connect() is a no-op here while conn is not None, so a
# stale session is never actually replaced -- conn must be reset first.
connect()
continue
elif time.time() > timeout:
raise ValueError("Connection failed after timeout.")
else:
# NOTE(review): unreachable -- the two branches above cover all cases.
raise e