将缓冲区写入 Java 通道:线程安全与否?
Writing buffers to a Java channel: Thread-safe or not?
考虑以下代码片段,它只是将 someByteBuffer
的内容写入标准输出:
// returns an instance of "java.nio.channels.Channels$WritableByteChannelImpl"
WritableByteChannel w = Channels.newChannel(System.out);
w.write(someByteBuffer);
Java 指定 channels are, in general, intended to be safe for multithreaded access, while buffers are not safe for use by multiple concurrent threads.
所以,我想知道上面的代码片段是否需要同步,因为它在某个缓冲区(不是线程-安全)。
我看了一下implementation of the write
method:
public int write(ByteBuffer src) throws IOException {
int len = src.remaining();
int totalWritten = 0;
synchronized (writeLock) {
while (totalWritten < len) {
int bytesToWrite = Math.min((len - totalWritten),
TRANSFER_SIZE);
if (buf.length < bytesToWrite)
buf = new byte[bytesToWrite];
src.get(buf, 0, bytesToWrite);
try {
begin();
out.write(buf, 0, bytesToWrite);
} finally {
end(bytesToWrite > 0);
}
totalWritten += bytesToWrite;
}
return totalWritten;
}
}
请注意,除方法中的前两行外,所有内容都由 writeLock
同步。现在,由于 ByteBuffer src
不是线程安全的,在没有适当同步的情况下调用 src.remaining()
是有风险的,因为另一个线程可能会更改它。
Should I synchronize the line w.write(someByteBuffer)
in the above snippet, or am I missing something and the Java implementation of the write()
method has already taken care of that?
编辑: 这是一个经常抛出 BufferUnderflowException
的示例代码,因为我在最后注释掉了 synchronized
块。删除这些注释将使代码异常消失。
import java.nio.*;
import java.nio.channels.*;
public class Test {
public static void main(String[] args) throws Exception {
ByteBuffer b = ByteBuffer.allocate(10);
b.put(new byte[]{'A', 'B', 'C', 'D', 'E', 'F', 'G', '\n'});
// returns an instance of "java.nio.channels.Channels$WritableByteChannelImpl"
WritableByteChannel w = Channels.newChannel(System.out);
int c = 10;
Thread[] r = new Thread[c];
for (int i = 0; i < c; i++) {
r[i] = new Thread(new MyRunnable(b, w));
r[i].start();
}
}
}
class MyRunnable implements Runnable {
private final ByteBuffer b;
private final WritableByteChannel w;
MyRunnable(ByteBuffer b, WritableByteChannel w) {
this.b = b;
this.w = w;
}
@Override
public void run() {
try {
// synchronized (b) {
b.flip();
w.write(b);
// }
} catch (Exception e) {
e.printStackTrace();
}
}
}
如果您从多个线程进行多次写入并希望确保这些写入的原子性,您只需要锁定通道,在这种情况下您需要锁定 System.out 对象。
如果您共享了一个跨线程非线程安全的变异数据结构,则需要添加锁定。如果可以的话,我会避免在多线程中使用 ByteBuffer。
重点是:如果您的设置允许多个线程篡改那个缓冲区 对象,那么你就会遇到线程问题。就这么简单!
问题不在于 channel.write() 是否线程安全。知道这一点很好,但不是问题的核心!
真正的问题是:您的 代码使用该缓冲区做什么?
当数据正在运行时,此通道实现确实锁定内部有什么帮助......从外部?!
你知道,进入此方法的 src
对象可能会发生各种事情 - 而 该通道正忙于写入缓冲区!
换句话说:此代码是否 "safe" 的问题完全取决于 您的 代码对那个 src
缓冲区对象的并行处理.
鉴于 OP 的评论:核心 点是:您必须确保使用该字节缓冲区的任何 activity 都是线程安全的。在给出的示例中,我们有两个操作:
b.flip();
w.write(b);
这些是每个线程要做的唯一操作;因此:当确保只有 one 线程可以进行这两个调用时(如图所示;通过查看 shared 缓冲区对象);那你很好。
其实很简单:如果你有"shared data";那么你必须确保 "shared data" 的线程 reading/writing 以某种方式 同步 以避免竞争条件。
考虑以下代码片段,它只是将 someByteBuffer
的内容写入标准输出:
// returns an instance of "java.nio.channels.Channels$WritableByteChannelImpl"
WritableByteChannel w = Channels.newChannel(System.out);
w.write(someByteBuffer);
Java 指定 channels are, in general, intended to be safe for multithreaded access, while buffers are not safe for use by multiple concurrent threads.
所以,我想知道上面的代码片段是否需要同步,因为它在某个缓冲区(不是线程-安全)。
我看了一下implementation of the write
method:
public int write(ByteBuffer src) throws IOException {
int len = src.remaining();
int totalWritten = 0;
synchronized (writeLock) {
while (totalWritten < len) {
int bytesToWrite = Math.min((len - totalWritten),
TRANSFER_SIZE);
if (buf.length < bytesToWrite)
buf = new byte[bytesToWrite];
src.get(buf, 0, bytesToWrite);
try {
begin();
out.write(buf, 0, bytesToWrite);
} finally {
end(bytesToWrite > 0);
}
totalWritten += bytesToWrite;
}
return totalWritten;
}
}
请注意,除方法中的前两行外,所有内容都由 writeLock
同步。现在,由于 ByteBuffer src
不是线程安全的,在没有适当同步的情况下调用 src.remaining()
是有风险的,因为另一个线程可能会更改它。
Should I synchronize the line
w.write(someByteBuffer)
in the above snippet, or am I missing something and the Java implementation of thewrite()
method has already taken care of that?
编辑: 这是一个经常抛出 BufferUnderflowException
的示例代码,因为我在最后注释掉了 synchronized
块。删除这些注释将使代码异常消失。
import java.nio.*;
import java.nio.channels.*;
public class Test {
public static void main(String[] args) throws Exception {
ByteBuffer b = ByteBuffer.allocate(10);
b.put(new byte[]{'A', 'B', 'C', 'D', 'E', 'F', 'G', '\n'});
// returns an instance of "java.nio.channels.Channels$WritableByteChannelImpl"
WritableByteChannel w = Channels.newChannel(System.out);
int c = 10;
Thread[] r = new Thread[c];
for (int i = 0; i < c; i++) {
r[i] = new Thread(new MyRunnable(b, w));
r[i].start();
}
}
}
class MyRunnable implements Runnable {
private final ByteBuffer b;
private final WritableByteChannel w;
MyRunnable(ByteBuffer b, WritableByteChannel w) {
this.b = b;
this.w = w;
}
@Override
public void run() {
try {
// synchronized (b) {
b.flip();
w.write(b);
// }
} catch (Exception e) {
e.printStackTrace();
}
}
}
如果您从多个线程进行多次写入并希望确保这些写入的原子性,您只需要锁定通道,在这种情况下您需要锁定 System.out 对象。
如果您共享了一个跨线程非线程安全的变异数据结构,则需要添加锁定。如果可以的话,我会避免在多线程中使用 ByteBuffer。
重点是:如果您的设置允许多个线程篡改那个缓冲区 对象,那么你就会遇到线程问题。就这么简单!
问题不在于 channel.write() 是否线程安全。知道这一点很好,但不是问题的核心!
真正的问题是:您的 代码使用该缓冲区做什么?
当数据正在运行时,此通道实现确实锁定内部有什么帮助......从外部?!
你知道,进入此方法的 src
对象可能会发生各种事情 - 而 该通道正忙于写入缓冲区!
换句话说:此代码是否 "safe" 的问题完全取决于 您的 代码对那个 src
缓冲区对象的并行处理.
鉴于 OP 的评论:核心 点是:您必须确保使用该字节缓冲区的任何 activity 都是线程安全的。在给出的示例中,我们有两个操作:
b.flip();
w.write(b);
这些是每个线程要做的唯一操作;因此:当确保只有 one 线程可以进行这两个调用时(如图所示;通过查看 shared 缓冲区对象);那你很好。
其实很简单:如果你有"shared data";那么你必须确保 "shared data" 的线程 reading/writing 以某种方式 同步 以避免竞争条件。