使用 Python 子进程处理交互式 shell
Handling interactive shells with Python subprocess
我正在尝试 运行 一个基于控制台的游戏的多个实例(地牢爬行石头汤——自然用于研究目的)使用多处理池来评估每个 运行。
过去,当我使用池来评估类似代码(遗传算法)时,我使用 subprocess.call
来拆分每个进程。但是,由于 dcss 具有共享子 shell 的交互性,这似乎是有问题的。
我有我通常用于此类事情的代码,用爬网替换我已经投入 GA 的其他应用程序。有没有比这更好的方法来处理高度交互的 shells?我考虑过为每个实例启动一个屏幕,但认为有更简洁的方法。我的理解是 shell=True
应该产生一个子 shell,但我想我是在以每次调用之间共享的方式产生一个。
我应该提一下我有一个机器人 运行玩游戏,所以我不希望发生任何来自用户端的实际交互。
# Kick off the GA execution
pool_args = zip(trial_ids,run_types,self.__population)
pool.map(self._GAExecute, pool_args)
---
# called by pool.map
def _GAExecute(self,pool_args):
trial_id = pool_args[0]
run_type = pool_args[1]
genome = pool_args[2]
self._RunSimulation(trial_id)
# Call the actual binary
def _RunSimulation(self, trial_id):
command = "./%s" % self.__crawl_binary
name = "-name %s" % trial_id
rc = "-rc %s" % os.path.join(self.__output_dir,'qw-%s'%trial_id,"qw -%s.rc"%trial_id)
seed = "-seed %d" % self.__seed
cdir = "-dir %s" % os.path.join(self.__output_dir,'qw-%s'%trial_id)
shell_command = "%s %s %s %s %s" % (command,name,rc,seed,cdir)
call(shell_command, shell=True)
为每个调用指定具有唯一文件句柄的标准输入、标准输出和标准错误:
import subprocess
cmd = ""
fout = open('stdout.txt','w')
fin = open('stdin.txt','r')
ferr = open('stderr.txt','w')
subprocess.call(cmd, stdout=fout , stdin = fin , stderr=ferr )
您确实可以将 stdin 和 stdout 关联到文件,如@napuzba 的回答:
fout = open('stdout.txt','w')
ferr = open('stderr.txt','w')
subprocess.call(cmd, stdout=fout, stderr=ferr)
另一种选择是使用 Popen instead of call. The difference is that call waits for completion (is blocking) while Popen is not, see What's the difference between subprocess Popen and call (how can I use them)?
使用 Popen,您可以将 stdout 和 stderr 保留在您的对象中,然后在以后使用它们,而不必依赖文件:
p = subprocess.Popen(cmd,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
stderr = p.stderr.read()
stdout = p.stdout.read()
此方法的另一个潜在优势是您可以 运行 多个 Popen 实例而无需等待完成而不是使用线程池:
processes=[
subprocess.Popen(cmd1,stdout=subprocess.PIPE, stderr=subprocess.PIPE),
subprocess.Popen(cmd2,stdout=subprocess.PIPE, stderr=subprocess.PIPE),
subprocess.Popen(cmd3,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
]
for p in processes:
if p.poll():
# process completed
else:
# no completion yet
附带说明一下,您应该 avoid shell=True
if you can, and if you do not use it Popen expects a list as a command instead of a string. Do not generate this list manually, but use shlex 它将为您处理所有极端情况,例如:
Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
我正在尝试 运行 一个基于控制台的游戏的多个实例(地牢爬行石头汤——自然用于研究目的)使用多处理池来评估每个 运行。
过去,当我使用池来评估类似代码(遗传算法)时,我使用 subprocess.call
来拆分每个进程。但是,由于 dcss 具有共享子 shell 的交互性,这似乎是有问题的。
我有我通常用于此类事情的代码,用爬网替换我已经投入 GA 的其他应用程序。有没有比这更好的方法来处理高度交互的 shells?我考虑过为每个实例启动一个屏幕,但认为有更简洁的方法。我的理解是 shell=True
应该产生一个子 shell,但我想我是在以每次调用之间共享的方式产生一个。
我应该提一下我有一个机器人 运行玩游戏,所以我不希望发生任何来自用户端的实际交互。
# Kick off the GA execution
pool_args = zip(trial_ids,run_types,self.__population)
pool.map(self._GAExecute, pool_args)
---
# called by pool.map
def _GAExecute(self,pool_args):
trial_id = pool_args[0]
run_type = pool_args[1]
genome = pool_args[2]
self._RunSimulation(trial_id)
# Call the actual binary
def _RunSimulation(self, trial_id):
command = "./%s" % self.__crawl_binary
name = "-name %s" % trial_id
rc = "-rc %s" % os.path.join(self.__output_dir,'qw-%s'%trial_id,"qw -%s.rc"%trial_id)
seed = "-seed %d" % self.__seed
cdir = "-dir %s" % os.path.join(self.__output_dir,'qw-%s'%trial_id)
shell_command = "%s %s %s %s %s" % (command,name,rc,seed,cdir)
call(shell_command, shell=True)
为每个调用指定具有唯一文件句柄的标准输入、标准输出和标准错误:
import subprocess
cmd = ""
fout = open('stdout.txt','w')
fin = open('stdin.txt','r')
ferr = open('stderr.txt','w')
subprocess.call(cmd, stdout=fout , stdin = fin , stderr=ferr )
您确实可以将 stdin 和 stdout 关联到文件,如@napuzba 的回答:
fout = open('stdout.txt','w')
ferr = open('stderr.txt','w')
subprocess.call(cmd, stdout=fout, stderr=ferr)
另一种选择是使用 Popen instead of call. The difference is that call waits for completion (is blocking) while Popen is not, see What's the difference between subprocess Popen and call (how can I use them)?
使用 Popen,您可以将 stdout 和 stderr 保留在您的对象中,然后在以后使用它们,而不必依赖文件:
p = subprocess.Popen(cmd,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
stderr = p.stderr.read()
stdout = p.stdout.read()
此方法的另一个潜在优势是您可以 运行 多个 Popen 实例而无需等待完成而不是使用线程池:
processes=[
subprocess.Popen(cmd1,stdout=subprocess.PIPE, stderr=subprocess.PIPE),
subprocess.Popen(cmd2,stdout=subprocess.PIPE, stderr=subprocess.PIPE),
subprocess.Popen(cmd3,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
]
for p in processes:
if p.poll():
# process completed
else:
# no completion yet
附带说明一下,您应该 avoid shell=True
if you can, and if you do not use it Popen expects a list as a command instead of a string. Do not generate this list manually, but use shlex 它将为您处理所有极端情况,例如:
Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)