何时注册 OP_WRITE
When to register OP_WRITE
我使用带有反应器模式的 NIO 将服务器连接到客户端。我的代码如下:
服务器端代码,在if(selectionKey.isWritable){} :
块中
public void isWritable(SelectionKey selectionKey) throws Exception {
SocketChannel socketChannel =
(SocketChannel) selectionKey.channel();
Integer myInteger = (Integer) selectionKey.attachment();
if (myInteger == null){
int myJob = jobFacade.isAnyJob(socketChannel, 100 /*deadline*/);
if (myJob > 0){
ByteBuffer inputBuffer = ByteBuffer.wrap("available\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
myInteger = myJob;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}else if (myJob == -1){
ByteBuffer inputBuffer = ByteBuffer.wrap("unavailable\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
socketChannel.close();
UnsupportedOperationException un = new UnsupportedOperationException();
throw un;
}else if (myJob == -2){
ByteBuffer inputBuffer = ByteBuffer.wrap("pending\n".getBytes("UTF-8"));
inputBuffer.flip();
socketChannel.write(inputBuffer);
myInteger = null;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}
// is there any new job to do?
}else{
int myInt = myInteger.intValue();
if ( myInt > 0 ){
long startRange = jobFacade.findByID(myInt);
sendTextFile(startRange, Integer.parseInt(properties.getProperty("workUnit")),
properties.getProperty("textPath"), socketChannel);
myInteger = -3;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}else if (myInt == -3){
sendAlgorithmFile(socketChannel, properties.getProperty("algorithmPath"));
myInteger = -4;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
// send algorithm file
}else if (myInt == -4){
int isOK = jobFacade.isAccepted(socketChannel.socket().getInetAddress().toString(),
Long.parseLong(properties.getProperty("deadline")));
if(isOK == -1){
ByteBuffer inputBuffer = ByteBuffer.wrap("notaccepted\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
myInteger = null;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}else {
ByteBuffer inputBuffer = ByteBuffer.wrap("accepted\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
myInteger = isOK;
socketChannel.register(
selector, SelectionKey.OP_READ, myInteger);
}
// send "accepted" or "not accepted"
}
}
}
不需要知道我在每个块中的方法是做什么的,只是这些方法首先生成一个按此顺序排列的数字。 1)myInteger=null, 2) myInteger > 0, 3) myInteger = -3, 4) myInteger = -4
按照这个顺序,OP-WRITE 会连续注册四次。这部分非常重要。那么让我们看看我的客户端代码,然后我会告诉你我的问题:
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
sentence = inFromServer.readLine();
System.out.println("Response from Server : " + sentence);
if (sentence.equals("available")){
BufferedReader inFromServer1 = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
while ((sentence = inFromServer1.readLine()) != null) {
myJob = myJob + sentence ;
}
inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String acception = inFromServer.readLine();
if (acception.equals("accepted")){
File file = new File("account.json");
byte[] bytes = new byte[2048];
InputStream inputStream = new FileInputStream(file);
OutputStream outputStream = clientSocket.getOutputStream();
int count;
try {
while ((count = inputStream.read(bytes)) > 0){
outputStream.write(bytes, 0, count);
}
outputStream.close();
inputStream.close();
}catch (IOException io){}
continue;
}else if (acception.equals("notaccepted")){
continue;
}
现在,我的问题是,当我 运行 我的服务器然后我的客户端时,我的服务器将 运行 不等待我的客户端获取输入流。首先,客户端得到 "available" 但是当第二个 getInputStream 将在客户端到达时,服务器会在 OP-WRITE 注册的所有阶段进行调整并等待客户端获取数据流(正如我在代码中定义的那样)。
实际上,我的服务器做得很好。它将按要求的顺序通过所有阶段。但问题是发送和接收数据不同步。
我不知道我的问题是什么。但是我猜当我连续注册 OP-WRITE 时,这意味着我的服务器没有发送所有字节的数据,所以只有第一个 getInputStream 会获取数据。
另一方面,我需要这个命令来 运行 我的程序。那么,有什么想法吗?
先帮你理清问题再思考模式:
您有一个 thread/process 传递一条消息,要求另一个线程/进程根据该消息采取行动。
接收方需要读取消息并可能启动它自己的一些子线程来执行该工作,因为它可以接收其他请求。
最好能告诉发件人已收到请求的确认。
似乎有必要保护消息传递。因为如果在您阅读时收到另一个请求,您最终可能会处理垃圾。
您可以将 nio 配置为有多个读取器和一个写入器,只读取缓冲区的一部分等。查看 how-tos、api 文档。很给力
exactly after sending a message
TCP中没有消息这种东西。它是一个字节流。发送方的两次写入很可能被接收方的一次读取所读取。如果你想要消息,你必须自己实现它们,包括计数词、终止符、STX/ETX、XML 等
我发现了我的问题。我的代码没有问题。 OP_WRITE 可以随时注册任何订单。最重要的是正确写入缓冲区和从套接字读取。
实际上,当我第二次向客户发送内容时,我并没有清除缓冲区。在这种情况下,我找到了它,并进行了更正。
但是当我向我的客户端发送一些字符然后想要发送一个文件时,因为在我的客户端我有一个循环来获取所有字符,如果文件是由同一个循环获取的内容。
这里的问题是如何让它们分开?
我使用带有反应器模式的 NIO 将服务器连接到客户端。我的代码如下:
服务器端代码,在if(selectionKey.isWritable){} :
public void isWritable(SelectionKey selectionKey) throws Exception {
SocketChannel socketChannel =
(SocketChannel) selectionKey.channel();
Integer myInteger = (Integer) selectionKey.attachment();
if (myInteger == null){
int myJob = jobFacade.isAnyJob(socketChannel, 100 /*deadline*/);
if (myJob > 0){
ByteBuffer inputBuffer = ByteBuffer.wrap("available\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
myInteger = myJob;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}else if (myJob == -1){
ByteBuffer inputBuffer = ByteBuffer.wrap("unavailable\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
socketChannel.close();
UnsupportedOperationException un = new UnsupportedOperationException();
throw un;
}else if (myJob == -2){
ByteBuffer inputBuffer = ByteBuffer.wrap("pending\n".getBytes("UTF-8"));
inputBuffer.flip();
socketChannel.write(inputBuffer);
myInteger = null;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}
// is there any new job to do?
}else{
int myInt = myInteger.intValue();
if ( myInt > 0 ){
long startRange = jobFacade.findByID(myInt);
sendTextFile(startRange, Integer.parseInt(properties.getProperty("workUnit")),
properties.getProperty("textPath"), socketChannel);
myInteger = -3;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}else if (myInt == -3){
sendAlgorithmFile(socketChannel, properties.getProperty("algorithmPath"));
myInteger = -4;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
// send algorithm file
}else if (myInt == -4){
int isOK = jobFacade.isAccepted(socketChannel.socket().getInetAddress().toString(),
Long.parseLong(properties.getProperty("deadline")));
if(isOK == -1){
ByteBuffer inputBuffer = ByteBuffer.wrap("notaccepted\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
myInteger = null;
socketChannel.register(
selector, SelectionKey.OP_WRITE, myInteger);
}else {
ByteBuffer inputBuffer = ByteBuffer.wrap("accepted\n".getBytes("UTF-8"));
socketChannel.write(inputBuffer);
myInteger = isOK;
socketChannel.register(
selector, SelectionKey.OP_READ, myInteger);
}
// send "accepted" or "not accepted"
}
}
}
不需要知道我在每个块中的方法是做什么的,只是这些方法首先生成一个按此顺序排列的数字。 1)myInteger=null, 2) myInteger > 0, 3) myInteger = -3, 4) myInteger = -4 按照这个顺序,OP-WRITE 会连续注册四次。这部分非常重要。那么让我们看看我的客户端代码,然后我会告诉你我的问题:
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
sentence = inFromServer.readLine();
System.out.println("Response from Server : " + sentence);
if (sentence.equals("available")){
BufferedReader inFromServer1 = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
while ((sentence = inFromServer1.readLine()) != null) {
myJob = myJob + sentence ;
}
inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String acception = inFromServer.readLine();
if (acception.equals("accepted")){
File file = new File("account.json");
byte[] bytes = new byte[2048];
InputStream inputStream = new FileInputStream(file);
OutputStream outputStream = clientSocket.getOutputStream();
int count;
try {
while ((count = inputStream.read(bytes)) > 0){
outputStream.write(bytes, 0, count);
}
outputStream.close();
inputStream.close();
}catch (IOException io){}
continue;
}else if (acception.equals("notaccepted")){
continue;
}
现在,我的问题是,当我 运行 我的服务器然后我的客户端时,我的服务器将 运行 不等待我的客户端获取输入流。首先,客户端得到 "available" 但是当第二个 getInputStream 将在客户端到达时,服务器会在 OP-WRITE 注册的所有阶段进行调整并等待客户端获取数据流(正如我在代码中定义的那样)。 实际上,我的服务器做得很好。它将按要求的顺序通过所有阶段。但问题是发送和接收数据不同步。 我不知道我的问题是什么。但是我猜当我连续注册 OP-WRITE 时,这意味着我的服务器没有发送所有字节的数据,所以只有第一个 getInputStream 会获取数据。 另一方面,我需要这个命令来 运行 我的程序。那么,有什么想法吗?
先帮你理清问题再思考模式:
您有一个 thread/process 传递一条消息,要求另一个线程/进程根据该消息采取行动。
接收方需要读取消息并可能启动它自己的一些子线程来执行该工作,因为它可以接收其他请求。
最好能告诉发件人已收到请求的确认。
似乎有必要保护消息传递。因为如果在您阅读时收到另一个请求,您最终可能会处理垃圾。
您可以将 nio 配置为有多个读取器和一个写入器,只读取缓冲区的一部分等。查看 how-tos、api 文档。很给力
exactly after sending a message
TCP中没有消息这种东西。它是一个字节流。发送方的两次写入很可能被接收方的一次读取所读取。如果你想要消息,你必须自己实现它们,包括计数词、终止符、STX/ETX、XML 等
我发现了我的问题。我的代码没有问题。 OP_WRITE 可以随时注册任何订单。最重要的是正确写入缓冲区和从套接字读取。 实际上,当我第二次向客户发送内容时,我并没有清除缓冲区。在这种情况下,我找到了它,并进行了更正。 但是当我向我的客户端发送一些字符然后想要发送一个文件时,因为在我的客户端我有一个循环来获取所有字符,如果文件是由同一个循环获取的内容。 这里的问题是如何让它们分开?