"DatabaseWrapper objects created in a thread can only be used in that same thread." when trying to insert into database using celery
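I am using celery to read data from a CSV file and upload it to Postgres. The celery task is working (I think), but Django throws an error. I take the uploaded file, load it into pandas, delete 2 columns, convert it to numpy, and pass it to celery.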
    # tasks.py
    import datetime

    from celery import shared_task

    # Assumed location of the models; adjust to the app's actual module.
    from .models import t100_code_lookup, t102_accounts, t106_transactions


    @shared_task
    def a(data):
        for x in data:
            date = x[0]
            date = datetime.datetime.strptime(date, "%m/%d/%Y")
            date = str(date.date())
            desc = x[1]
            amount = x[3]
            account = x[6]
            cat = x[5]
            account_type = x[4]
            # changing account type
            if account_type == "debit":
                account_type = False
            else:
                account_type = True
            # creating account if needed and assigning the object to account
            try:
                account = t102_accounts.objects.get(c102_acc_name=account, c102_is_credit_fl=account_type)
            except:
                account = t102_accounts.objects.create(c102_acc_name=account, c102_is_credit_fl=account_type)
            # creating cat if needed and assigning the object
            try:
                cat = t100_code_lookup.objects.get(c100_code_name=cat)
            except:
                cat = t100_code_lookup.objects.create(c100_code_name=cat, c100_for_exp_fl=True, c100_for_income_fl=True)
            # creating the transaction if it is not existing already
            try:
                t106_transactions.objects.get(c106_trans_amount=amount,
                                              c106_trans_date=date, c102_trans_card_num=account,
                                              c100_trans_cat=cat, c106_trans_name=desc)
                continue
            except:
                t106_transactions.objects.create(c106_trans_amount=amount,
                                                 c106_trans_date=date, c102_trans_card_num=account,
                                                 c100_trans_cat=cat, c106_trans_name=desc)
        return 'done'
    # calling the function in views
    def upload(request):
        if request.method == "POST":
            form = UploadFile(request.POST, request.FILES)
            if form.is_valid():
                data = request.FILES["file"]
                data = pd.read_csv(data)
                del data["Notes"]
                del data["Labels"]
                data = data.dropna(axis=0)
                data = data.to_numpy()
                data = data.tolist()
                a.delay(data)
                return redirect(dashboard)
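For readers following the column indexing in the task, here is a minimal standalone sketch of the per-row conversion, with no database involved. The helper name `parse_row` and the sample row are made up for illustration; only the column positions and the date format are taken from the task above.

```python
import datetime


def parse_row(row):
    """Turn one CSV row (a plain list) into the values the task stores."""
    # Column positions follow the indexing used in the task above.
    date = datetime.datetime.strptime(row[0], "%m/%d/%Y").date().isoformat()
    return {
        "date": date,
        "desc": row[1],
        "amount": row[3],
        # "debit" maps to False, anything else to True, as in the task.
        "is_credit": row[4] != "debit",
        "category": row[5],
        "account": row[6],
    }


# Made-up sample row, only for illustration.
sample = ["01/15/2020", "Coffee", "ignored", 4.5, "debit", "Food", "Checking"]
parsed = parse_row(sample)
print(parsed)
```

Keeping this step as a pure function makes it possible to check the parsing without a running worker or database.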
My traceback is listed here.

    Traceback:
      File "c:\users\yogab\appdata\local\programs\python\python38-32\lib\site-packages\celery\utils\dispatch\signal.py", line 288, in send
        response = receiver(signal=self, sender=sender, **named)
      File "c:\users\yogab\appdata\local\programs\python\python38-32\lib\site-packages\celery\fixups\django.py", line 172, in on_task_postrun
        self.close_database()
      File "c:\users\yogab\appdata\local\programs\python\python38-32\lib\site-packages\celery\fixups\django.py", line 177, in close_database
        return self._close_database()
      File "c:\users\yogab\appdata\local\programs\python\python38-32\lib\site-packages\celery\fixups\django.py", line 186, in _close_database
        conn.close()
      File "c:\users\yogab\appdata\local\programs\python\python38-32\lib\site-packages\django\utils\asyncio.py", line 24, in inner
        return func(*args, **kwargs)
      File "c:\users\yogab\appdata\local\programs\python\python38-32\lib\site-packages\django\db\backends\base\base.py", line 286, in close
        self.validate_thread_sharing()
      File "c:\users\yogab\appdata\local\programs\python\python38-32\lib\site-packages\django\db\backends\base\base.py", line 553, in validate_thread_sharing
        raise DatabaseError(
    django.db.utils.DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread. The object with alias 'default' was created in thread id 54283112 and this is thread id 8121832.
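The last frames of the traceback show `close()` calling `validate_thread_sharing()`, which rejects the call because the connection was created in a different thread. The following is a simplified sketch of that kind of ownership check using only the standard library. `ThreadBoundConnection` is a hypothetical stand-in and is not Django's actual implementation.

```python
import threading


class ThreadBoundConnection:
    """Hypothetical stand-in for a connection wrapper tied to one thread."""

    def __init__(self):
        # Remember which thread created the object.
        self._owner = threading.get_ident()

    def validate_thread_sharing(self):
        current = threading.get_ident()
        if current != self._owner:
            raise RuntimeError(
                f"created in thread id {self._owner} and this is thread id {current}"
            )

    def close(self):
        self.validate_thread_sharing()
        return "closed"


conn = ThreadBoundConnection()
errors = []


def close_elsewhere():
    # Try to close the connection from a thread that did not create it.
    try:
        conn.close()
    except RuntimeError as exc:
        errors.append(str(exc))


worker = threading.Thread(target=close_elsewhere)
worker.start()
worker.join()

print(errors)        # the close attempt from the other thread was refused
print(conn.close())  # closing from the creating thread succeeds
```

In the traceback the same pattern appears: the task used the connection in one thread, and celery's post-run cleanup tried to close it from another.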
I found the problem: I was running on Windows.
Once I deployed to Heroku (which runs on Linux), it worked.
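It is not necessarily an OS-related problem.
In my case it was caused by migrating the project to Python 3, and the solution that worked for me was:
- updating celery and eventlet to the latest versions
- adding CELERY_TASK_ALWAYS_EAGER to the celery configuration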
    # celery config
    CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
    CELERY_TASK_ALWAYS_EAGER = True
Reading this article about celery issues was of great help.
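Worth noting about this setting: with `CELERY_TASK_ALWAYS_EAGER = True`, celery executes tasks locally and blocks until they return, so `a.delay(data)` would run inside the calling process rather than on a worker. That probably explains why the thread mismatch goes away, and it also means the CSV import would run inside the upload request. A rough standard-library sketch of the difference follows. `SketchTask` is a hypothetical, heavily simplified wrapper and not celery's implementation.

```python
import queue
import threading


class SketchTask:
    """Hypothetical, heavily simplified task wrapper (not celery's code)."""

    def __init__(self, func, always_eager):
        self.func = func
        self.always_eager = always_eager
        self.pending = queue.Queue()

    def delay(self, *args):
        if self.always_eager:
            # Eager mode: run right here, in the caller's thread, and block.
            return self.func(*args)
        # Normal mode: only enqueue; a worker picks the call up later.
        self.pending.put(args)
        return None

    def run_worker_once(self):
        # Run one queued call in a separate thread and return its result.
        args = self.pending.get()
        result = []
        t = threading.Thread(target=lambda: result.append(self.func(*args)))
        t.start()
        t.join()
        return result[0]


def which_thread():
    return threading.get_ident()


caller = threading.get_ident()
eager = SketchTask(which_thread, always_eager=True)
queued = SketchTask(which_thread, always_eager=False)

eager_ran_in_caller = eager.delay() == caller
queued.delay()
queued_ran_in_caller = queued.run_worker_once() == caller
print(eager_ran_in_caller, queued_ran_in_caller)
```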
I think I had a similar problem.
How do you start your celery worker? You might be using --pool=eventlet.
Using --pool=solo worked for me.
Example:

    celery -A yourapp.celery worker --loglevel=info --pool=solo

More about the pool options: https://www.distributedpython.com/2018/10/26/celery-execution-pool/
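As I understand it, the solo pool runs each task inline in the worker process, one at a time, so the database connection is opened and closed by the same thread. The sketch below illustrates that idea with the standard library only. `ThreadPoolExecutor` is just a stand-in for a concurrent pool here (eventlet actually uses green threads), and `current_thread_id` is a made-up helper.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def current_thread_id():
    return threading.get_ident()


main_id = threading.get_ident()

# "Solo"-style execution: call the function inline, one task at a time.
solo_ids = {current_thread_id() for _ in range(3)}

# Pooled execution: tasks are handed to other threads.
with ThreadPoolExecutor(max_workers=2) as pool:
    pooled_ids = set(pool.map(lambda _: current_thread_id(), range(3)))

print(solo_ids == {main_id})   # inline calls all ran in the main thread
print(main_id in pooled_ids)   # pooled calls did not run in the main thread
```

The trade-off is that a solo worker handles only one task at a time.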