Database connection error after Celery worker remains idle for 24 hours
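I have a Django-based web application in which I use Kafka to process some orders. I use Celery workers to assign one Kafka consumer to each topic. Each Kafka consumer is assigned to a Kafka topic in the form of a task. However, after about a day, I get the following error when I submit a task: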
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (2006, 'MySQL server has gone away')
The above exception was the direct cause of the following exception:
Here is what my tasks.py file looks like:
# Imports were not shown in the original post; these are the likely ones.
import logging
from threading import Thread

from celery import shared_task
from confluent_kafka import Consumer, KafkaException
from django.core import serializers

# BuyOrder, processBuyerPayout and INFURA_MAIN_NET_ETH_URL are defined
# elsewhere in the project (not shown).
logger = logging.getLogger(__name__)


@shared_task
def init_kafka_consumer(topic):
    try:
        if topic is None:
            raise Exception("Topic is none, unable to initialize kafka consumer")
        logger.info("Spawning new task to subscribe to topic")
        params = []
        params.append(topic)
        # The consumer loop runs in a background thread that outlives the task.
        background_thread = Thread(target=sunscribe_consumer, args=params)
        background_thread.start()
    except Exception:
        logger.exception("An exception occurred while reading message from kafka")


def sunscribe_consumer(topic):
    try:
        if topic is None:
            raise Exception("Topic is none, unable to initialize kafka consumer")
        conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
                'auto.offset.reset': 'earliest'}
        c = Consumer(conf)
        logger.info("Subscribing consumer to topic " + str(topic[0]))
        c.subscribe(topic)
        # Read messages from Kafka
        try:
            while True:
                msg = c.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    raise KafkaException(msg.error())
                else:
                    try:
                        objs = serializers.deserialize("json", msg.value())
                        for obj in objs:
                            order = obj.object
                            order = BuyOrder.objects.get(id=order.id)  # Getting an error while accessing DB
                            if order.is_pushed_to_kafka:
                                return
                            order.is_pushed_to_kafka = True
                            order.save()
                            from web3 import HTTPProvider, Web3, exceptions
                            w3 = Web3(HTTPProvider(INFURA_MAIN_NET_ETH_URL))
                            processBuyerPayout(order, w3)
                    except Exception:
                        logger.exception("An exception occurred while de-serializing message")
        except Exception:
            logger.exception("An exception occurred while reading message from kafka")
        finally:
            c.close()
    except Exception:
        logger.exception("An exception occurred while reading message from kafka")
Is there a way to check whether the database connection is still alive as soon as a task is received and, if it is not, re-establish it?
According to https://github.com/celery/django-celery-results/issues/58#issuecomment-418413369 and the comments above it:
from django.db import close_old_connections
close_old_connections()
Closing the old connections inside your task, so that new ones get opened, should help.