向主线程发送线程错误信号

Signal thread error to main thread

我在 Java 中有一个线程连接到套接字并将信息发送到另一个正在处理该信息的线程。

现在,如果 "producer" 线程由于任何原因失败,我希望整个程序停止,因此必须发生某种通知。

这是我的程序(非常简单):

public class Main {
  private Queue<String> q = new ConcurrentLinkedQueue();

  public static void main(String[] args) throws IOException {
    new Thread(new Producer(q)).start();
    new Thread(new Consumer(q)).start();

    // Catch any error in the producer side, then stop both consumer and producer, and do some extra work to notify me that there's an error...
  }
}

主要代码只是创建一个共享队列,并启动生产者和消费者。到目前为止,我想这还好吗?现在生产者代码是这样的:

public class Producer implements Runnable {
  private Queue<String> q;

  public Producer(Queue<String> q) {
    this.q = q;
  }

  public void run() {
    try {
      connectToSocket();
      while(true) {
        String data = readFromSocket()
        q.offer(data);
      }
    } catch (Exception e) {
      // Something really bad happened, notify the parent thread so he stops the program...
    }
  }
}

生产者连接socket,读取并发送字符串数据入队...消费者:

public class Consumer implements Runnable {
  private Queue<String> q;

  public Consumer(Queue<String> q) {
    this.q = q;
  }

  public void run() {
    while(true) {
      String dataFromSocket = q.poll();
      saveData(dataFromSocket);
    }
  }
}

我的代码所做的远不止于此,但我认为现在我想做的事情不言自明了。我读过 wait()notify() 但我认为那行不通,因为我不想等待我的线程出现异常,我想以更好的方式处理它。有哪些替代方案?

总的来说,我的代码看起来合理吗?使用 ExecutorService 对这里有帮助吗?

非常感谢!

您可以使用 ThreadUncaughtExceptionHandler

Thread.setDefaultExceptionHandler(
new UncaughtExceptionHandler() {
    public void unchaughtException(Thread th, Throwable exception) {
        System.out.println("Exception from Thread" + th + ". Exception:" + exception);
    }
});

Java 文档 http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.UncaughtExceptionHandler.html

根据您当前的代码,最简单的解决方案是等待生产者线程完成,然后中断消费者:

Thread producerThread = new Thread(new Producer(q));
producerThread.start();     
Thread consumerThread = new Thread(new Consumer(q));
consumerThread.start();
try {
    producerThread.join();
} finally {
    consumerThread.interrupt();
}

正如您所提到的,执行程序会为您提供一种更通用的方法来在您需要退出时关闭所有内容(例如,当使用 ctrl 在终端中中断时-c).

ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
Producer producer = new Producer(q);
Consumer consumer = new Consumer(q);
executor.submit(producer::run);
executor.submit(consumer::run);
Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdownNow));

请注意,您的清理必须比仅仅关闭执行程序更全面。您必须事先关闭套接字才能中断线程。

这是一个更完整的示例,可以从两侧处理关机。您可以通过使用 nc -l 1234 启动测试服务器来测试它。杀死任一进程(nc 或 java 客户端)将导致另一个进程干净退出。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    private ExecutorService executor;
    private Socket socket;
    private AtomicBoolean running = new AtomicBoolean(true);

    public static void main(String[] args) throws IOException {
        Main main = new Main();
        main.run();
    }

    private Main() throws IOException {
        executor = Executors.newCachedThreadPool();
        socket = new Socket("localhost", 1234);
    }

    private void run() throws IOException {
        BlockingQueue<String> q = new SynchronousQueue<>();
        Producer producer = new Producer(socket, q);
        Consumer consumer = new Consumer(q);

        // Start the producer. When it ends call stop
        CompletableFuture.runAsync(producer, executor).whenComplete((status, ex) -> stop());
        // Start the consumer.
        CompletableFuture.runAsync(consumer, executor);
        // Add a shutdown hook to stop everything on break
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    private void stop() {
        if (running.compareAndSet(true, false)) { // only once
            // Close the socket to unblock the producer
            try {
                socket.close();
            } catch (IOException e) {
                // ignore
            }

            // Interrupt tasks
            executor.shutdownNow();
            try {
                // Give tasks some time to clean up
                executor.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    static class Producer implements Runnable {
        private BufferedReader in;
        private BlockingQueue<String> q;

        public Producer(Socket socket, BlockingQueue<String> q) throws IOException {
            this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            this.q = q;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String data = in.readLine();
                    if (data == null) {
                        break;
                    }
                    q.put(data);
                }
            } catch (InterruptedException | IOException e) {
                // Fall through
            }
            System.err.println("Producer done");
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue<String> q;

        public Consumer(BlockingQueue<String> q) {
            this.q = q;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println(q.take());
                }
            } catch (InterruptedException e) {
                // done
            }
            System.err.println("Client done");
        }
    }
}

将消费者线程作为 'daemon' 线程

启动

将消费者线程标记为'daemon'并让主线程也结束:

来自 Java API Thread.setDaemon(boolean) 的文档:

Marks this thread as either a daemon thread or a user thread. The Java Virtual Machine exits when the only threads running are all daemon threads.

public class Main {
    private Queue<String> q = new ConcurrentLinkedQueue();

    public static void main(String[] args) throws IOException {
        Thread producerThread = new Thread(new Producer(q));
        // producerThread.setDefaultUncaughtExceptionHandler(...);
        producerThread.start();
        Thread consumerThread = new Thread(new Consumer(q));
        consumerThread.setDeamon(true);
        consumerThread.start();
    }

}

这样,当主线程和生产者线程终止(成功或异常)时,您的应用程序会自动停止。

如果主线程需要知道 producerThread 失败,您可以按照@Manish 的建议将其与 UncaughtExceptionHandler 结合使用...

volatile怎么样?

public class Main {
  volatile boolean isStopMain = false;
  private Queue<String> q = new ConcurrentLinkedQueue();

  public static void main(String[] args) throws IOException {
    new Thread(new Producer(q)).start(); 
    new Thread(new Consumer(q)).start();
    // Catch any error in the producer side, then stop both consumer and producer, and do some extra work to notify me that there's an error...
    while (true) {
       if(isStopMain){
      System.exit(0); //or other operation to stop the main thread.
     }
    }
  }
}

在制作人中:

public class Producer implements Runnable {
  private Queue<String> q;

  public Producer(Queue<String> q) {
    this.q = q;
  }

  public void run() {
    try {
      connectToSocket();
      while(true) {
        String data = readFromSocket()
        q.offer(data);
      }
    } catch (Exception e) {
  // Something really bad happened, notify the parent thread so he stops the program...
      Main.isStopMain = true;
    }
  }
}

我想知道你是否试图在子线程中杀死父线程?如果是,您可能需要了解以下内容:How do I terminate parent thread from child?