芹菜调用不同的功能并继续链接过程
Celery calling different function and continue the chaining process
我的任务链中包含三个任务 fetch_page、check_source 和商店页面
def update_page_info(**headers):
chain=fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s()
chain().apply_async()
fetch_page 获取页面并收集需要收集的内容:
@app.task(bind=True)
def fetch_page(self,url):
#fetch_page here and return a tuple so that it can be unpacked
# dosomething
现在获取页面后,它会在下一个任务中检查源 check_source。
@app.task(bind=True)
def check_source(self,page_and_url,handle):
try:
#unpack your stuffs here
page,url=page_and_url
get_result={}
if handle=='first_option':
get_result=select_first_option(one,two)
return get_result
elif handle=='second_option':
get_result=select_second_option(one,two)
return (get_result)
elif handle=='third_option':
get_result=select_third_option(one,two)
return (get_result)
else:
return "IGNORE FOR NOW"
except Exception as exc:
pass
所以困惑是我可以从这里调用其他任务吗??这样做会不会有任何不一致,或者工作人员会不会陷入僵局?
最后它应该执行 store_info() ,它只会存储从 check_source()
返回的东西
@app.task(bind=True)
def store_info(self,result):
print ("store_info ")
try:
#store the fetched pages
except Exception as exc:
#dosomething
finally:
pass
我正在遵循这种只需要稍作修改的方法 http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks。
有人可以建议我应该如何做以及我需要更加小心的事情吗?
这一切都应该像您阅读(和交流)它应该的那样工作。这三个任务将按顺序执行,没有任何 "inconsistency."
如果您调用 update_page_info
一次,三个链接的子任务将 运行 互斥。也就是说,此设置仍有可能出现死锁。如果您调用 update_page_info
而上次调用它时之前的任务是 运行ning,那么您可以一次获得多个任务 运行ning。这会根据您的任务共享资源的方式引入死锁的可能性。
如果您的任务共享资源,我建议使用像 redis 或 memcached 这样的东西作为跨工作人员的锁定系统。
编辑:我现在看到的代码完全没问题,因为结果作为参数传递给下一个任务。
我的任务链中包含三个任务 fetch_page、check_source 和商店页面
def update_page_info(**headers):
chain=fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s()
chain().apply_async()
fetch_page 获取页面并收集需要收集的内容:
@app.task(bind=True)
def fetch_page(self,url):
#fetch_page here and return a tuple so that it can be unpacked
# dosomething
现在获取页面后,它会在下一个任务中检查源 check_source。
@app.task(bind=True)
def check_source(self,page_and_url,handle):
try:
#unpack your stuffs here
page,url=page_and_url
get_result={}
if handle=='first_option':
get_result=select_first_option(one,two)
return get_result
elif handle=='second_option':
get_result=select_second_option(one,two)
return (get_result)
elif handle=='third_option':
get_result=select_third_option(one,two)
return (get_result)
else:
return "IGNORE FOR NOW"
except Exception as exc:
pass
所以困惑是我可以从这里调用其他任务吗??这样做会不会有任何不一致,或者工作人员会不会陷入僵局?
最后它应该执行 store_info() ,它只会存储从 check_source()
返回的东西@app.task(bind=True)
def store_info(self,result):
print ("store_info ")
try:
#store the fetched pages
except Exception as exc:
#dosomething
finally:
pass
我正在遵循这种只需要稍作修改的方法 http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks。
有人可以建议我应该如何做以及我需要更加小心的事情吗?
这一切都应该像您阅读(和交流)它应该的那样工作。这三个任务将按顺序执行,没有任何 "inconsistency."
如果您调用 update_page_info
一次,三个链接的子任务将 运行 互斥。也就是说,此设置仍有可能出现死锁。如果您调用 update_page_info
而上次调用它时之前的任务是 运行ning,那么您可以一次获得多个任务 运行ning。这会根据您的任务共享资源的方式引入死锁的可能性。
如果您的任务共享资源,我建议使用像 redis 或 memcached 这样的东西作为跨工作人员的锁定系统。
编辑:我现在看到的代码完全没问题,因为结果作为参数传递给下一个任务。