django celery 和 asyncio - 循环参数必须大约每 3 分钟与 Future 一致
django celery and asyncio - loop argument must agree with Future approx every 3 mins
我正在使用 django celery 和 celery beat 运行 周期性任务。我运行每分钟一个任务,通过SNMP获取一些数据。
我的函数使用如下所示的 asyncio。我检查了代码以检查循环是否关闭并创建一个新循环。
但似乎每隔几个任务就会发生一次失败,在 Django-tasks-results 数据库中我有以下回溯。似乎每 3 分钟就有一次失败,但每分钟都有成功,没有失败
错误:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 374, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 629, in __protected_call__
return self.run(*args, **kwargs)
File "/itapp/itapp/monitoring/tasks.py", line 32, in link_data
return get_link_data()
File "/itapp/itapp/monitoring/jobs/link_monitoring.py", line 209, in get_link_data
done, pending = loop.run_until_complete(asyncio.wait(tasks))
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
return future.result()
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 311, in wait
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 311, in <setcomp>
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 514, in ensure_future
raise ValueError('loop argument must agree with Future')
ValueError: loop argument must agree with Future
函数:
async def retrieve_data(link):
poll_interval = 60
results = []
# credentials:
link_mgmt_ip = link.mgmt_ip
link_index = link.interface_index
snmp_user = link.device_circuit_subnet.device.snmp_data.name
snmp_auth = link.device_circuit_subnet.device.snmp_data.auth
snmp_priv = link.device_circuit_subnet.device.snmp_data.priv
hostname = link.device_circuit_subnet.device.hostname
print('polling data for {} on {}'.format(hostname,link_mgmt_ip))
# first poll for speeds
download_speed_data_poll1 = snmp_get(link_mgmt_ip, down_speed_oid % link_index ,snmp_user, snmp_auth, snmp_priv)
# check we were able to poll
if 'timeout' in str(get_snmp_value(download_speed_data_poll1)).lower():
return 'timeout trying to poll {} - {}'.format(hostname ,link_mgmt_ip)
upload_speed_data_poll1 = snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
# wait for poll interval
await asyncio.sleep(poll_interval)
# second poll for speeds
download_speed_data_poll2 = snmp_get(link_mgmt_ip, down_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
upload_speed_data_poll2 = snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
# create deltas for speed
down_delta = int(get_snmp_value(download_speed_data_poll2)) - int(get_snmp_value(download_speed_data_poll1))
up_delta = int(get_snmp_value(upload_speed_data_poll2)) - int(get_snmp_value(upload_speed_data_poll1))
# set speed results
download_speed = round((down_delta * 8 / poll_interval) / 1048576)
upload_speed = round((up_delta * 8 / poll_interval) / 1048576)
# get description and interface state
int_desc = snmp_get(link_mgmt_ip, int_desc_oid % link_index, snmp_user, snmp_auth, snmp_priv)
int_state = snmp_get(link_mgmt_ip, int_state_oid % link_index, snmp_user, snmp_auth, snmp_priv)
...
return results
def get_link_data():
mgmt_ip = Subquery(
DeviceCircuitSubnets.objects.filter(device_id=OuterRef('device_circuit_subnet__device_id'),subnet__subnet_type__poll=True).values('subnet__subnet')[:1])
link_data = LinkTargets.objects.all() \
.select_related('device_circuit_subnet') \
.select_related('device_circuit_subnet__device') \
.select_related('device_circuit_subnet__device__snmp_data') \
.select_related('device_circuit_subnet__subnet') \
.select_related('device_circuit_subnet__circuit') \
.annotate(mgmt_ip=mgmt_ip)
tasks = []
loop = asyncio.get_event_loop()
if asyncio.get_event_loop().is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(asyncio.new_event_loop())
for link in link_data:
tasks.append(asyncio.ensure_future(retrieve_data(link)))
if tasks:
start = time.time()
done, pending = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
results = []
for completed_task in done:
results.append(completed_task.result()[0])
end = time.time()
print("Poll time: {}".format(end - start))
return 'Link data updated for {}'.format(' \n '.join(results))
else:
return 'no tasks defined'
来自用户 4815162342 建议的这些网址
当 运行ning ascync 函数时,任何输入输出操作都需要异步兼容,但 运行 在内存中的函数除外。 (在我的例子中是一个正则表达式查询)
即任何需要从其他来源收集数据的函数(在我的示例中是一个 django 查询)不兼容异步必须在执行程序中 运行。
我想我现在已经通过 运行 在执行程序中调用所有 django 数据库来解决我的问题,从那以后我就没有遇到过 运行 临时脚本的问题。
但是我遇到了 celery 和 async 的兼容性问题(因为 celery 还不兼容 asyncio,这会引发一些错误,但不是我之前看到的错误)
我正在使用 django celery 和 celery beat 运行 周期性任务。我运行每分钟一个任务,通过SNMP获取一些数据。
我的函数使用如下所示的 asyncio。我检查了代码以检查循环是否关闭并创建一个新循环。
但似乎每隔几个任务就会发生一次失败,在 Django-tasks-results 数据库中我有以下回溯。似乎每 3 分钟就有一次失败,但每分钟都有成功,没有失败
错误:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 374, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 629, in __protected_call__
return self.run(*args, **kwargs)
File "/itapp/itapp/monitoring/tasks.py", line 32, in link_data
return get_link_data()
File "/itapp/itapp/monitoring/jobs/link_monitoring.py", line 209, in get_link_data
done, pending = loop.run_until_complete(asyncio.wait(tasks))
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
return future.result()
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 311, in wait
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 311, in <setcomp>
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 514, in ensure_future
raise ValueError('loop argument must agree with Future')
ValueError: loop argument must agree with Future
函数:
async def retrieve_data(link):
poll_interval = 60
results = []
# credentials:
link_mgmt_ip = link.mgmt_ip
link_index = link.interface_index
snmp_user = link.device_circuit_subnet.device.snmp_data.name
snmp_auth = link.device_circuit_subnet.device.snmp_data.auth
snmp_priv = link.device_circuit_subnet.device.snmp_data.priv
hostname = link.device_circuit_subnet.device.hostname
print('polling data for {} on {}'.format(hostname,link_mgmt_ip))
# first poll for speeds
download_speed_data_poll1 = snmp_get(link_mgmt_ip, down_speed_oid % link_index ,snmp_user, snmp_auth, snmp_priv)
# check we were able to poll
if 'timeout' in str(get_snmp_value(download_speed_data_poll1)).lower():
return 'timeout trying to poll {} - {}'.format(hostname ,link_mgmt_ip)
upload_speed_data_poll1 = snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
# wait for poll interval
await asyncio.sleep(poll_interval)
# second poll for speeds
download_speed_data_poll2 = snmp_get(link_mgmt_ip, down_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
upload_speed_data_poll2 = snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
# create deltas for speed
down_delta = int(get_snmp_value(download_speed_data_poll2)) - int(get_snmp_value(download_speed_data_poll1))
up_delta = int(get_snmp_value(upload_speed_data_poll2)) - int(get_snmp_value(upload_speed_data_poll1))
# set speed results
download_speed = round((down_delta * 8 / poll_interval) / 1048576)
upload_speed = round((up_delta * 8 / poll_interval) / 1048576)
# get description and interface state
int_desc = snmp_get(link_mgmt_ip, int_desc_oid % link_index, snmp_user, snmp_auth, snmp_priv)
int_state = snmp_get(link_mgmt_ip, int_state_oid % link_index, snmp_user, snmp_auth, snmp_priv)
...
return results
def get_link_data():
mgmt_ip = Subquery(
DeviceCircuitSubnets.objects.filter(device_id=OuterRef('device_circuit_subnet__device_id'),subnet__subnet_type__poll=True).values('subnet__subnet')[:1])
link_data = LinkTargets.objects.all() \
.select_related('device_circuit_subnet') \
.select_related('device_circuit_subnet__device') \
.select_related('device_circuit_subnet__device__snmp_data') \
.select_related('device_circuit_subnet__subnet') \
.select_related('device_circuit_subnet__circuit') \
.annotate(mgmt_ip=mgmt_ip)
tasks = []
loop = asyncio.get_event_loop()
if asyncio.get_event_loop().is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(asyncio.new_event_loop())
for link in link_data:
tasks.append(asyncio.ensure_future(retrieve_data(link)))
if tasks:
start = time.time()
done, pending = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
results = []
for completed_task in done:
results.append(completed_task.result()[0])
end = time.time()
print("Poll time: {}".format(end - start))
return 'Link data updated for {}'.format(' \n '.join(results))
else:
return 'no tasks defined'
来自用户 4815162342 建议的这些网址
当 运行ning ascync 函数时,任何输入输出操作都需要异步兼容,但 运行 在内存中的函数除外。 (在我的例子中是一个正则表达式查询)
即任何需要从其他来源收集数据的函数(在我的示例中是一个 django 查询)不兼容异步必须在执行程序中 运行。
我想我现在已经通过 运行 在执行程序中调用所有 django 数据库来解决我的问题,从那以后我就没有遇到过 运行 临时脚本的问题。
但是我遇到了 celery 和 async 的兼容性问题(因为 celery 还不兼容 asyncio,这会引发一些错误,但不是我之前看到的错误)