Flask Celery TypeError: send_() got multiple values for argument 'time_'
Flask Celery TypeError: send_() got multiple values for argument 'time_'
我在后台使用 Celery
作为 运行 我的功能。所有代码都运行良好,但是当我实际使用 celery
时,它一直告诉我同样的错误:
File "C:\Users\intel\AppData\Local\Programs\Python\Python38\Lib\site-packages\celery\app\task.py", line 526, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: send_() got multiple values for argument 'time_'
我正在做的实际上是抓取网页并通过从 URL 获取时间参数的调度发送电子邮件。点赞:http://127.0.0.1:5000/10
需要 10 秒。
我这样做是为了不需要等10秒先发邮件再显示网页
这是我的代码:
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import ssl
import smtplib
from jinja2 import Template
from flask import Flask, render_template
from test import *
from celery import Celery
# Broker URL for RabbitMQ task queue
broker_url = 'amqp://guest@localhost'
app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
# Your celery configurations in a celeryconfig.py
# celery.config_from_object('celeryconfig')
celery.conf.update(
CELERY_DEFAULT_QUEUE="main",
CELERY_DEFAULT_EXCHANGE="main",
CELERY_DEFAULT_EXCHANGE_TYPE="direct",
CELERY_DEFAULT_ROUTING_KEY="main",
)
sender_email = "andrew.whiteman77@gmail.com"
receiver_email = "andrew.whiteman77@gmail.com"
password = "pass"
message = MIMEMultipart("alternative")
message["Subject"] = "multipart test"
message["From"] = sender_email
message["To"] = receiver_email
# Create the plain-text and HTML version of your message
t = Template("""\
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns:v="urn:schemas-microsoft-com:vml">
<body>
<h1>Hello!</h1>
{% for i in result %}
<li>{{i}}</li>
{% endfor %}
</body>
</html>
""")
@celery.task(bind=True)
def send_(time_, result):
time.sleep(time_)
html = t.render(result=result)
part2 = MIMEText(html, "HTML")
message.attach(part2)
context = ssl.create_default_context()
with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server:
server.login(sender_email, password)
server.sendmail(
sender_email, receiver_email, message.as_string()
)
@app.route('/<time>')
def hello_world(time):
from bs4 import BeautifulSoup
import requests as req
resp = req.get('https://zulie.medium.com/')
soup = BeautifulSoup(resp.text, "lxml")
lst = []
for div in soup.find_all("div"):
soup = BeautifulSoup(str(div), 'html5lib')
div_tag = soup.find()
try:
title = div_tag.section.div.h1.a.text
if title not in lst:
lst.append(title)
except:
pass
send_.delay(time_=int(time), result=lst)
return render_template('hello.html', result=lst)
if __name__ == '__main__':
app.run(debug=True)
记住:您需要输入自己的密码才能发送邮件
您将任务定义为 bound task(使用 bind=True
),因此任务的第一个参数将始终是任务实例。您在函数定义中缺少 self
作为第一个参数,因此 Celery 会将任务实例提供给 time_
参数(作为位置参数)。当您使用
调用任务时
send_.delay(time_=int(time), result=lst)
您再次提供 time_
,这次是作为关键字参数。因此,您会收到有关 time_
参数的多个值的错误。
要么将任务函数定义改为
@celery.task(bind=True)
def send_(self, time_, result):
...
或者去掉任务装饰器中的bind=True
@celery.task
def send_(time_, result):
...
因为您似乎并不需要它(请参阅 了解用例)。
我在后台使用 Celery
作为 运行 我的功能。所有代码都运行良好,但是当我实际使用 celery
时,它一直告诉我同样的错误:
File "C:\Users\intel\AppData\Local\Programs\Python\Python38\Lib\site-packages\celery\app\task.py", line 526, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: send_() got multiple values for argument 'time_'
我正在做的实际上是抓取网页并通过从 URL 获取时间参数的调度发送电子邮件。点赞:http://127.0.0.1:5000/10
需要 10 秒。
我这样做是为了不需要等10秒先发邮件再显示网页
这是我的代码:
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import ssl
import smtplib
from jinja2 import Template
from flask import Flask, render_template
from test import *
from celery import Celery
# Broker URL for RabbitMQ task queue
broker_url = 'amqp://guest@localhost'
app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
# Your celery configurations in a celeryconfig.py
# celery.config_from_object('celeryconfig')
celery.conf.update(
CELERY_DEFAULT_QUEUE="main",
CELERY_DEFAULT_EXCHANGE="main",
CELERY_DEFAULT_EXCHANGE_TYPE="direct",
CELERY_DEFAULT_ROUTING_KEY="main",
)
sender_email = "andrew.whiteman77@gmail.com"
receiver_email = "andrew.whiteman77@gmail.com"
password = "pass"
message = MIMEMultipart("alternative")
message["Subject"] = "multipart test"
message["From"] = sender_email
message["To"] = receiver_email
# Create the plain-text and HTML version of your message
t = Template("""\
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns:v="urn:schemas-microsoft-com:vml">
<body>
<h1>Hello!</h1>
{% for i in result %}
<li>{{i}}</li>
{% endfor %}
</body>
</html>
""")
@celery.task(bind=True)
def send_(time_, result):
time.sleep(time_)
html = t.render(result=result)
part2 = MIMEText(html, "HTML")
message.attach(part2)
context = ssl.create_default_context()
with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server:
server.login(sender_email, password)
server.sendmail(
sender_email, receiver_email, message.as_string()
)
@app.route('/<time>')
def hello_world(time):
from bs4 import BeautifulSoup
import requests as req
resp = req.get('https://zulie.medium.com/')
soup = BeautifulSoup(resp.text, "lxml")
lst = []
for div in soup.find_all("div"):
soup = BeautifulSoup(str(div), 'html5lib')
div_tag = soup.find()
try:
title = div_tag.section.div.h1.a.text
if title not in lst:
lst.append(title)
except:
pass
send_.delay(time_=int(time), result=lst)
return render_template('hello.html', result=lst)
if __name__ == '__main__':
app.run(debug=True)
记住:您需要输入自己的密码才能发送邮件
您将任务定义为 bound task(使用 bind=True
),因此任务的第一个参数将始终是任务实例。您在函数定义中缺少 self
作为第一个参数,因此 Celery 会将任务实例提供给 time_
参数(作为位置参数)。当您使用
send_.delay(time_=int(time), result=lst)
您再次提供 time_
,这次是作为关键字参数。因此,您会收到有关 time_
参数的多个值的错误。
要么将任务函数定义改为
@celery.task(bind=True)
def send_(self, time_, result):
...
或者去掉任务装饰器中的bind=True
@celery.task
def send_(time_, result):
...
因为您似乎并不需要它(请参阅