Java 具有并发 [​​=10=] 流的进程

Java Process with concurrent Input/Output Streams

我正在尝试创建一种 console/terminal 允许用户输入字符串,然后将其制作成流程并打印出结果。就像一个普通的控制台。但是我在管理 input/output 流时遇到问题。我调查了 this thread,但很遗憾,该解决方案不适用于我的问题。

连同 "ipconfig" 和 "cmd.exe" 等标准命令,我需要能够 运行 脚本并使用相同的输入流来传递一些参数,如果脚本是征求意见。

例如,在 运行 运行脚本 "python pyScript.py" 之后,如果脚本需要,我应该能够将进一步的输入传递给脚本(例如:raw_input),而还打印脚本的输出。您期望终端的基本行为。

到目前为止我得到了什么:

import java.awt.BorderLayout;
import java.awt.Color;
import java.awt.Dimension;
import java.awt.event.KeyEvent;
import java.awt.event.KeyListener;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextPane;
import javax.swing.text.BadLocationException;
import javax.swing.text.Document;

public class Console extends JFrame{

    JTextPane inPane, outPane;
    InputStream inStream, inErrStream;
    OutputStream outStream;

    public Console(){
        super("Console");
        setPreferredSize(new Dimension(500, 600));
        setLocationByPlatform(true);
        setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);

        // GUI
        outPane = new JTextPane();
        outPane.setEditable(false);
        outPane.setBackground(new Color(20, 20, 20));
        outPane.setForeground(Color.white);
        inPane = new JTextPane();
        inPane.setBackground(new Color(40, 40, 40));
        inPane.setForeground(Color.white);
        inPane.setCaretColor(Color.white);

        JPanel panel = new JPanel(new BorderLayout());
        panel.add(outPane, BorderLayout.CENTER);
        panel.add(inPane, BorderLayout.SOUTH);

        JScrollPane scrollPanel = new JScrollPane(panel);

        getContentPane().add(scrollPanel);

        // LISTENER
        inPane.addKeyListener(new KeyListener(){
            @Override
            public void keyPressed(KeyEvent e){
              if(e.getKeyCode() == KeyEvent.VK_ENTER){
                    e.consume();
                    read(inPane.getText());
                }
            }
            @Override
            public void keyTyped(KeyEvent e) {}

            @Override
            public void keyReleased(KeyEvent e) {}
        });


        pack();
        setVisible(true);
    }

    private void read(String command){
        println(command);

        // Write to Process
        if (outStream != null) {
            System.out.println("Outstream again");
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outStream));
            try {
                writer.write(command);
                //writer.flush();
                //writer.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }

        // Execute Command
        try {
            exec(command);
        } catch (IOException e) {}

        inPane.setText("");
    }

    private void exec(String command) throws IOException{
        Process pro = Runtime.getRuntime().exec(command, null);

        inStream = pro.getInputStream();
        inErrStream = pro.getErrorStream();
        outStream = pro.getOutputStream();

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                try {
                    String line = null;
                    while(true){
                        BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
                        while ((line = in.readLine()) != null) {
                            println(line);
                        }
                        BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream));
                        while ((line = inErr.readLine()) != null) {
                            println(line);
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
    }

    public void println(String line) {
        Document doc = outPane.getDocument();
        try {
            doc.insertString(doc.getLength(), line + "\n", null);
        } catch (BadLocationException e) {}
    }

    public static void main(String[] args){
        new Console();
    }
}

我不使用提到的 ProcessBuilder,因为我喜欢区分错误流和正常流。

2016 年 8 月 29 日更新

在@ArcticLord 的帮助下,我们已经实现了原始问题中的要求。 现在只需消除任何奇怪的行为,如非终止过程。控制台有一个 "stop" 按钮,它只调用 pro.destroy()。但出于某种原因,这不适用于无限 运行ning 进程,即垃圾邮件输出。

控制台:http://pastebin.com/vyxfPEXC

InputStreamLineBuffer: http://pastebin.com/TzFamwZ1

停止的示例代码:

public class Infinity{
    public static void main(String[] args){ 
        while(true){
            System.out.println(".");
        }
    }
}

停止的示例代码:

import java.util.concurrent.TimeUnit;

public class InfinitySlow{
    public static void main(String[] args){ 
        while(true){
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(".");
        }
    }
}

你的代码是正确的。你只是错过了一些小事。
让我们从您的 read 方法开始:

private void read(String command){
    [...]
    // Write to Process
    if (outStream != null) {
        [...]
        try {
            writer.write(command + "\n");  // add newline so your input will get proceed
            writer.flush();  // flush your input to your process
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
    // ELSE!! - if no outputstream is available
    // Execute Command
    else {
        try {
            exec(command);
        } catch (IOException e) {
            // Handle the exception here. Mostly this means
            // that the command could not get executed
            // because command was not found.
            println("Command not found: " + command);
        }
    }
    inPane.setText("");
}

现在让我们修复您的 exec 方法。您应该使用单独的线程来读取正常的进程输出和错误输出。此外,我引入了第三个线程,它等待进程结束并关闭 outputStream,因此下一个用户输入不是用于进程,而是一个新命令。

private void exec(String command) throws IOException{
    Process pro = Runtime.getRuntime().exec(command, null);

    inStream = pro.getInputStream();
    inErrStream = pro.getErrorStream();
    outStream = pro.getOutputStream();

    // Thread that reads process output
    Thread outStreamReader = new Thread(new Runnable() {
        public void run() {
            try {
                String line = null;
                BufferedReader in = new BufferedReader(new InputStreamReader(inStream));                        
                while ((line = in.readLine()) != null) {
                    println(line);                       
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Exit reading process output");
        }
    });
    outStreamReader.start();

    // Thread that reads process error output
    Thread errStreamReader = new Thread(new Runnable() {
        public void run() {
            try {
                String line = null;           
                BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream));
                while ((line = inErr.readLine()) != null) {
                    println(line);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Exit reading error stream");
        }
    });
    errStreamReader.start();

    // Thread that waits for process to end
    Thread exitWaiter = new Thread(new Runnable() {
        public void run() {
            try {
                int retValue = pro.waitFor();
                println("Command exit with return value " + retValue);
                // close outStream
                outStream.close();
                outStream = null;
            } catch (InterruptedException e) {
                e.printStackTrace(); 
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    });
    exitWaiter.start();
}

现在应该可以了。
如果您输入 ipconfig,它会打印命令输出,closes 输出流并准备好接收新命令。
如果您输入 cmd,它会打印输出并让您输入更多 cmd 命令,例如 dircd 等等,直到您输入 exit。然后它关闭os输出流并准备好接收新命令。

您可能 运行 在执行 python 脚本时遇到问题,因为如果它们没有被刷新到系统管道中,则使用 Java 读取 Process InputStreams 时会出现问题。
请参阅此示例 python 脚本

print "Input something!"
str = raw_input()
print "Received input is : ", str

您可以 运行 使用您的 Java 程序并进行输入,但在脚本完成之前您不会看到脚本输出。
我能找到的唯一修复方法是手动刷新脚本中的输出。

import sys
print "Input something!"
sys.stdout.flush()
str = raw_input()
print "Received input is : ", str
sys.stdout.flush()

运行 该脚本将如您所愿。
您可以在

阅读有关此问题的更多信息
  • Java: is there a way to run a system command and print the output during execution?
  • Why does reading from Process' InputStream block altough data is available
  • Java: can't get stdout data from Process unless its manually flushed

编辑:我刚刚找到另一个非常简单的解决方案来解决 Python 脚本的 stdout.flush() 问题。用 python -u script.py 启动它们,你不需要手动刷新。这应该可以解决您的问题。

EDIT2:我们在评论中讨论过,使用此解决方案输出和错误流将混淆,因为它们 运行 在不同的线程中。这里的问题是,当错误流线程出现时,我们无法区分输出写入是否完成。否则 classic 带锁的线程调度可以处理这种情况。但是无论数据是否流动,我们都有一个连续的流,直到过程完成。所以我们在这里需要一种机制来记录自从每个流读取最后一行以来已经过去了多少时间。

为此,我将介绍一个 class,它获取一个 InputStream 并启动一个线程来读取传入的数据。该线程将每一行存储在队列中,并在流结束时停止。此外,它保存最后一行被读取并添加到队列的时间。

public class InputStreamLineBuffer{
    private InputStream inputStream;
    private ConcurrentLinkedQueue<String> lines;
    private long lastTimeModified;
    private Thread inputCatcher;
    private boolean isAlive;

    public InputStreamLineBuffer(InputStream is){
        inputStream = is;
        lines = new ConcurrentLinkedQueue<String>();
        lastTimeModified = System.currentTimeMillis();
        isAlive = false;
        inputCatcher = new Thread(new Runnable(){
            @Override
            public void run() {
                StringBuilder sb = new StringBuilder(100);
                int b;
                try{
                    while ((b = inputStream.read()) != -1){  
                        // read one char
                        if((char)b == '\n'){
                            // new Line -> add to queue
                            lines.offer(sb.toString());
                            sb.setLength(0); // reset StringBuilder
                            lastTimeModified = System.currentTimeMillis();
                        }
                        else sb.append((char)b); // append char to stringbuilder
                    }
                } catch (IOException e){
                    e.printStackTrace();
                } finally {
                    isAlive = false;
                }
            }});
    }
    // is the input reader thread alive
    public boolean isAlive(){
        return isAlive;
    }
    // start the input reader thread
    public void start(){
        isAlive = true;
        inputCatcher.start();
    }
    // has Queue some lines
    public boolean hasNext(){
        return lines.size() > 0;
    }
    // get next line from Queue
    public String getNext(){
        return lines.poll();
    }
    // how much time has elapsed since last line was read
    public long timeElapsed(){
        return (System.currentTimeMillis() - lastTimeModified);
    }
}

有了这个 class 我们可以将输出和错误读取线程合二为一。当输入读取缓冲区线程处于活动状态且尚未消耗数据时,它就会存在。在每个 运行 中,它检查自上次读取输出以来是否已经过去了一段时间,如果是,它会一次打印所有未打印的行。与错误输出相同。然后它会因为不浪费 cpu 时间而休眠几毫秒。

private void exec(String command) throws IOException{
    Process pro = Runtime.getRuntime().exec(command, null);

    inStream = pro.getInputStream();
    inErrStream = pro.getErrorStream();
    outStream = pro.getOutputStream();

    InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream);
    InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream);

    Thread streamReader = new Thread(new Runnable() {       
        public void run() {
            // start the input reader buffer threads
            outBuff.start();
            errBuff.start();

            // while an input reader buffer thread is alive
            // or there are unconsumed data left
            while(outBuff.isAlive() || outBuff.hasNext() ||
                errBuff.isAlive() || errBuff.hasNext()){

                // get the normal output if at least 50 millis have passed
                if(outBuff.timeElapsed() > 50)
                    while(outBuff.hasNext())
                        println(outBuff.getNext());
                // get the error output if at least 50 millis have passed
                if(errBuff.timeElapsed() > 50)
                    while(errBuff.hasNext())
                        println(errBuff.getNext());
                // sleep a bit bofore next run
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                 
            }
            System.out.println("Finish reading error and output stream");
        }          
    });
    streamReader.start();

    // remove outStreamReader and errStreamReader Thread
    [...]
}

也许这不是一个完美的解决方案,但它应该可以处理这里的情况。


编辑 (31.8.2016)
我们在评论中讨论了在实现终止启动的停止按钮时代码仍然存在问题 使用 Process#destroy() 进行处理。产生大量输出的过程,例如在无限循环中 通过调用 destroy() 立即销毁。但是因为它已经产生了很多必须消耗的输出 通过我们的 streamReader 我们无法回到正常的程序行为。
所以我们需要在这里做一些小改动:

我们将在 InputStreamLineBuffer 中引入一个 destroy() 方法来停止读取输出并清除队列。
更改将如下所示:

public class InputStreamLineBuffer{
    private boolean emergencyBrake = false;
    [...]
    public InputStreamLineBuffer(InputStream is){
        [...]
                while ((b = inputStream.read()) != -1 && !emergencyBrake){
                    [...]
                }
    }
    [...]

    // exits immediately and clears line buffer
    public void destroy(){
        emergencyBrake = true;
        lines.clear();
    }
}

以及主程序的一些小改动

public class ExeConsole extends JFrame{
    [...]
    // The line buffers must be declared outside the method
    InputStreamLineBuffer outBuff, errBuff; 
    public ExeConsole{
        [...]
        btnStop.addActionListener(new ActionListener() {
            public void actionPerformed(ActionEvent e) {
                 if(pro != null){
                      pro.destroy();
                      outBuff.destroy();
                      errBuff.destroy();
                 }
        }});
    }
    [...]
    private void exec(String command) throws IOException{
        [...]
        //InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream);
        //InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream);        
        outBuff = new InputStreamLineBuffer(inStream);
        errBuff = new InputStreamLineBuffer(inErrStream);    
        [...]
    }
}

现在它甚至应该能够破坏一些输出垃圾邮件进程。

注意: 我发现 Process#destroy() 无法销毁子进程。因此,如果您在 windows 上开始 cmd 并从那里启动一个 java 程序,你最终会破坏 cmd 进程,而 java 程序仍在 运行ning。 您将在任务管理器中看到它。 java 本身无法解决此问题。它需要 一些 os 依靠外部工具来获取这些进程的 pid 并手动杀死它们。

虽然@ArticLord 的解决方案很好很简洁,但最近我遇到了同样的问题并想出了一个概念上等效的解决方案,但在实现上略有不同。

概念相同,即"bulk reads":当一个reader线程轮到它时,它会消耗它处理的所有流,只有在它完成时才传递手。
这保证了 out/err 打印顺序。

但我没有使用 timer-based 轮流分配,而是使用 lock-based non-blocking 读取模拟:

// main method for testability: replace with private void exec(String command)
public static void main(String[] args) throws Exception
{
    // create a lock that will be shared between reader threads
    // the lock is fair to minimize starvation possibilities
    ReentrantLock lock = new ReentrantLock(true);

    // exec the command: I use nslookup for testing on windows 
    // because it is interactive and prints to stderr too
    Process p = Runtime.getRuntime().exec("nslookup");

    // create a thread to handle output from process (uses a test consumer)
    Thread outThread = createThread(p.getInputStream(), lock, System.out::print);
    outThread.setName("outThread");
    outThread.start();

    // create a thread to handle error from process (test consumer, again)
    Thread errThread = createThread(p.getErrorStream(), lock, System.err::print);
    errThread.setName("errThread");
    errThread.start();

    // create a thread to handle input to process (read from stdin for testing purpose)
    PrintWriter writer = new PrintWriter(p.getOutputStream());
    Thread inThread = createThread(System.in, null, str ->
    {
        writer.print(str);
        writer.flush();
    });
    inThread.setName("inThread");
    inThread.start();

    // create a thread to handle termination gracefully. Not really needed in this simple
    // scenario, but on a real application we don't want to block the UI until process dies
    Thread endThread = new Thread(() ->
    {
        try
        {
            // wait until process is done
            p.waitFor();
            logger.debug("process exit");

            // signal threads to exit
            outThread.interrupt();
            errThread.interrupt();
            inThread.interrupt();

            // close process streams
            p.getOutputStream().close();
            p.getInputStream().close();
            p.getErrorStream().close();

            // wait for threads to exit
            outThread.join();
            errThread.join();
            inThread.join();

            logger.debug("exit");
        }
        catch(Exception e)
        {
            throw new RuntimeException(e.getMessage(), e);
        }
    });
    endThread.setName("endThread");
    endThread.start();

    // wait for full termination (process and related threads by cascade joins)
    endThread.join();

    logger.debug("END");
}

// convenience method to create a specific reader thread with exclusion by lock behavior
private static Thread createThread(InputStream input, ReentrantLock lock, Consumer<String> consumer)
{
    return new Thread(() ->
    {
        // wrap input to be buffered (enables ready()) and to read chars
        // using explicit encoding may be relevant in some case
        BufferedReader reader = new BufferedReader(new InputStreamReader(input));

        // create a char buffer for reading
        char[] buffer = new char[8192];

        try
        {
            // repeat until EOF or interruption
            while(true)
            {
                try
                {
                    // wait for your turn to bulk read
                    if(lock != null && !lock.isHeldByCurrentThread())
                    {
                        lock.lockInterruptibly();
                    }

                    // when there's nothing to read, pass the hand (bulk read ended)
                    if(!reader.ready())
                    {
                        if(lock != null)
                        {
                            lock.unlock();
                        }

                        // this enables a soft busy-waiting loop, that simultates non-blocking reads
                        Thread.sleep(100);
                        continue;
                    }

                    // perform the read, as we are sure it will not block (input is "ready")
                    int len = reader.read(buffer);
                    if(len == -1)
                    {
                        return;
                    }

                    // transform to string an let consumer consume it
                    String str = new String(buffer, 0, len);
                    consumer.accept(str);
                }
                catch(InterruptedException e)
                {
                    // catch interruptions either when sleeping and waiting for lock
                    // and restore interrupted flag (not necessary in this case, however it's a best practice)
                    Thread.currentThread().interrupt();
                    return;
                }
                catch(IOException e)
                {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        }
        finally
        {
            // protect the lock against unhandled exceptions
            if(lock != null && lock.isHeldByCurrentThread())
            {
                lock.unlock();
            }

            logger.debug("exit");
        }
    });
}

请注意,@ArticLord 和我的两种解决方案都不完全 starvation-safe,并且机会(真的很少)与消费者速度成反比。

2016 年快乐! ;)