连接 NumPy 5 维数组的子部分

Concatenate a sub-part of a NumPy 5-D array

我有一个形状为 (2, 6, 6, 2, 1) 的 5 维数组,用来存放我在循环中采集到的不同类型的测量值。第一个维度(2)对应一个物理参数(负压/正压),第二个维度对应 x 位置(共 6 个位置),第三个维度对应 y 位置(共 6 个位置),最后两个维度存放传感器测量值随时间的变化:其中 2 表示信号向量和时间向量这两行,1 对应样本数(样本数事先未知,并且可能随迭代而改变)。

在我实验的第一个 while 循环中,我得到某一组参数(例如正压、x=0、y=0)对应的测量矩阵,我想在 for 循环中把它填入我的大数组。我尝试使用以下语句:

Mesures_Press[P_pos][S_plus][P_plus]= np.concatenate((Mesures_Press[P_pos][S_plus][P_plus],Mes_Press[0:2,:]),axis=1)

但这失败了,因为我只连接了数组的一个子部分,并把结果赋值回原来的位置,从而产生了这个错误:无法将形状为 (2,102) 的输入数组广播到形状 (2,1)(could not broadcast input array from shape (2,102) into shape (2,1))

你知道我应该如何在我的 while 循环的每个步骤中输入我的矩阵吗?

    # -*- coding: utf-8 -*-
"""
Created on Fri Dec 11 00:56:26 2020

@author: labodezao
"""
import sys
import time
from rshell import main
from rshell import pyboard
import pytta
import numpy as np
import time
import matplotlib.pylab as plt
import os



#device = pytta.get_device_from_user()

# Keyword arguments forwarded to pytta.generate.measurement just below.
measurementParams = dict(
    lengthDomain='time',
    timeLength=1,
    samplingRate=48000,
    freqMin=20,
    freqMax=20000,
    device=1,
    inChannels=[1, 2],
    comment='Testing; 1, 2.',
)

# 'rec' measurement object built from the settings above.
msr = pytta.generate.measurement('rec', **measurementParams)
# Output directory for the saved measurements.
# The backslash has to be escaped: a lone backslash right before the closing
# quote escapes that quote, leaving the string literal unterminated
# (SyntaxError for the whole script).
path = 'data\\'
# NOTE(review): `dir` is not used later in this script and shadows the
# builtin of the same name; kept so the module-level names stay unchanged.
dir = os.path.dirname(path)
print(os.path.isdir(path))
# Create the output directory on first run.
if not os.path.isdir(path):
    os.mkdir(path)

# Open the board on COM4 and enter the raw REPL so commands can be sent to it.
pyb = pyboard.Pyboard('COM4')
pyb.enter_raw_repl()
# Executed on the board, in order: import the measurement module, then
# create the `mes` object that every later command string refers to.
for _board_cmd in ("from MesAnche import *", "mes = MesAnches()"):
    pyb.exec(_board_cmd)

# Number of steps along the section axis and along the pressure axis, and
# the screw travel (in mm, per the variable name) spread over the sections.
Decoupe_Sections = 6
Decoupe_Pressions = 6
deplacement_screw_S_mm = 15

# Every result array shares this layout:
# (positive-pressure flag, section step, pressure step, 2 rows, 1 sample).
_grid_shape = (2, Decoupe_Sections, Decoupe_Pressions, 2, 1)

# I2C: pressure, flow rate, temperature and surface.
# np.empty leaves the contents uninitialised.
Mesures_Press = np.empty(_grid_shape)
MesuresTemperature = np.empty(_grid_shape)
Mesures_Debit = np.empty(_grid_shape)
Calculs_Surf = np.empty(_grid_shape)

# pytta
Mesures_Press_Acoustique = np.empty(_grid_shape)
Mesures_Accelerations = np.empty(_grid_shape)

# Computed values
Mesures_Press_Pos = np.empty(_grid_shape)

# Script initialisation
Mes_bytes = 0


# Main acquisition: for every positive-pressure flag, section step and
# pressure step, drive the board, record with pytta, fetch the board's
# samples, save the recording and store the results in the arrays above.
# Any exception is reported (file name, line number, message) and swallowed.
try:
    # Loop over the positive-pressure flag
    
    for P_pos in range(2):
        print("Pressions Positive 1-Oui, 0-non : ", P_pos)
    
    # Loop over the sections
        pyb.exec("mes.Tempo_Vanne(1)")
        for S_plus in range(Decoupe_Sections):
            print("Section courante", S_plus)
            comm_s = f"mes.Move_to_Pos({(S_plus+1)*deplacement_screw_S_mm/Decoupe_Sections},500)"
            pyb.exec(comm_s)
            
    # Loop over the pressure increments
    
            for P_plus in range(Decoupe_Pressions):
                print("Pression courante", P_plus)
                # A surface has been set; now adjust the pressure:
                # set the fan
                comm_p=f"mes.Set_Press({int(4096/(6-P_plus))})" 
                pyb.exec(comm_p)
    
                # Close the valve, start a 1 s delay, then open it and take the measurements in the Mesures_Full function
                comm_mesure ="mes.Mes_Full_Mes()"
                # Sent without waiting for the result so the recording below can start right away.
                pyb.exec_raw_no_follow(comm_mesure)
                # Start the X-second acquisition
                med1 = msr.run()
                time.sleep(1)
                comm_grab ="print(mes.Grab_Full_Mes())"
                Mes_bytes_unformatted = pyb.exec(comm_grab)
                # NOTE(review): eval() runs whatever text the board printed;
                # presumably a bytes literal - ast.literal_eval would be safer, confirm.
                Mes_bytes = eval(str(Mes_bytes_unformatted, "ascii"))
    
                # Interpret the bytes as float32 values and arrange them in 6 rows.
                Mes_Press =np.frombuffer(Mes_bytes, dtype=np.float32)
                Mes_Press=Mes_Press.reshape(6,int(Mes_Press.size/6))
                
                # Fetch the surface value
    
                Calc_Surf = float(pyb.exec("print(mes.Calc_Surf())"))
                print(Calc_Surf)
    
                # Save the measurements
                name = f'PPos{P_pos}Sec{S_plus}PPlus{P_plus}'
                filename = f'{path}{name}'
                pytta.save(filename, med1)
                #h5f.create_dataset('dataset_1', data=a)
                
                print(S_plus,P_plus)
                # NOTE(review): this is the failing statement. The target slot
                # Mesures_Press[P_pos][S_plus][P_plus] has the fixed shape (2, 1), but the
                # concatenated result has more columns, so assigning it back raises
                # the "could not broadcast input array" error.
                Mesures_Press[P_pos][S_plus][P_plus]= np.concatenate((Mesures_Press[P_pos][S_plus][P_plus],Mes_Press[0:2,:]),axis=1)
                # NOTE(review): the arrays below are 5-D while the appended values have
                # fewer dimensions; np.append with an explicit axis requires matching
                # dimensions, so these calls look like they would fail as well - confirm.
                MesuresTemperature=np.append(MesuresTemperature,Mes_Press[2:3,:],axis=0)
                Mesures_Debit=np.append(Mesures_Debit,Mes_Press[4:5,:],axis=0)
                Mesures_Press_Pos=np.append(Mesures_Press_Pos,float((S_plus+1)*deplacement_screw_S_mm/Decoupe_Sections),axis=0)                             #1
                Calculs_Surf=np.append(Calculs_Surf,Calc_Surf,axis=0)                      #5     
                Mesures_Accelerations=np.append(Mesures_Accelerations,[med1.timeVector,med1.timeSignal[:,0]],axis=0)    # L is the accelerometer
                Mesures_Press_Acoustique=np.append(Mesures_Press_Acoustique,[med1.timeVector,med1.timeSignal[:,1]],axis=0)    # R is the microphone
            
            
            #Mesures_Press= np.append(Mesures_Press,Temp_Mesures_Press,axis=1)
            
            print(Mesures_Press_Pos)
            print(Calculs_Surf)
            print(Mesures_Accelerations)
            plt.plot(Mesures_Press_Acoustique[0][0][0][0],Mesures_Press_Acoustique[0][0][0][1]) # indexed as mat[pressure_pos][section_pos][pressure_value][]
              
except Exception as e:
    # Report the file name and line number taken from the traceback, then
    # continue. NOTE(review): the exception is swallowed here, so the script
    # goes on to the cleanup below even after a failed measurement.
    exc_type, exc_obj, exc_tb = sys.exc_info()
    fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
    print("Error :",fname, exc_tb.tb_lineno,e)
    pass

# Leave the raw REPL and release the serial connection to the board.
pyb.exit_raw_repl()
pyb.close()

好的。所以,问题在于形状不匹配:NumPy 数组在每个维度上都必须被完整填充。一个快速的修复方法是把 Mes_Press 放进一个临时数组,并将前 3 个维度上其余位置的值设置为 0(默认值),然后沿最后一个轴追加到大数组上。

示例代码如下:

import numpy as np

# Index of the cell (pressure sign, section, pressure step) being filled.
P_pos = 0
S_plus = 0
P_plus = 0
# Add some default values to test the code.
Mesures_Press = np.ones((2, 6, 6, 2, 1))
Mes_Press = 3*np.ones((2, 10))
############################################
# Get the number of samples to be added.
m = Mes_Press[0:2,:].shape[1]

# Create a temporary block of zeros with the same leading dimensions as the
# main array and m samples on the last axis: (2, 6, 6, 2, m).
temp = np.zeros(Mesures_Press.shape[:-1] + (m,))
# Write the measurement into the target cell only; every other cell keeps
# the default value 0. (Indexing with [:, :, :] would broadcast the same
# measurement into all cells and leave P_pos/S_plus/P_plus unused.)
temp[P_pos, S_plus, P_plus, 0:2, 0:m] = Mes_Press[0:2,:]

# Now append to the main array along the sample axis.
Mesures_Press = np.append(Mesures_Press, temp, axis=4)

# Check the size
print(Mesures_Press.shape)
# (2, 6, 6, 2, 11)

下面这段代码更接近我的需求。如果您有更巧妙的方法,请告诉我,因为我无法用这段代码处理样本长度不同的信号:

import numpy as np


Decoupe_Sections = 6
Decoupe_Pressions = 6


def pad_last_axis(array, length):
    """Return *array* zero-padded along its last axis to at least *length*.

    The array is returned unchanged when its last axis is already long
    enough. Otherwise a new array is returned in which the existing values
    keep their positions and the added samples are 0.
    """
    missing = length - array.shape[-1]
    if missing <= 0:
        return array
    padding = np.zeros(array.shape[:-1] + (missing,), dtype=array.dtype)
    return np.concatenate((array, padding), axis=-1)


# Storage filled inside the loops, indexed as
# Mesures_Press[id_Pressure_Positive, id_CrossSection, id_Pressure, time/data row, sample].
# It starts with zero samples and grows on demand.
Mesures_Press = np.zeros((2, Decoupe_Sections, Decoupe_Pressions, 2, 0))

for id_Pressure_Positive in range(2):
    for id_CrossSection in range(Decoupe_Sections):
        for id_Pressure in range(Decoupe_Pressions):
            # Simulated sensor data: a (2, 10) block filled with id_Pressure.
            Mes_Press = id_Pressure*np.ones((2, 10))
            ############################################
            # Number of samples to store for this cell.
            m = Mes_Press[0:2,:].shape[1]
            # Grow the sample axis only when this signal is longer than the
            # storage. The earlier test `m != Mesures_Press.shape[5:]` compared
            # an int with an empty tuple, so it was always true, and np.resize
            # refills from the flattened data instead of padding, which
            # misplaces stored samples once the length changes.
            Mesures_Press = pad_last_axis(Mesures_Press, m)

            # Store the signal; a signal shorter than the storage leaves the
            # remaining samples of its cell at 0.
            Mesures_Press[id_Pressure_Positive, id_CrossSection, id_Pressure, 0:2, 0:m] = Mes_Press[0:2,:]

            # Show what was stored for this cell.
            print("Matrix for ",Mesures_Press[id_Pressure_Positive, id_CrossSection, id_Pressure,:])
            print("#######################")