在 python 中从数据库中实时获取数据

Fetching data in realtime from database in python

我在 Python 中有一个用于多处理的 class,它创建了 3 个不同的进程。第一个过程是检查我的硬件是否有任何信号并将其推入队列,第二个过程是从队列中取出数据并将其推入数据库,第三个过程是从数据库中取出数据并将其推送到服务器上。

obj = QE()
stdFunct = standardFunctions()

watchDogProcess = multiprocessing.Process(target=obj.watchDog)
watchDogProcess.start()
        
pushToDBSProcess = multiprocessing.Process(target=obj.pushToDBS)
pushToDBSProcess.start()
        
pushToCloud = multiprocessing.Process(target=stdFunct.uploadCycleTime)
pushToCloud.start()
        
watchDogProcess.join()
pushToDBSProcess.join()
pushToCloud.join()

我的前两个过程 运行 完全符合要求,但是我在第三个过程中遇到了困难。下面是我的第三个过程的代码:

def uploadCycleTime(self):
    while True:
        uploadCycles = []
        lastUpPointer = "SELECT id FROM lastUploaded"
        lastUpPointer = self.dbFetchone(lastUpPointer)
        lastUpPointer = lastUpPointer[0]
        # print("lastUploaded :"+str(lastUpPointer))
        cyclesToUploadSQL = "SELECT id,machineId,startDateTime,endDateTime,type FROM cycletimes WHERE id > "+str(lastUpPointer)
        cyclesToUpload = self.dbfetchMany(cyclesToUploadSQL,15)
        cyclesUploadLength = len(cyclesToUpload)
        if(cyclesUploadLength>0):
            for cycles in cyclesToUpload:
                uploadCycles.append({"dataId":cycles[0],"machineId":cycles[1],"startDateTime":cycles[2].strftime('%Y-%m-%d %H:%M:%S.%f'),"endDateTime":cycles[3].strftime('%Y-%m-%d %H:%M:%S.%f'),"type":cycles[4]})        
            # print("length : "+str(cyclesUploadLength))
            lastUpPointer = uploadCycles[cyclesUploadLength-1]["dataId"]
            
            uploadCycles = json.dumps(uploadCycles)
            api = self.dalUrl+"/cycle-times"
            uploadResponse = self.callPostAPI(api,str(uploadCycles))
            print(lastUpPointer)
            changePointerSQL = "UPDATE lastUploaded SET id="+str(lastUpPointer)
            try:
                changePointerSQL = self.dbAbstraction(changePointerSQL)
            except Exception as errorPointer:
                print("Pointer change Error : "+str(errorPointer))
            time.sleep(2)

现在我正在保存一个指针以记住上次上传的 ID,并从那里继续上传 15 个数据包。当数据库中存在数据时,代码运行良好,但是如果在启动过程时不存在数据,然后发送数据,则无法从数据库中获取数据。

我尝试实时打印长度,尽管数据不断实时推送到数据库中,但它一直给我 0。

在我的上传过程中,我错过了一个 commit()

def dbFetchAll(self,dataString):
    # dataToPush = self.cycletimeQueue.get()
    # print(dataToPush)
    dbTry = 1
    try:
        while(dbTry == 1): # This while is to ensure the data has been pushed
            sql = dataString
            self.conn.execute(sql)
            response = self.conn.fetchall()
            dbTry = 0
            return response
            # print(self.conn.rowcount, "record inserted.")
    except Exception as error:
        print ("Error : "+str(error))
        return dbTry

    ***finally:
        self.mydb.commit()***