在 python 中使用异步或并发技术控制重复检测任务
Control a recurring detection task using asynchronous or concurrent techniques in python
我正在开发一个检测工具,我希望能够通过网络控制它。 Web服务应支持工具启停和修改检测配置。但是我不知道如何让服务和工具并行工作,因为检测工具是不断循环检测的。
我对网络和异步有一些了解。但是我知道的那些方法是
关于“阻塞并等待所有可并行任务最后完成”,但我的探测任务应该在激活后不断执行,这让我感到困惑。
我写了一个小例子来说明这一点:
from typing import Optional, List
from fastapi import FastAPI, Query
from pydantic import BaseModel
config = {}
should_detecting = False
is_detecting = False
config_changed = False
def detect_task():
global is_detecting
global config_changed
while should_detecting:
is_detecting = True
result = do_time_consuming_detect(config)
if should_detecting and not config_changed:
send(result)
config_changed = False
is_detecting = False
@app.get(root_route + "/detect")
def do_detect(start_flag: int):
global should_detecting
if start_flag == 1 and is_detecting != True:
should_detecting = True
execute_asynchronously(detect_task)
elif start_flag == 0:
should_detecting = False
return {}
@app.get(root_route + "/update_config")
def update_config(new_config: dict):
global config_changed
config_changed = True
config.update(new_config)
return {}
所以我想知道如何使此 Web 服务与 detect_task() 并行工作。在此先感谢您的帮助!
我找到了答案。因为fastAPI提供了BackgroundTasks功能,你只需要将任务放入即可。
def detect(flags, ips):
flags['is_detecting'] = True
while flags['should_detect']:
do_some_detect()
flags['is_detecting'] = False
@app.get(root_route + "/detect")
def read_detect(start_flag: int, background_tasks: BackgroundTasks):
if start_flag == 1 and not flags['is_detecting']:
flags['should_detect'] = True
background_tasks.add_task(detect, flags, detecors)
elif start_flag == 0:
flags['should_detect'] = False
return {}
标志和检测器只是作为参数传递到任务中的全局变量。
我正在开发一个检测工具,我希望能够通过网络控制它。 Web服务应支持工具启停和修改检测配置。但是我不知道如何让服务和工具并行工作,因为检测工具是不断循环检测的。
我对网络和异步有一些了解。但是我知道的那些方法是 关于“阻塞并等待所有可并行任务最后完成”,但我的探测任务应该在激活后不断执行,这让我感到困惑。
我写了一个小例子来说明这一点:
from typing import Optional, List
from fastapi import FastAPI, Query
from pydantic import BaseModel
config = {}
should_detecting = False
is_detecting = False
config_changed = False
def detect_task():
global is_detecting
global config_changed
while should_detecting:
is_detecting = True
result = do_time_consuming_detect(config)
if should_detecting and not config_changed:
send(result)
config_changed = False
is_detecting = False
@app.get(root_route + "/detect")
def do_detect(start_flag: int):
global should_detecting
if start_flag == 1 and is_detecting != True:
should_detecting = True
execute_asynchronously(detect_task)
elif start_flag == 0:
should_detecting = False
return {}
@app.get(root_route + "/update_config")
def update_config(new_config: dict):
global config_changed
config_changed = True
config.update(new_config)
return {}
所以我想知道如何使此 Web 服务与 detect_task() 并行工作。在此先感谢您的帮助!
我找到了答案。因为fastAPI提供了BackgroundTasks功能,你只需要将任务放入即可。
def detect(flags, ips):
flags['is_detecting'] = True
while flags['should_detect']:
do_some_detect()
flags['is_detecting'] = False
@app.get(root_route + "/detect")
def read_detect(start_flag: int, background_tasks: BackgroundTasks):
if start_flag == 1 and not flags['is_detecting']:
flags['should_detect'] = True
background_tasks.add_task(detect, flags, detecors)
elif start_flag == 0:
flags['should_detect'] = False
return {}
标志和检测器只是作为参数传递到任务中的全局变量。