为什么这个非阻塞 IO 调用会失败?
Why does this non-blocking IO call fail?
背景
- 我想使用 Java 的非阻塞 SocketChannel 发送大量数据(30MB,但将来可能会更大)
- 为什么非阻塞?这样下一个要发送的字节的计算就不会因为等待网络而被阻塞
- 当我在阻塞模式下使用 SocketChannel 时,传输完成没有问题
- 当我将 SocketChannel 设置为非阻塞时,它 显着 更快地完成,但服务器没有收到所有数据
- 服务器确实收到了 一些 的数据,但是
问题
- 为什么在使用非阻塞 Java NIO SocketChannel 时我的大型 (30MB) 文件传输失败,我该如何解决?
文件
我删掉了程序并编写了示例,这样它就可以通过 javac *.java && java Main
一次性 运行
- 它为服务器创建一个 Future,为客户端创建一个 Future,让客户端向服务器发送 30MB 的随机字节,然后在主线程上阻塞,直到两个 Future 都完成(尽管服务器从未完成)
注意:这主要是关于TheClient.java
- 如果评论
<CommentOutToMakeWork>
和 </CommentOutToMakeWork>
之间的行被注释掉,SocketChannel 将被阻塞,传输将完成
Main.java
:
import java.io.IOException;
import java.lang.InterruptedException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws ExecutionException, IOException, InterruptedException {
final SocketAddress address = new InetSocketAddress("127.0.0.1", 12345);
final int size = 30 * 1000 * 1000;
ExecutorService executor = Executors.newFixedThreadPool(2);
TheServer theServer = new TheServer(address, size);
TheClient theClient = new TheClient(address, size);
Future<String> serverFuture = executor.submit(theServer);
Thread.sleep(2000);
Future<String> clientFuture = executor.submit(theClient);
System.out.println("MAIN: Received from client: " + clientFuture.get());
System.out.println("MAIN: Received from server: " + serverFuture.get());
executor.shutdown();
}
}
TheClient.java
:
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;
class TheClient implements Callable<String> {
private TheClient() {}
public TheClient(SocketAddress address, int size) {
this.size = size;
this.from = new byte[size];
this.serverAddress = address;
new Random().nextBytes(from);
}
private int size;
private byte[] from;
private SocketAddress serverAddress;
public String call() throws IOException {
SocketChannel socketChannel = SocketChannel.open();
System.out.println("CLIENT: Attempting to connect to server...");
socketChannel.connect(serverAddress);
// <CommentOutToMakeWork>
socketChannel.configureBlocking(false);
// </CommentOutToMakeWork>
System.out.println("CLIENT: Connection established. Sending " + size + " bytes.");
// For this example, this is one large write, but even my actual
// program, which uses a loop and puts smaller chunks onto the channel,
// is too fast for the SocketChannel.
socketChannel.write(ByteBuffer.wrap(from));
System.out.println("CLIENT: Write completed.");
return "CLIENT: Success!";
}
}
TheServer.java
:
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;
class TheServer implements Callable<String> {
private TheServer() {}
public TheServer(SocketAddress address, int size) {
this.size = size;
this.to = new byte[size];
this.serverAddress = address;
}
private int size;
private byte[] to;
private SocketAddress serverAddress;
public String call() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open().bind(serverAddress);
System.out.println("SERVER: Awaiting connection...");
InputStream clientSocketInputStream = serverChannel.accept().socket().getInputStream();
System.out.println("SERVER: Connection established. Attempting to read " + size + " bytes.");
for (int i = 0; i < size; ++i) {
to[i] = (byte) clientSocketInputStream.read();
}
System.out.println("SERVER: Read completed.");
return "SERVER: Success!";
}
}
我相信答案就在 WritableByteChannel.write
文档中:
Unless otherwise specified, a write operation will return only after writing all of the r requested bytes. Some types of channels, depending upon their state, may write only some of the bytes or possibly none at all. A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket's output buffer.
看来需要用write
的return值来判断写了多少,没写完的处理。描述中不清楚的是如何你处理这种情况 - 例如,你可能会发现当套接字输出缓冲区耗尽时你需要做一些安排才能继续写入。
背景
- 我想使用 Java 的非阻塞 SocketChannel 发送大量数据(30MB,但将来可能会更大)
- 为什么非阻塞?这样下一个要发送的字节的计算就不会因为等待网络而被阻塞
- 当我在阻塞模式下使用 SocketChannel 时,传输完成没有问题
- 当我将 SocketChannel 设置为非阻塞时,它 显着 更快地完成,但服务器没有收到所有数据
- 服务器确实收到了 一些 的数据,但是
问题
- 为什么在使用非阻塞 Java NIO SocketChannel 时我的大型 (30MB) 文件传输失败,我该如何解决?
文件
我删掉了程序并编写了示例,这样它就可以通过
一次性 运行javac *.java && java Main
- 它为服务器创建一个 Future,为客户端创建一个 Future,让客户端向服务器发送 30MB 的随机字节,然后在主线程上阻塞,直到两个 Future 都完成(尽管服务器从未完成)
注意:这主要是关于
TheClient.java
- 如果评论
<CommentOutToMakeWork>
和</CommentOutToMakeWork>
之间的行被注释掉,SocketChannel 将被阻塞,传输将完成
- 如果评论
Main.java
:
import java.io.IOException;
import java.lang.InterruptedException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws ExecutionException, IOException, InterruptedException {
final SocketAddress address = new InetSocketAddress("127.0.0.1", 12345);
final int size = 30 * 1000 * 1000;
ExecutorService executor = Executors.newFixedThreadPool(2);
TheServer theServer = new TheServer(address, size);
TheClient theClient = new TheClient(address, size);
Future<String> serverFuture = executor.submit(theServer);
Thread.sleep(2000);
Future<String> clientFuture = executor.submit(theClient);
System.out.println("MAIN: Received from client: " + clientFuture.get());
System.out.println("MAIN: Received from server: " + serverFuture.get());
executor.shutdown();
}
}
TheClient.java
:
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;
class TheClient implements Callable<String> {
private TheClient() {}
public TheClient(SocketAddress address, int size) {
this.size = size;
this.from = new byte[size];
this.serverAddress = address;
new Random().nextBytes(from);
}
private int size;
private byte[] from;
private SocketAddress serverAddress;
public String call() throws IOException {
SocketChannel socketChannel = SocketChannel.open();
System.out.println("CLIENT: Attempting to connect to server...");
socketChannel.connect(serverAddress);
// <CommentOutToMakeWork>
socketChannel.configureBlocking(false);
// </CommentOutToMakeWork>
System.out.println("CLIENT: Connection established. Sending " + size + " bytes.");
// For this example, this is one large write, but even my actual
// program, which uses a loop and puts smaller chunks onto the channel,
// is too fast for the SocketChannel.
socketChannel.write(ByteBuffer.wrap(from));
System.out.println("CLIENT: Write completed.");
return "CLIENT: Success!";
}
}
TheServer.java
:
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;
class TheServer implements Callable<String> {
private TheServer() {}
public TheServer(SocketAddress address, int size) {
this.size = size;
this.to = new byte[size];
this.serverAddress = address;
}
private int size;
private byte[] to;
private SocketAddress serverAddress;
public String call() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open().bind(serverAddress);
System.out.println("SERVER: Awaiting connection...");
InputStream clientSocketInputStream = serverChannel.accept().socket().getInputStream();
System.out.println("SERVER: Connection established. Attempting to read " + size + " bytes.");
for (int i = 0; i < size; ++i) {
to[i] = (byte) clientSocketInputStream.read();
}
System.out.println("SERVER: Read completed.");
return "SERVER: Success!";
}
}
我相信答案就在 WritableByteChannel.write
文档中:
Unless otherwise specified, a write operation will return only after writing all of the r requested bytes. Some types of channels, depending upon their state, may write only some of the bytes or possibly none at all. A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket's output buffer.
看来需要用write
的return值来判断写了多少,没写完的处理。描述中不清楚的是如何你处理这种情况 - 例如,你可能会发现当套接字输出缓冲区耗尽时你需要做一些安排才能继续写入。