Paho-Mqtt Django, on_message() 函数运行两次
Paho-Mqtt django, on_message() function runs twice
我在 Django 中使用 paho-mqtt 来接收消息。一切正常。但是 on_message() 函数执行了两次。
我试过调试,但似乎该函数被调用了一次,但数据库插入发生了两次,消息打印发生了两次,on_message() 函数中的所有内容都发生了两次,我的数据每次发布都会插入两次。
我怀疑它是在并行线程中发生的,并安装了一个 celery redis 后端来对插入进行排队并避免重复插入。但数据仍然被插入了两次。
我也试过锁定变量,以避免并行线程中的问题,但仍然插入了两次数据。
我正在使用 Postgres 数据库
如何解决这个问题?我希望 on_message() 函数对每次 publish 只执行一次。
我的 __init__.py:
# Importing this package imports the sibling mqtt module and then starts the
# client's network loop, so both happen as import-time side effects.
# NOTE(review): any process that imports this package starts its own loop;
# the answer on this page attributes the duplicate on_message() calls to
# running `manage.py runserver` without --noreload -- confirm.
from . import mqtt
mqtt.client.loop_start()
我的mqtt.py
import ast
import json
import paho.mqtt.client as mqtt
# Broker CONNACK response
from datetime import datetime
from raven.utils import logger
from kctsmarttransport import settings
def on_connect(client, userdata, flags, rc):
    """Subscribe to the GPS data topic when the connect callback fires.

    The subscription is issued from this callback, so it is sent again every
    time the callback runs (the original comment mentions reconnects).

    Args:
        client: Object exposing ``subscribe(topic)``; the MQTT client.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; not inspected here.
    """
    client.subscribe("data/gpsdata/server/#")
    # Parenthesised single-argument print emits the same text on Python 2 and
    # Python 3, unlike the Python-2-only print statement.
    print('subscribed to data/gpsdata/server/#')
# Receive message
def on_message(client, userdata, msg):
# Handle one received MQTT message: parse the GPS payload, insert a
# BusPosition row if an identical one is not already stored, and maintain the
# SpeedWarningLog / IdleTimeLog tables.
# NOTE(review): this listing has lost its indentation, so the exact nesting of
# the if/try blocks below cannot be read off the text -- confirm against the
# real file before changing anything.
# from kctsmarttransport.celery import bus_position_insert_task
# bus_position_insert_task.delay(msg.payload)
# Models are imported inside the callback instead of at module top --
# presumably to avoid importing them while the package is still loading;
# TODO confirm.
from Transport.models import BusPosition
from Transport.models import Student, SpeedWarningLog, Bus
from Transport.models import Location
from Transport.models import IdleTimeLog
from pytz import timezone
try:
# dumps() followed by loads() appears to hand back the payload text unchanged,
# so the payload is effectively parsed by ast.literal_eval as a Python literal
# (used as a dict below).
dumpData = json.dumps(msg.payload)
rawGpsData = json.loads(dumpData)
jsonGps = ast.literal_eval(rawGpsData)
# No local handler for a missing bus: a failure here is caught by the
# catch-all handler at the end of the function.
bus = Bus.objects.get(bus_no=jsonGps['Busno'])
student = None
stop = None
# Identity test: every value except the False singleton triggers the lookup.
if jsonGps['card'] is not False:
try:
student = Student.objects.get(rfid_value=jsonGps['UID'])
except Student.DoesNotExist:
student = None
if 'stop_id' in jsonGps:
stop = Location.objects.get(pk=jsonGps['stop_id'])
# The parsed timestamp is naive; it is localized to Asia/Kolkata.
dates = datetime.strptime(jsonGps['Date&Time'], '%Y-%m-%d %H:%M:%S')
tz = timezone('Asia/Kolkata')
dates = tz.localize(dates)
lat = float(jsonGps['Latitude'])
lng = float(jsonGps['Longitude'])
speed = float(jsonGps['speed'])
# print msg.topic + " " + str(msg.payload)
# De-duplication: look for an identical row and insert only when none exists.
# NOTE(review): filter() + count() + create() is not atomic, so two processes
# handling the same message can both see count() == 0 and both insert.
busPosition = BusPosition.objects.filter(bus=bus, created_at=dates,
lat=lat,
lng=lng,
speed=speed,
geofence=stop,
student=student)
if busPosition.count() == 0:
busPosition = BusPosition.objects.create(bus=bus, created_at=dates,
lat=lat,
lng=lng,
speed=speed,
geofence=stop,
student=student)
# Speeds above 60 are logged and reported by SMS.
# NOTE(review): sendSMS is neither defined nor imported in this listing --
# presumably it lives elsewhere in the module.
if speed > 60:
SpeedWarningLog.objects.create(bus=busPosition.bus, speed=busPosition.speed,
lat=lat, lng=lng, created_at=dates)
sendSMS(settings.TRANSPORT_OFFICER_NUMBER, jsonGps['Busno'], jsonGps['speed'])
# speed <= 2: extend the most recent open (done=False) idle entry for this bus,
# or start a new one when none is open.
if speed <= 2:
try:
old_entry_query = IdleTimeLog.objects.filter(bus=bus, done=False).order_by('idle_start_time')
if old_entry_query.count() > 0:
old_entry = old_entry_query.reverse()[0]
old_entry.idle_end_time = dates
old_entry.save()
else:
new_entry = IdleTimeLog.objects.create(bus=bus, idle_start_time=dates, lat=lat, lng=lng)
# NOTE(review): filter()/count()/indexing a non-empty queryset do not appear
# to raise DoesNotExist, so this handler looks unreachable -- confirm.
except IdleTimeLog.DoesNotExist:
new_entry = IdleTimeLog.objects.create(bus=bus, idle_start_time=dates, lat=lat, lng=lng)
# Otherwise: close the most recent open idle entry, if there is one.
else:
try:
old_entry_query = IdleTimeLog.objects.filter(bus=bus, done=False).order_by('idle_start_time')
if old_entry_query.count() > 0:
old_entry = old_entry_query.reverse()[0]
old_entry.idle_end_time = dates
old_entry.done = True
old_entry.save()
except IdleTimeLog.DoesNotExist:
pass
# Catch-all: whatever was raised above is logged with its traceback instead of
# propagating. `except Exception, e` is Python 2 syntax.
except Exception, e:
logger.error(e.message, exc_info=True)
# Module-level client: it is created and connected as a side effect of
# importing this module.
client = mqtt.Client()
# Register the two callbacks defined in this module.
client.on_connect = on_connect
client.on_message = on_message
# Hard-coded broker address and port; the third argument (60) is presumably
# the keepalive interval in seconds -- confirm against paho's connect() docs.
client.connect("10.1.75.106", 1883, 60)
我遇到了同样的问题!
尝试使用:
def on_disconnect(client, userdata, rc):
    """Stop the client's loop and print how the connection ended.

    Args:
        client: Object exposing ``loop_stop(force=...)``; the MQTT client.
        userdata: Unused.
        rc: Disconnect result code; any value other than 0 is reported as
            unexpected.
    """
    client.loop_stop(force=False)
    status = "Unexpected disconnection." if rc != 0 else "Disconnected"
    print(status)
正如评论中提到的那样,请使用 --noreload 选项运行您的服务器。
例如:python manage.py runserver --noreload
(张贴在这里以提高知名度。)
我在 Django 中使用 paho-mqtt 来接收消息。一切正常。但是 on_message() 函数执行了两次。
我试过调试,但似乎该函数被调用了一次,但数据库插入发生了两次,消息打印发生了两次,on_message() 函数中的所有内容都发生了两次,我的数据每次发布都会插入两次。
我怀疑它是在并行线程中发生的,并安装了一个 celery redis 后端来对插入进行排队并避免重复插入。但数据仍然被插入了两次。
我也试过锁定变量,以避免并行线程中的问题,但仍然插入了两次数据。
我正在使用 Postgres 数据库
如何解决这个问题?我希望 on_message() 函数对每次 publish 只执行一次。
我的 __init__.py:
# Importing this package imports the sibling mqtt module and then starts the
# client's network loop, so both happen as import-time side effects.
# NOTE(review): any process that imports this package starts its own loop;
# the answer on this page attributes the duplicate on_message() calls to
# running `manage.py runserver` without --noreload -- confirm.
from . import mqtt
mqtt.client.loop_start()
我的mqtt.py
import ast
import json
import paho.mqtt.client as mqtt
# Broker CONNACK response
from datetime import datetime
from raven.utils import logger
from kctsmarttransport import settings
def on_connect(client, userdata, flags, rc):
    """Subscribe to the GPS data topic when the connect callback fires.

    The subscription is issued from this callback, so it is sent again every
    time the callback runs (the original comment mentions reconnects).

    Args:
        client: Object exposing ``subscribe(topic)``; the MQTT client.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; not inspected here.
    """
    client.subscribe("data/gpsdata/server/#")
    # Parenthesised single-argument print emits the same text on Python 2 and
    # Python 3, unlike the Python-2-only print statement.
    print('subscribed to data/gpsdata/server/#')
# Receive message
def on_message(client, userdata, msg):
# Handle one received MQTT message: parse the GPS payload, insert a
# BusPosition row if an identical one is not already stored, and maintain the
# SpeedWarningLog / IdleTimeLog tables.
# NOTE(review): this listing has lost its indentation, so the exact nesting of
# the if/try blocks below cannot be read off the text -- confirm against the
# real file before changing anything.
# from kctsmarttransport.celery import bus_position_insert_task
# bus_position_insert_task.delay(msg.payload)
# Models are imported inside the callback instead of at module top --
# presumably to avoid importing them while the package is still loading;
# TODO confirm.
from Transport.models import BusPosition
from Transport.models import Student, SpeedWarningLog, Bus
from Transport.models import Location
from Transport.models import IdleTimeLog
from pytz import timezone
try:
# dumps() followed by loads() appears to hand back the payload text unchanged,
# so the payload is effectively parsed by ast.literal_eval as a Python literal
# (used as a dict below).
dumpData = json.dumps(msg.payload)
rawGpsData = json.loads(dumpData)
jsonGps = ast.literal_eval(rawGpsData)
# No local handler for a missing bus: a failure here is caught by the
# catch-all handler at the end of the function.
bus = Bus.objects.get(bus_no=jsonGps['Busno'])
student = None
stop = None
# Identity test: every value except the False singleton triggers the lookup.
if jsonGps['card'] is not False:
try:
student = Student.objects.get(rfid_value=jsonGps['UID'])
except Student.DoesNotExist:
student = None
if 'stop_id' in jsonGps:
stop = Location.objects.get(pk=jsonGps['stop_id'])
# The parsed timestamp is naive; it is localized to Asia/Kolkata.
dates = datetime.strptime(jsonGps['Date&Time'], '%Y-%m-%d %H:%M:%S')
tz = timezone('Asia/Kolkata')
dates = tz.localize(dates)
lat = float(jsonGps['Latitude'])
lng = float(jsonGps['Longitude'])
speed = float(jsonGps['speed'])
# print msg.topic + " " + str(msg.payload)
# De-duplication: look for an identical row and insert only when none exists.
# NOTE(review): filter() + count() + create() is not atomic, so two processes
# handling the same message can both see count() == 0 and both insert.
busPosition = BusPosition.objects.filter(bus=bus, created_at=dates,
lat=lat,
lng=lng,
speed=speed,
geofence=stop,
student=student)
if busPosition.count() == 0:
busPosition = BusPosition.objects.create(bus=bus, created_at=dates,
lat=lat,
lng=lng,
speed=speed,
geofence=stop,
student=student)
# Speeds above 60 are logged and reported by SMS.
# NOTE(review): sendSMS is neither defined nor imported in this listing --
# presumably it lives elsewhere in the module.
if speed > 60:
SpeedWarningLog.objects.create(bus=busPosition.bus, speed=busPosition.speed,
lat=lat, lng=lng, created_at=dates)
sendSMS(settings.TRANSPORT_OFFICER_NUMBER, jsonGps['Busno'], jsonGps['speed'])
# speed <= 2: extend the most recent open (done=False) idle entry for this bus,
# or start a new one when none is open.
if speed <= 2:
try:
old_entry_query = IdleTimeLog.objects.filter(bus=bus, done=False).order_by('idle_start_time')
if old_entry_query.count() > 0:
old_entry = old_entry_query.reverse()[0]
old_entry.idle_end_time = dates
old_entry.save()
else:
new_entry = IdleTimeLog.objects.create(bus=bus, idle_start_time=dates, lat=lat, lng=lng)
# NOTE(review): filter()/count()/indexing a non-empty queryset do not appear
# to raise DoesNotExist, so this handler looks unreachable -- confirm.
except IdleTimeLog.DoesNotExist:
new_entry = IdleTimeLog.objects.create(bus=bus, idle_start_time=dates, lat=lat, lng=lng)
# Otherwise: close the most recent open idle entry, if there is one.
else:
try:
old_entry_query = IdleTimeLog.objects.filter(bus=bus, done=False).order_by('idle_start_time')
if old_entry_query.count() > 0:
old_entry = old_entry_query.reverse()[0]
old_entry.idle_end_time = dates
old_entry.done = True
old_entry.save()
except IdleTimeLog.DoesNotExist:
pass
# Catch-all: whatever was raised above is logged with its traceback instead of
# propagating. `except Exception, e` is Python 2 syntax.
except Exception, e:
logger.error(e.message, exc_info=True)
# Module-level client: it is created and connected as a side effect of
# importing this module.
client = mqtt.Client()
# Register the two callbacks defined in this module.
client.on_connect = on_connect
client.on_message = on_message
# Hard-coded broker address and port; the third argument (60) is presumably
# the keepalive interval in seconds -- confirm against paho's connect() docs.
client.connect("10.1.75.106", 1883, 60)
我遇到了同样的问题!
尝试使用:
def on_disconnect(client, userdata, rc):
    """Stop the client's loop and print how the connection ended.

    Args:
        client: Object exposing ``loop_stop(force=...)``; the MQTT client.
        userdata: Unused.
        rc: Disconnect result code; any value other than 0 is reported as
            unexpected.
    """
    client.loop_stop(force=False)
    status = "Unexpected disconnection." if rc != 0 else "Disconnected"
    print(status)
正如评论中提到的那样,请使用 --noreload 选项运行您的服务器。
例如:python manage.py runserver --noreload
(张贴在这里以提高知名度。)