我可以使用哪种 Java 线程池来处理来自套接字的数据?

What sort of Java Thread pool can I use for processing data from sockets?

我有以下启动线程服务器的代码:

Thread server = new Thread(new ServerRunnable(serverPort, devMode, messageQueue, database));
server.start();

Thread worker1 = new Thread(
    new WorkerRunnable(
        messageQueue,
        database, 
        devMode, 
        1
    ));
Thread worker2 = new Thread(
    new WorkerRunnable(
        messageQueue,
        database, 
        devMode, 
        2
    ));
Thread worker3 = new Thread(
    new WorkerRunnable(
        messageQueue,
        database, 
        devMode, 
        3
    ));

worker1.start();
worker2.start();
worker3.start();

ServerRunnable 将包含字节读取和其他信息的状态对象传递到 messageQueueWorkerRunnable 线程获取消息并处理它们。

我正在查看 ThreadPoolExecutor,希望我可以用它来用一个可以根据需要增长或收缩的池替换上面的三个工作线程,但它没有按我预期的方式工作。它需要一个任务队列来完成。

我的代码正在使用 Java NIO,因此无法保证放入队列的项目是完整的,因此可能需要进一步处理。因此,我无法按照我最初想象的方式使用 ThreadPoolExecutor,这会将 Runnable 传递给它包含队列。

所以,如果我想在这里使用 ThreadPoolExecutor,我必须将它添加到 ServerRunnable class(有点颠倒我当前的过程),然后通过在向 WorkerRunnable class 传递消息参数后,将一个新的 WorkerRunnable class 放到 ThreadPoolExecutor 队列中。 那是对的吗?

可能是这样的:

LinkedBlockingQueue messageQueue = new LinkedBlockingQueue<Runnable>();

Thread server = new Thread(
    new ServerRunnable(
        serverPort, 
        devMode, 
        messageQueue, 
        database
    ));
server.start();
int  corePoolSize  =    5;
int  maxPoolSize   =   10;
long keepAliveTime = 5000;

ExecutorService threadPoolExecutor = 
    new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        keepAliveTime,
        TimeUnit.MILLISECONDS,
        messageQueue); 
threadPoolExecutor.prestartAllCoreThreads()

然后在 ServerRunnable class:

// Process incoming bytes into a message

messageQueue.put(new WorkerRunnable(database, devMode, message));

我的理解正确吗?

是的,或者你可以让单个消费者进入你的队列,它将继续将任务传递给 ExecutorService。像这样

Thread consumer = new Thread(new Consumer(queue));
consumer.start();

class Consumer implements Runnable {
    private ExecutorService     service = Executors.newCachedThreadPool();
    private final BlockingQueue queue;

    public Consumer(Queue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Task t = null;
        while(t = queue.take()) {
            Worker worker = new Worker(t);
            service.execute(worker);
        }
    }
}

这与 ExecutorService 的耦合有点松散。