python 多处理代理和 udp 侦听器
python multiprocessing agents and udp listeners
我有一个主服务器 class(服务器需要一个更好的术语),它使用 mulitprocessing 库创建多个客户端(来自客户端 class)。
class mantransact:
def __init__(self,runMode,f_xml):
#call the build nodes function
self.buildNodes()
sockLisProcess = multiprocessing.Process(target=self.sockListener())
sockLisProcess.start()
self.initiateTransactions()
def buildNodes(self,):
n_source = self.f_xml.getElement("nodeSource")
self.log.addToLog ("node source is - %s" % n_source)
self.n_file = load_source.load_source(n_source,"csv")
#remove header from node list
del self.n_file.data_list[0]
self.nodes = [self.mkproc(node, l) for l in self.n_file.data_list]
self.log.addToLog(self.nodes)
def mkproc(self, func, line):
l = "-".join(line)
p = multiprocessing.Process(target=func, args=(l, self.f_xml))
p.start()
return (line[0], p)
def sockListener(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_addresss = ('localhost',10099)
self.sock.bind(server_addresss)
while True:
self.log.addToLog("server is waitin")
data, address = self.sock.recvfrom(1024)
self.log.addToLog(data, address)
def sockSender(self,client_address,d):
self.sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock2.bind(('localhost',10098))
recip = ('localhost',int(client_address))
self.sock2.sendto(str(d),recip)
self.sock2.close()
def initiateTransactions(self,):
#loop through transaction and then loop node list to match at match transmit transaction
#using UDP ports
for t in self.t_file.data_list:
for n in self.nodes:
if t[0] == n[0]:
for l in self.n_file.data_list:
if l[0] == n[0]:
self.log.addToLog ("gonna transmit UDP transaction to node - %s" % n[0])
client_address = l[1]
pip = n[2]
t.insert(0, "nTransaction")
self.sockSender(client_address, t)
我正在尝试在客户端和节点上创建 UDP 侦听器:
class node:
def __init__(self,child_conn, line, f_xml):
l = line.split("-")
"""extract calues from array and use f_xml for config"""
self.proofProcess = multiprocessing.Process(target=self.runProof(self.waitingTransactions))
self.proofProcess.start()
self.listenProcess = Multiprocessing.Process(target=self.udpListener())
self.listenProcess.start()
def udpListener(self):
lsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
lsock.bind(("localhost",int(self.IP)))
while 1 ==1:
data, addr = lsock.recvfrom(1024)
print ("received message", data)
"""do some things with data"""
我有两个问题:
1 对于我的服务器,我希望我的代码启动这些进程,然后继续实例化或执行其他任务,但代码只是挂起,等待侦听器接收数据包。我是否错误地实例化了处理:
2 我的客户正在执行一项任务来解决问题,并且在该任务完成之前不要启动侦听器。他们不能同时开始他们的任务和聆听吗?如果另一个客户端先解决它,则侦听器旨在中断计算,然后它将从服务器接收新任务
我找到了解决方案。
通过将多处理元素放入单独的进程中:
def loopProcesses(self,procedureName):
processX = multiprocessing.Process(target=procedureName)
processX.start()
return processX
并将要使用的进程的名称放入数组中,循环调用 loopProcesses() 进程,这两个进程并行启动。
m_processes = [self.sockListener(), self.initiateTransactions()]
l_processes = [self.loopProcesses(mp) for mp in m_processes]
上面的方法不起作用,因为调用的函数在找到多个解决方案之前处于连续循环中。调用第一个函数时出现问题,它会在没有启动命令的情况下启动。后来我发现我必须在不使用“()”的情况下调用该函数,然后该函数将等待。修改后的代码是:
p = [multiprocessing.Process(target=self.sockListener),multiprocessing.Process(target=self.initiateTransactions)]
for prc in p:
prc.start()
经过大量搜索后我发现了这个并遇到了这个:Socketserver multiprocessing.Process is starting without calling start()
我有一个主服务器 class(服务器需要一个更好的术语),它使用 mulitprocessing 库创建多个客户端(来自客户端 class)。
class mantransact:
def __init__(self,runMode,f_xml):
#call the build nodes function
self.buildNodes()
sockLisProcess = multiprocessing.Process(target=self.sockListener())
sockLisProcess.start()
self.initiateTransactions()
def buildNodes(self,):
n_source = self.f_xml.getElement("nodeSource")
self.log.addToLog ("node source is - %s" % n_source)
self.n_file = load_source.load_source(n_source,"csv")
#remove header from node list
del self.n_file.data_list[0]
self.nodes = [self.mkproc(node, l) for l in self.n_file.data_list]
self.log.addToLog(self.nodes)
def mkproc(self, func, line):
l = "-".join(line)
p = multiprocessing.Process(target=func, args=(l, self.f_xml))
p.start()
return (line[0], p)
def sockListener(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_addresss = ('localhost',10099)
self.sock.bind(server_addresss)
while True:
self.log.addToLog("server is waitin")
data, address = self.sock.recvfrom(1024)
self.log.addToLog(data, address)
def sockSender(self,client_address,d):
self.sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock2.bind(('localhost',10098))
recip = ('localhost',int(client_address))
self.sock2.sendto(str(d),recip)
self.sock2.close()
def initiateTransactions(self,):
#loop through transaction and then loop node list to match at match transmit transaction
#using UDP ports
for t in self.t_file.data_list:
for n in self.nodes:
if t[0] == n[0]:
for l in self.n_file.data_list:
if l[0] == n[0]:
self.log.addToLog ("gonna transmit UDP transaction to node - %s" % n[0])
client_address = l[1]
pip = n[2]
t.insert(0, "nTransaction")
self.sockSender(client_address, t)
我正在尝试在客户端和节点上创建 UDP 侦听器:
class node:
def __init__(self,child_conn, line, f_xml):
l = line.split("-")
"""extract calues from array and use f_xml for config"""
self.proofProcess = multiprocessing.Process(target=self.runProof(self.waitingTransactions))
self.proofProcess.start()
self.listenProcess = Multiprocessing.Process(target=self.udpListener())
self.listenProcess.start()
def udpListener(self):
lsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
lsock.bind(("localhost",int(self.IP)))
while 1 ==1:
data, addr = lsock.recvfrom(1024)
print ("received message", data)
"""do some things with data"""
我有两个问题:
1 对于我的服务器,我希望我的代码启动这些进程,然后继续实例化或执行其他任务,但代码只是挂起,等待侦听器接收数据包。我是否错误地实例化了处理:
2 我的客户正在执行一项任务来解决问题,并且在该任务完成之前不要启动侦听器。他们不能同时开始他们的任务和聆听吗?如果另一个客户端先解决它,则侦听器旨在中断计算,然后它将从服务器接收新任务
我找到了解决方案。
通过将多处理元素放入单独的进程中:
def loopProcesses(self,procedureName):
processX = multiprocessing.Process(target=procedureName)
processX.start()
return processX
并将要使用的进程的名称放入数组中,循环调用 loopProcesses() 进程,这两个进程并行启动。
m_processes = [self.sockListener(), self.initiateTransactions()]
l_processes = [self.loopProcesses(mp) for mp in m_processes]
上面的方法不起作用,因为调用的函数在找到多个解决方案之前处于连续循环中。调用第一个函数时出现问题,它会在没有启动命令的情况下启动。后来我发现我必须在不使用“()”的情况下调用该函数,然后该函数将等待。修改后的代码是:
p = [multiprocessing.Process(target=self.sockListener),multiprocessing.Process(target=self.initiateTransactions)]
for prc in p:
prc.start()
经过大量搜索后我发现了这个并遇到了这个:Socketserver multiprocessing.Process is starting without calling start()