套接字阻止在 Java 服务器和 Python 客户端之间发送消息

Socket blocks sending messages between a Java server and Python client

我需要在本地 Windows 机器上的 Java 应用程序和 python 脚本之间传递一些数据字符串。因此,我决定使用一个 Java 套接字服务器,该服务器通过 TCP 与 python 客户端进行通信。 Java 创建两个线程来处理本地主机端口 9998 和 9999 上的两个套接字连接。我使用端口 9998 来处理传入消息,而我使用端口 9999 来处理发送消息。 我的两个应用程序 运行 对于前几条消息 sent/received 很顺利,并且在某个时候,它停止在将字符串从 Java 发送到 Python 的调用上。 这是我的部分代码:

这个Javaclass处理套接字服务器的创建和通信

    public class ServerSocketConnection {

    private int port;
    private Socket socket;
    private ServerSocket serverSocket;
    private Logger logger;
    private BufferedWriter out;
    private BufferedReader in;

    public ServerSocketConnection(int port) {
        this.port = port;
        logger = App.getLogger();
    }

    // Create a server for a socket connection
    public void createServer() {
        try {
            // Create a server socket
            serverSocket = new ServerSocket(port);
            // Socket creation
            socket = serverSocket.accept();
            // Create a print writer
            out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            // Create a buffered reader
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        } catch (IOException e) {
            logger.severe("Error creating server socket");
        }
    }

    // Close the server socket
    public void closeServer() {
        try {
            serverSocket.close();
        } catch (IOException e) {
            logger.severe("Error closing server socket");
        }
    }

    public void sendMessage(String message) {
        try {
            // Sending the byte lenght of the message
            byte[] ptext = message.getBytes("UTF-8");
            send(String.valueOf(ptext.length));
            // Sending the message
            send(message);
        } catch (IOException e) {
            logger.severe("Error sending message:" + e.getMessage());
        }
    }

    private void send(String message) throws IOException {
        out.write(message);
        out.newLine();
        out.flush();
    }

    public String receiveMessage() {
        try {
            return in.readLine();
        } catch (IOException e) {
            logger.severe("Error receiving message");
            return null;
        }
    }

这是处理消息发送的 Java 线程。它从其他线程共享的队列中获取要发送的消息。

public class SendToPlatform implements Runnable {

    private static final int PORT = 9999;
    private Thread worker;
    private AtomicBoolean running;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private BlockingQueue<String> queueOut;
    private Logger logger;
    private ServerSocketConnection serverSocketConnection;

    public SendToPlatform(BlockingQueue<String> queueOut, AtomicBoolean running) {
        this.queueOut = queueOut;
        this.running = running;
        this.logger = App.getLogger();
        serverSocketConnection = new ServerSocketConnection(PORT);
    }

    public void run() {
        stopped.set(false);
        serverSocketConnection.createServer();
        while (running.get()) {
            socketSender();
        }
        stopped.set(true);
    }

    private void socketSender() {
        if (!queueOut.isEmpty()) {
            String element = null;
            try {
                element = queueOut.poll(1000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.severe("SendToPlatform: InterruptedException: " + e.getMessage());
            }
            serverSocketConnection.sendMessage(element);
        }
    }
}

这是用于从 Java 套接字服务器接收消息的 python 线程:

    def __socket_reading_no_server(self, queue_input : queue.Queue, is_running : bool):
        HOST = "localhost"
        PORT = 9999
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((HOST, PORT))

        while is_running:
            data = s.recv(4)
            message_size = int(data.decode('UTF-8').strip())
            data = s.recv(min(message_size + 2, 1024))
            message = data.decode('UTF-8').strip()
            queue_input.put(message)
        s.close()

并且此方法通过以下指令作为线程启动:

input_thread = threading.Thread(target=self.__socket_reading_no_server , args =(self.__queue_input, self.__running, ), daemon=True)
input_thread.start()

通过调试、记录和使用 Wireshark 了解我的代码中的问题,我得出结论,我的 out.write 指令经常出现问题,该指令在正确发送大约 10 条消息后发送消息时阻塞.当我关闭套接字连接时,挂起的消息被释放。我尝试使用 PrintWriterDataOutputStream 而不是 BufferedWriter, 但出现了同样的问题。我试过在发送字符串之前不发送消息的长度来适配s.recv()大小,但是还是出现了同样的问题。 我是套接字编程的新手,可能我做了一些非常错误的事情,但我找不到问题出在哪里。也许有更好的方法在我不知道的进程之间传递数据,我可以用它来代替套接字来满足我的需要?

@absuu 回答后的编辑

应用答案中建议的更正后,在尝试写入套接字时,我仍然在发送方法阻塞中遇到相同的问题 out.write。我按如下方式编辑了我的代码:

public class ServerSocketConnection {
    [...]
    public void sendMessage(String message) {
        try {
            send(message);
        } catch (IOException e) {
            logger.severe("Error sending message:" + e.getMessage());
        }
    }

    private void send(String message) throws IOException {
        message += "\r\n";
        byte[] ptext = message.getBytes("UTF-8");
        out.write(String.format("%2d",ptext.length));
        out.write("\r\n");
        out.flush();
        out.write(new String(ptext));
        logger.info("Data sent");
        out.flush();
    }
}

我还增加了 s.recv 尺寸,但没有任何变化

这里是 Java 客户端和 Python 服务器的“缩减”实现,演示了发送任意长度消息(在本例中为字符串)的有效机制:

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class Client {

    public static void main(String[] args) throws IOException {
        String message = "Talking isn't doing. It is a kind of good deed to say well; and yet words are not deeds.";
        try (Socket socket = new Socket("localhost", 7070)) {
            try (DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
                byte[] dts = message.getBytes();
                out.writeInt(dts.length);
                out.write(dts);
            }
        }
    }
}

请注意客户端如何在发送实际消息之前将即将到来的消息的长度作为 32 位整数发送。

from multiprocessing.connection import Listener

def main():
    listener = Listener(address=('0.0.0.0', 7070), family='AF_INET', authkey=None)
    with listener.accept() as conn:
        print(conn.recv_bytes().decode())
            

if __name__ == '__main__':
    main()

连接 class 期望使用在客户端实现的协议(在本例中为 Java)接收消息 - 即一个 32 位整数,给出要遵循的数据量.

我希望这能澄清问题

TL;DR ,请参阅下面的 代码更正


您的任何问题之前,有几点需要注意:

  1. 将服务器的数据编码从 UTF-16 更改为 UTF-8为什么? 任何端点之间的数据传输都依赖于 consistency,即 server/client 应用程序的数据编码。当您的服务器 (ServerSocketConnection) 正在发送使用 UTF-16 编码的消息时,您的客户端 (__socket_reading_no_server) 正在接收使用 UTF-8 编码的消息。即使您的客户端能够接收来自服务器的所有消息,它也根本无法识别它们。例如,UTF-16 将字符串 "5" 编码为字节 [0,53],对于 UTF-8 结果是 [53](假设大端字节顺序)。详情请参阅 Wikipedia
  2. 不要使用out.newLine()。请改用 out.write("\r\n")为什么? newline() 的行为取决于平台,这导致 returning 一个或两个字符,对于类 Unix OS 或 Windows OS,分别。它依赖于line.separator系统属性,您可以参考Java doc了解更多详情。
  3. 您的 data = s.recv(4) 限制您的客户端一次最多读取 4 个字节,这很危险。 为什么? 因为根据 Python doc,你的 4 不是客户端接收到的实际字节数,而是 最大数据量收到。此外,客户端理论上最多只能接收下一条传入消息的 9999 字节(byte1~4:'9')。

关于你的问题"...在发送...时阻塞的指令": 不幸的是,这里没有提供错误信息,我们无法准确推断出是哪一部分整个事情都出了问题。但是,我们可以推断,这更有可能是 Java 对网络套接字实现的结果,因为您遇到的情况在 C 网络编程(即原始套接字)中可能并不常见,即 [根据write系统调用的POSIX定义,=79=]阻塞不会在连续传输字节期间发生(注意大多数高级语言最终会调用write系统调用发送字节):

Upon successful completion, write() and pwrite() shall return the number of bytes actually written to the file associated with fildes. This number shall never be greater than nbyte. Otherwise, -1 shall be returned and errno set to indicate the error.

也就是说,在调用write将字节发送到流缓冲区后,它只会return something,而不是blocking .

网络套接字的 Java 实现相当复杂,这绝对不是 Java 的错。事实上,如果我们能够正确使用套接字,那么这些隐蔽的错误就会消失。例如,根据我的测试,应用以下更正后,您的应用程序 工作正常:


代码更正:

  1. ServerSocketConnection / byte[] ptext = message.getBytes("UTF-16"); -> byte[] ptext = message.getBytes("UTF-8");
  2. ServerSocketConnection / send(String.valueOf(ptext.length)); -> send(String.format("%2d",ptext.length));
  3. ServerSocketConnection / out.newLine() -> out.write("\r\n")

测试:

服务器:

BlockingQueue<String> q = new ArrayBlockingQueue<String>(20);
q.add("str 1");
q.add("str 2");
q.add("str 3");
serverSocketConnection.sendMessage(element);
logger.info("element:"+element);  // debug
########################################################## Server Outputs  
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 1
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 2
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 3

客户:

print("message: %s" % message)   # debug
# queue_input.put(message)
########################################################## Client Outputs
message: str 1
message: str 2
message: str 3

编辑:

我还想强调的一件事是,虽然我们实际上不知道您的应用程序要做什么,但可以通过网络套接字进行简单的基于长度的消息传输。也许它不是很实用和健壮,但它绝对是可能的。以下是对您的代码的一些更详细的更正:

ServerSocketConnection :

// ............. other parts stay unchanged
public void sendMessage(String message) {
    try {
        int msgLen = message.getBytes("UTF-8").length;
        send(String.format("%3d", msgLen));     // tell client the message size
        send(message);      // send actual message
    } catch (IOException e) {
        logger.severe("Error sending message:" + e.getMessage());
    }
}

private void send(String message) throws IOException {
    out.write(message);
    out.flush();
}
// ............. other parts stay unchanged

__socket_reading_no_server :

# ............. other parts stay unchanged
while is_running:
    data = s.recv(3)
    message_size = int(data.decode('UTF-8').strip())
    data = s.recv(min(message_size, 1024))
    message = data.decode('UTF-8').strip()
    print("incoming message:[%s]" % message)
# ............. other parts stay unchanged

非常非常非常重要 :

这里我们有 M=3,它限制了您的任何消息都应该是一个字符串,最多 包含 999 个字符!也就是说,如果您希望您的应用程序正常工作,那么您的每条消息,比如说 String msg,都应该满足 msg.length <= 999.