Celery/Flask/Redis - apply_async 功能不起作用 - 更新 2

Celery/Flask/Redis - apply_async function doesn't work - Updated 2

大家。

app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


def upload():
    if checkDouble == None:
        do_async_upload.apply_async(args=[checkDouble], countdown=15)
        return 'Saved ' + file.filename
    else:
        return "File exists"    

@celery.task
def do_async_upload(pic):
    print("hello2")
    db.session.add(pic)
    db.session.commit()
    return 0

但是do_sync_upload不起作用(hello2不打印),它只是跳过。 Redis 服务器正常工作

更新:

if checkDouble == None:
        print('Hello')
        async_result = do_async_upload.delay(checkDouble)
        print(async_result)
        result = async_result.get()


        return 'Saved ' + file.filename
    else:
        return "File exists"    

但是do_async_upload.apply_async好像根本没有执行。 也许我的 Redis 有问题?服务器正在运行,但在我尝试提交内容后,连接的客户端从 0 变为 4。 甚至我的日期时间设置也有问题。

更新 2:

我用 Flask 和 Celery 尝试了 2 个不同的示例,其中 none 个有效。 Redis 服务器显示一些 activity,但没有。 他们都使用相同的功能: 来自 https://github.com/ro6ley/flask-celery-demo 的代码:

@client.task
def send_mail(data):

    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']        
        mail.send(msg)


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']
        print("1")    
        send_mail.apply_async(args=[data], countdown=duration)
        print("2")
        flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")    

它显示了闪现消息,但没有收到消息。 Config.py 设置正确。

下次申请:

 @app.route('/', methods=['GET', 'POST'])
    def index():
        if request.method == 'GET':
            return render_template('index.html', email=session.get('email', ''))
        email = request.form['email']
        session['email'] = email

        # send the email
        email_data = {
            'subject': 'Hello from Flask',
            'to': email,
            'body': 'This is a test email sent from a background Celery task.'
        }
        if request.form['submit'] == 'Send':
            # send right away
            send_async_email.delay(email_data)
            flash('Sending email to {0}'.format(email))
        else:
            # send in one minute
            send_async_email.apply_async(args=[email_data], countdown=60)
            flash('An email will be sent to {0} in one minute'.format(email))

        return redirect(url_for('index'))

    @celery.task
    def send_async_email(email_data):
        """Background task to send an email with Flask-Mail."""
        msg = Message(email_data['subject'],
                      sender=app.config['MAIL_DEFAULT_SENDER'],
                      recipients=[email_data['to']])
        msg.body = email_data['body']
        with app.app_context():
            mail.send(msg)

None 个选项有效。

apply_async() returns 立即生成一个 AsyncResult 对象,难怪你什么都看不到并认为 do_async_upload() 没有执行...

所以我建议你做两件事:

1) 将 upload() 修改为如下内容:

async_result = do_async_upload.apply_async(args=[checkDouble], countdown=15)
# .get() blocks until the task is done and returns
result = async_result.get()

2) 不要在 Celery 任务中调用 print()。使用 get_task_logger() 函数获取 Celery 的记录器对象并在您的任务中使用它。有个whole section in the documentation about it.