使用aiogram创建后台进程
Create a background process using aiogram
我正在尝试在我正在使用的使用 aiogram 的电报机器人中发送加密货币的价格警报。我遇到的问题是我不确定如何启动一个函数作为后台、非阻塞线程,然后继续启动调度程序。我知道如何使用标准同步电报机器人来做到这一点,但我对我应该用 aiogram 做什么感到困惑。我读到我可以使用 dp.loop.create_task
但这会引发错误 Nonetype has no attribute create_task
。这是我试图用以下代码执行这些线程的代码:
print('Starting watchlist process, this needs to run as a non blocking daemon...')
dp.loop.create_task(wl.start_process())
print('Starting broadcaster, this needs to run as a non blocking daemon ... ')
dp.loop.create_task(broadcaster())
print('Starting the bot ...')
executor.start_polling(dp, skip_updates=True)
我只需要 wl.start_process
和 broadcaster
函数到 运行 在后台。我该如何实现?
这是start_process
:
async def start_process(self):
"""
Start the watchlist process.
:return:
"""
threading.Thread(target=self.do_schedule).start()
await self.loop_check_watchlist()
这是broadcaster
:
async def broadcaster():
count = 0
while True:
uid_alerts = que.__next__()
if uid_alerts:
for i in uid_alerts:
uid = i[0]
alert = i[1]
try:
if await send_message(uid, alert):
count += 1
await asyncio.sleep(.05) # 20 messages per second (Limit: 30 messages per second)
finally:
log.info(f"{count} messages successful sent.")
我为您创建了以下工作示例,它演示了如何在 bot 启动时创建后台任务以及如何在用户命令的帮助下创建后台任务。
from aiogram import Dispatcher, executor, Bot
from aiogram import types
import asyncio
import logging
TOKEN = "your token!"
async def background_on_start() -> None:
"""background task which is created when bot starts"""
while True:
await asyncio.sleep(5)
print("Hello World!")
async def background_on_action() -> None:
"""background task which is created when user asked"""
for _ in range(20):
await asyncio.sleep(3)
print("Action!")
async def background_task_creator(message: types.Message) -> None:
"""Creates background tasks"""
asyncio.create_task(background_on_action())
await message.reply("Another one background task create")
async def on_bot_start_up(dispatcher: Dispatcher) -> None:
"""List of actions which should be done before bot start"""
asyncio.create_task(background_on_start()) # creates background task
def create_bot_factory() -> None:
"""Creates and starts the bot"""
dp = Dispatcher(Bot(token=TOKEN))
# bot endpoints block:
dp.register_message_handler(
background_task_creator,
commands=['start']
)
# start bot
executor.start_polling(dp, skip_updates=True, on_startup=on_bot_start_up)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
create_bot_factory()
我正在尝试在我正在使用的使用 aiogram 的电报机器人中发送加密货币的价格警报。我遇到的问题是我不确定如何启动一个函数作为后台、非阻塞线程,然后继续启动调度程序。我知道如何使用标准同步电报机器人来做到这一点,但我对我应该用 aiogram 做什么感到困惑。我读到我可以使用 dp.loop.create_task
Nonetype has no attribute create_task
。这是我试图用以下代码执行这些线程的代码:
print('Starting watchlist process, this needs to run as a non blocking daemon...')
dp.loop.create_task(wl.start_process())
print('Starting broadcaster, this needs to run as a non blocking daemon ... ')
dp.loop.create_task(broadcaster())
print('Starting the bot ...')
executor.start_polling(dp, skip_updates=True)
我只需要 wl.start_process
和 broadcaster
函数到 运行 在后台。我该如何实现?
这是start_process
:
async def start_process(self):
"""
Start the watchlist process.
:return:
"""
threading.Thread(target=self.do_schedule).start()
await self.loop_check_watchlist()
这是broadcaster
:
async def broadcaster():
count = 0
while True:
uid_alerts = que.__next__()
if uid_alerts:
for i in uid_alerts:
uid = i[0]
alert = i[1]
try:
if await send_message(uid, alert):
count += 1
await asyncio.sleep(.05) # 20 messages per second (Limit: 30 messages per second)
finally:
log.info(f"{count} messages successful sent.")
我为您创建了以下工作示例,它演示了如何在 bot 启动时创建后台任务以及如何在用户命令的帮助下创建后台任务。
from aiogram import Dispatcher, executor, Bot
from aiogram import types
import asyncio
import logging
TOKEN = "your token!"
async def background_on_start() -> None:
"""background task which is created when bot starts"""
while True:
await asyncio.sleep(5)
print("Hello World!")
async def background_on_action() -> None:
"""background task which is created when user asked"""
for _ in range(20):
await asyncio.sleep(3)
print("Action!")
async def background_task_creator(message: types.Message) -> None:
"""Creates background tasks"""
asyncio.create_task(background_on_action())
await message.reply("Another one background task create")
async def on_bot_start_up(dispatcher: Dispatcher) -> None:
"""List of actions which should be done before bot start"""
asyncio.create_task(background_on_start()) # creates background task
def create_bot_factory() -> None:
"""Creates and starts the bot"""
dp = Dispatcher(Bot(token=TOKEN))
# bot endpoints block:
dp.register_message_handler(
background_task_creator,
commands=['start']
)
# start bot
executor.start_polling(dp, skip_updates=True, on_startup=on_bot_start_up)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
create_bot_factory()