何时注册 OP_WRITE

When to register OP_WRITE

我使用带有反应器模式的 NIO 将服务器连接到客户端。我的代码如下: 服务器端代码,在if(selectionKey.isWritable){} :

块中
public void isWritable(SelectionKey selectionKey) throws Exception {

        SocketChannel socketChannel =
                (SocketChannel) selectionKey.channel();

        Integer myInteger = (Integer) selectionKey.attachment();

        if (myInteger == null){
            int myJob = jobFacade.isAnyJob(socketChannel, 100 /*deadline*/);
            if (myJob > 0){

                ByteBuffer inputBuffer = ByteBuffer.wrap("available\n".getBytes("UTF-8"));
                socketChannel.write(inputBuffer);
                myInteger = myJob;
                socketChannel.register(
                        selector, SelectionKey.OP_WRITE, myInteger);

            }else if (myJob == -1){

                ByteBuffer inputBuffer = ByteBuffer.wrap("unavailable\n".getBytes("UTF-8"));
                socketChannel.write(inputBuffer);
                socketChannel.close();

                UnsupportedOperationException un = new UnsupportedOperationException();
                throw un;

            }else if (myJob == -2){

                ByteBuffer inputBuffer = ByteBuffer.wrap("pending\n".getBytes("UTF-8"));
                inputBuffer.flip();
                socketChannel.write(inputBuffer);
                myInteger = null;
                socketChannel.register(
                        selector, SelectionKey.OP_WRITE, myInteger);

            }
//            is there any new job to do?
        }else{

            int myInt = myInteger.intValue();

            if ( myInt > 0 ){

                long startRange = jobFacade.findByID(myInt);
                sendTextFile(startRange, Integer.parseInt(properties.getProperty("workUnit")),
                             properties.getProperty("textPath"), socketChannel);
                myInteger = -3;
                socketChannel.register(
                        selector, SelectionKey.OP_WRITE, myInteger);

            }else if (myInt == -3){

                sendAlgorithmFile(socketChannel, properties.getProperty("algorithmPath"));
                myInteger = -4;
                socketChannel.register(
                        selector, SelectionKey.OP_WRITE, myInteger);
//                send algorithm file

            }else if (myInt == -4){
                int isOK = jobFacade.isAccepted(socketChannel.socket().getInetAddress().toString(),
                                                Long.parseLong(properties.getProperty("deadline")));
                if(isOK == -1){

                    ByteBuffer inputBuffer = ByteBuffer.wrap("notaccepted\n".getBytes("UTF-8"));
                    socketChannel.write(inputBuffer);
                    myInteger = null;
                    socketChannel.register(
                            selector, SelectionKey.OP_WRITE, myInteger);
                }else {

                    ByteBuffer inputBuffer = ByteBuffer.wrap("accepted\n".getBytes("UTF-8"));
                    socketChannel.write(inputBuffer);
                    myInteger = isOK;
                    socketChannel.register(
                            selector, SelectionKey.OP_READ, myInteger);
                }
//                send "accepted" or "not accepted"
            }
        }
    }

不需要知道我在每个块中的方法是做什么的,只是这些方法首先生成一个按此顺序排列的数字。 1)myInteger=null, 2) myInteger > 0, 3) myInteger = -3, 4) myInteger = -4 按照这个顺序,OP-WRITE 会连续注册四次。这部分非常重要。那么让我们看看我的客户端代码,然后我会告诉你我的问题:

BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            sentence = inFromServer.readLine();
            System.out.println("Response from Server : " + sentence);


            if (sentence.equals("available")){

                BufferedReader inFromServer1 = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                while ((sentence = inFromServer1.readLine()) != null) {
                     myJob = myJob + sentence ;
                }


inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                String acception = inFromServer.readLine();
                if (acception.equals("accepted")){

                    File file = new File("account.json");
                    byte[] bytes = new byte[2048];
                    InputStream inputStream = new FileInputStream(file);
                    OutputStream outputStream = clientSocket.getOutputStream();
                    int count;
                    try {
                        while ((count = inputStream.read(bytes)) > 0){
                            outputStream.write(bytes, 0, count);
                        }
                        outputStream.close();
                        inputStream.close();

                    }catch (IOException io){}

                    continue;

                }else if (acception.equals("notaccepted")){

                    continue;

                }

现在,我的问题是,当我 运行 我的服务器然后我的客户端时,我的服务器将 运行 不等待我的客户端获取输入流。首先,客户端得到 "available" 但是当第二个 getInputStream 将在客户端到达时,服务器会在 OP-WRITE 注册的所有阶段进行调整并等待客户端获取数据流(正如我在代码中定义的那样)。 实际上,我的服务器做得很好。它将按要求的顺序通过所有阶段。但问题是发送和接收数据不同步。 我不知道我的问题是什么。但是我猜当我连续注册 OP-WRITE 时,这意味着我的服务器没有发送所有字节的数据,所以只有第一个 getInputStream 会获取数据。 另一方面,我需要这个命令来 运行 我的程序。那么,有什么想法吗?

先帮你理清问题再思考模式:

您有一个 thread/process 传递一条消息,要求另一个线程/进程根据该消息采取行动。

接收方需要读取消息并可能启动它自己的一些子线程来执行该工作,因为它可以接收其他请求。

最好能告诉发件人已收到请求的确认。

似乎有必要保护消息传递。因为如果在您阅读时收到另一个请求,您最终可能会处理垃圾。

您可以将 nio 配置为有多个读取器和一个写入器,只读取缓冲区的一部分等。查看 how-tos、api 文档。很给力

exactly after sending a message

TCP中没有消息这种东西。它是一个字节流。发送方的两次写入很可能被接收方的一次读取所读取。如果你想要消息,你必须自己实现它们,包括计数词、终止符、STX/ETX、XML 等

我发现了我的问题。我的代码没有问题。 OP_WRITE 可以随时注册任何订单。最重要的是正确写入缓冲区和从套接字读取。 实际上,当我第二次向客户发送内容时,我并没有清除缓冲区。在这种情况下,我找到了它,并进行了更正。 但是当我向我的客户端发送一些字符然后想要发送一个文件时,因为在我的客户端我有一个循环来获取所有字符,如果文件是由同一个循环获取的内容。 这里的问题是如何让它们分开?