BufferedReader readLine() 阻塞直到缓冲区已满

BufferedReader readLine() blocking until buffer is full

我正在编写一对 client/server 应用程序。服务器 运行 的多个线程收集数据并将其添加到 BlockingQueue。套接字代码遍历队列并将找到的任何数据发送给客户端。数据是一个字符串,我附加了一个行分隔符,以便客户端可以使用 BufferedReader.readLine().

读取它

我的问题是,readLine() 不是在可用的每一行上返回,而是等到整个缓冲区已满,然后才吐出缓冲区中的所有完整行。使用默认的 8K 缓冲区,这意味着我通过客户端以 8K 块的形式获取数据,这是非常不受欢迎的。我附上了代表这一点的 MRE 代码。我已经通过登录我的实际应用程序确认 BufferedWriter 正在写入队列中可用的数据,但老实说,我不知道延迟是在发送方之后发生的,还是真的在阅读方面。如果您 运行 MRE,您将看到客户端一次显示大约 170 行数据。

我已经在网上搜索了几天这种现象,我能找到的一个类似问题的片段表明,这可能与底层的 InputStreamReader and/or StreamDecoder 有关,但是开始超出我的专业范围。 (参见 this link

所以我的问题是我是否正确地实施了 BufferedReader 以及如何解决我遇到的问题,以便我在没有不必要的延迟的情况下获得每条传入线路。

package serverTest;

import java.util.concurrent.BlockingQueue;

public class ServerTest {

    public static void main(String[] args) {

        int port = 54321;
        ServerSocketComms server = new ServerSocketComms(port);
        BlockingQueue<String> queue = server.getQueue();
        new Thread(server).start();
        
        ClientSocketComms client = new ClientSocketComms("localhost", port);
        new Thread(client).start();
        
        for(int i = 0; i < 1000; i++) { // should give about 10 seconds of output
            try {
                queue.put("" + i + " - All work and no play makes Jack a dull boy");
                // Slow things down enough to show what's happening
                Thread.sleep(10);
                // 48 characters should fill the 8K buffer in approximately 2 seconds
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
package serverTest;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ServerSocketComms implements Runnable {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final int port;
    
    public ServerSocketComms(int port) {
        this.port = port;
    }
    
    @Override
    public void run() {
        // Open server socket and wait for connection
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            Socket socket = serverSocket.accept();

            // Continually loop over blocking data queue until stopped
            BufferedWriter dataOut = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            while(socket.isConnected()) {
                dataOut.write(queue.take());
                dataOut.newLine(); // delimit strings with a line separator
            }
            
            // Loop never exits because client socket never completes because of BufferedReader issue
            // so sockets never close and application never terminates
            socket.close();
            serverSocket.close();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public BlockingQueue<String> getQueue() {
        // Return a reference to the sending queue to be populated by other threads
        return this.queue;
    }
}
package serverTest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class ClientSocketComms implements Runnable {

    private final String server;
    private final int port;
    
    public ClientSocketComms(String server, int port) {
        this.server = server;
        this.port = port;
    }
    
    @Override
    public void run() {
        // Open socket to server and wait for incoming data
        try {
            Socket socket = new Socket(server, port);
            BufferedReader dataIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            // Continually loop over incoming data until stopped
            String data;
            while((data = dataIn.readLine()) != null) {
                // Should print out every line as it's received,
                // but instead waits until buffer is full
                // (outputs about 170 lines at a time)
                System.out.println(data);
            }
            
            // Close socket and thread will die
            // (but loop never ends because buffer doesn't get completely refilled)
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

您的服务器正在使用 BufferedWriter:

BufferedWriter dataOut = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

这个做你不喜欢的缓冲。似乎默认缓冲区大小是您看到的 8k,尽管 API 中没有记录并且可能会更改。如果在某个时间点您想要确保到目前为止存储在缓冲区中的所有内容都立即发送给客户端,请尝试使用 dataOut.flush() 刷新缓冲区。详情请查看 BufferedWriter API

顺便说一句,我没有检查你的代码是否还有其他问题。不过以上肯定是一个