Flask Celery Python 导入
Flask Celery Python import
我在将 Celery 集成到我的 Flask 应用程序中时遇到问题。
这是回购 https://github.com/theobouwman/community-python.
我通过 运行ning app.py
启动我的应用程序,它导入我的应用程序(其中添加了蓝图和配置)和 Celery。
在 /tasks/add.py
中,我有一个示例任务,我在其中再次为 @celery.task
装饰器导入 Celery
对象。
至此一切正常。我可以 运行 我的 Flask 应用程序和 运行 Celery worker。
但是当我尝试从蓝图中的控制器内触发任务时,就像这里 https://github.com/theobouwman/community-python/blob/master/auth/controllers/RegistrationController.py#L38 它说它无法导入它,这是一个逻辑反应。
Traceback (most recent call last):
File "app.py", line 2, in <module>
from flask_app import app
File "/development/projects/python/Community/flask_app.py", line 4, in <module>
from auth.routes import auth
File "/development/projects/python/Community/auth/routes.py", line 3, in <module>
from .controllers import RegistrationController, AuthenticationController, LogoutController
File "/development/projects/python/Community/auth/controllers/RegistrationController.py", line 10, in <module>
from tasks.add import add
File "/development/projects/python/Community/tasks/add.py", line 1, in <module>
from app import celery
File "/development/projects/python/Community/app.py", line 2, in <module>
from flask_app import app
ImportError: cannot import name 'app'
我不知道如何解决这个导入周期,这就是这个问题的原因。我用谷歌搜索了大约 3 个小时,但找不到解决方案。
我希望这里有人能帮助我。
空气中有 Flask Slack 或 Gitter 吗?
提前致谢。
将RegistrationController.py
中的导入更改为本地导入以解决循环导入:
from ..blueprint import auth
from models import User
from flask import redirect, url_for, request, render_template, flash
import bcrypt
from ..forms.register import SimpleRegistrationForm
"""
Error in python3.6 app.py
Says cyclus import error
"""
<b># Comment out the line below
# from tasks.add import add</b>
@auth.route('/register', methods=['GET', 'POST'])
def register():
form = SimpleRegistrationForm(request.form)
if request.method == 'POST' and form.validate():
fname = request.form['fname']
sname = request.form['sname']
email = request.form['email']
password = request.form['password']
hashed = bcrypt.hashpw(password.encode('utf-8 '), bcrypt.gensalt())
user = User.select().where(User.email == email)
if user.exists():
flash('Er bestaat al een account met dit email adres')
return redirect(url_for('auth.register'))
user = User(fname=fname, sname=sname, email=email, password=hashed)
user.save()
flash('Uw account is aangemaakt. Kijk in uw mailbox voor de activatie link')
return redirect(url_for('auth.register'))
return render_template('pages/register.html', form=form)
@auth.route('/register/test')
def register_test():
# local import avoids the cycle
<b>from tasks.add import add</b>
add.delay()
# hashed = bcrypt.hashpw('test'.encode('utf-8 '), bcrypt.gensalt())
# user = User(
# fname='Theo',
# sname='Bouwman',
# email='theobouwman98@gmail.com',
# password=hashed
# )
# user.save()
return redirect(url_for('auth.login'))
我在将 Celery 集成到我的 Flask 应用程序中时遇到问题。 这是回购 https://github.com/theobouwman/community-python.
我通过 运行ning app.py
启动我的应用程序,它导入我的应用程序(其中添加了蓝图和配置)和 Celery。
在 /tasks/add.py
中,我有一个示例任务,我在其中再次为 @celery.task
装饰器导入 Celery
对象。
至此一切正常。我可以 运行 我的 Flask 应用程序和 运行 Celery worker。
但是当我尝试从蓝图中的控制器内触发任务时,就像这里 https://github.com/theobouwman/community-python/blob/master/auth/controllers/RegistrationController.py#L38 它说它无法导入它,这是一个逻辑反应。
Traceback (most recent call last):
File "app.py", line 2, in <module>
from flask_app import app
File "/development/projects/python/Community/flask_app.py", line 4, in <module>
from auth.routes import auth
File "/development/projects/python/Community/auth/routes.py", line 3, in <module>
from .controllers import RegistrationController, AuthenticationController, LogoutController
File "/development/projects/python/Community/auth/controllers/RegistrationController.py", line 10, in <module>
from tasks.add import add
File "/development/projects/python/Community/tasks/add.py", line 1, in <module>
from app import celery
File "/development/projects/python/Community/app.py", line 2, in <module>
from flask_app import app
ImportError: cannot import name 'app'
我不知道如何解决这个导入周期,这就是这个问题的原因。我用谷歌搜索了大约 3 个小时,但找不到解决方案。 我希望这里有人能帮助我。
空气中有 Flask Slack 或 Gitter 吗?
提前致谢。
将RegistrationController.py
中的导入更改为本地导入以解决循环导入:
from ..blueprint import auth
from models import User
from flask import redirect, url_for, request, render_template, flash
import bcrypt
from ..forms.register import SimpleRegistrationForm
"""
Error in python3.6 app.py
Says cyclus import error
"""
<b># Comment out the line below
# from tasks.add import add</b>
@auth.route('/register', methods=['GET', 'POST'])
def register():
form = SimpleRegistrationForm(request.form)
if request.method == 'POST' and form.validate():
fname = request.form['fname']
sname = request.form['sname']
email = request.form['email']
password = request.form['password']
hashed = bcrypt.hashpw(password.encode('utf-8 '), bcrypt.gensalt())
user = User.select().where(User.email == email)
if user.exists():
flash('Er bestaat al een account met dit email adres')
return redirect(url_for('auth.register'))
user = User(fname=fname, sname=sname, email=email, password=hashed)
user.save()
flash('Uw account is aangemaakt. Kijk in uw mailbox voor de activatie link')
return redirect(url_for('auth.register'))
return render_template('pages/register.html', form=form)
@auth.route('/register/test')
def register_test():
# local import avoids the cycle
<b>from tasks.add import add</b>
add.delay()
# hashed = bcrypt.hashpw('test'.encode('utf-8 '), bcrypt.gensalt())
# user = User(
# fname='Theo',
# sname='Bouwman',
# email='theobouwman98@gmail.com',
# password=hashed
# )
# user.save()
return redirect(url_for('auth.login'))