如何同步并发写入和关闭
How to synchronize concurrent writes and close
我有一个自定义编写器如下:
public class MyWriter{
public void write(StuffToWrite stuff){
/*
do the write here
*/
}
public void close(){
/*
do the close here
*/
}
}
这将被多个并发线程访问,如果写入方法调用concurrentlty 没问题。
如果 middle/or 中有任何线程刚开始写入并等待每个已经开始的写入完成,我想确保关闭方法永远不会执行。如何实现?
P.S。一个建议是使用一个 AtomicInteger 标志,它在写入时 increased/decreased 增加 2,在 close 方法中增加 1,如下所示:
public class MyWriter
{
AtomicInteger counter = new AtomicInterger (0);
public void write (StuffToWrite stuff)
{
counterValue = counter.getAndAdd (2);
if (counterValue % 2 != 0)
{
throw new RuntimeException ("Already closed");
}
else
{
/*
do the write here
*/
counterValue = counter.getAndAdd (-2);
if (counterValue == 1)
{
doClose ();
}
}
}
public void close ()
{
int counterValue = counter.get ();
if (counterValue % 2 != 0)
return; //some thread already closed it
else if (counterValue > 0)
{ //eben and > 0, so in middle of write
counter.getAndAdd (1);
}
else
{ // == 1, so no pending writes and should be closed
counter.getAndAdd (1);
doClose ();
}
}
private void doClose ()
{
/*
do the close here
*/
}
}
PS2:
目前这个解决方案(blocking/non-blocking 算法的组合)对我有用:
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class MyWriter {
Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();
AtomicBoolean isClosed1 = new AtomicBoolean(false);
AtomicBoolean isClosed2 = new AtomicBoolean(false);
AtomicInteger counter = new AtomicInteger(0);
public void write(List<String> toBeWrote) {
for (; ; ) {// a non atomic check-then-modify.
int old_state = counter.get();
if (isOdd(old_state)) {
throw new ClosedGracefullyException("gracefully closed");
}
int new_state = old_state + 2;
if (counter.compareAndSet(old_state, new_state)) break;
}
doTheWrite(toBeWrote);
int counterValue = counter.getAndAdd(-2);
if (counterValue == 1) {
doClose();
}
}
private boolean isOdd(int old_state) {
return old_state % 2 != 0;
}
private boolean isEven(int old_state) {
return old_state % 2 == 0;
}
private void doTheWrite(List<String> toBeWrote) {
for (String s : toBeWrote) {
globalQueue.add(s);
if (isFullyClosed()) {
throw new ClosedUnexpectedlyException("this writer closed!!");
}
}
}
public void synchronized close() {
int old_state = counter.get();
if (isOdd(old_state)) {
return;
} else if (counter.compareAndSet(0, 1)) {
doClose();
} else { //even and > 0, so in middle of write
counter.getAndAdd(1);
}
}
private void doClose() {
isClosed1.getAndSet(true);
isClosed2.getAndSet(true);
}
private boolean isFullyClosed() {
return isClosed1.get() && isClosed2.get();
}
public List<String> getContents() {
return Arrays.asList(globalQueue.toArray(new String[0]));
}
}
我使用这个单元测试来验证解决方案:
@Test
public void testConcurrency() throws Exception {
int numberOfWriters = 10;
int numberOfClosers = 3;
final int maxSleepMillis = 10;
int numberOfRuns = 500;
long sumDuration = 0L;
long start = System.currentTimeMillis();
for (int k = 0; k < numberOfRuns; k++) {
myWriter = new MyWriter();
Thread writers[] = new Thread[numberOfWriters];
Thread closers[] = new Thread[numberOfClosers];
final boolean[] exceptionHappened = {false};
final int[] closedExceptionCount = {0};
for (int i = 0; i < numberOfWriters; i++) {
final int finalI = i;
writers[i] = new Thread(new Runnable() {
public void run() {
sleep(maxSleepMillis);
try {
myWriter.write(Arrays.asList("test"+ finalI));
} catch (ClosedUnexpectedlyException e){
exceptionHappened[0] = true;
} catch (ClosedGracefullyException cwe){
//OK
closedExceptionCount[0]++;
}
}
});
}
for (int i = 0; i < numberOfClosers; i++) {
final int finalI = i;
closers[i] = new Thread(new Runnable() {
public void run() {
sleep(maxSleepMillis);
myWriter.close();
}
});
}
for (Thread writer : writers) {
writer.start();
}
for (Thread closer : closers) {
closer.start();
}
for (Thread writer : writers) {
writer.join();
}
for (Thread closer : closers) {
closer.join();
}
long avgDuration = (System.currentTimeMillis() - start) / (k+1);
System.out.println(String.format("run %d, closed Ex: %d , avgDuration: %d" , k, closedExceptionCount[0], avgDuration));
assertFalse(exceptionHappened[0]);
}
}
private void sleep(int maxSleepMillis) {
try {
Thread.sleep((long) (Math.random() * maxSleepMillis));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class MyWriter{
private final static Lock LOCK = new ReentrantLock();
public void write(StuffToWrite stuff){
LOCK.lock();
/*
do the write here
*/
}
public void close(){
/*
do the close here
*/
LOCK.unlock();
}
}
最简单的事情就是使所有方法同步。这将防止任何并发关闭与写入。这也将防止并发写入 btw.
如果你真的想要并发写入,那么只要设置一个标志,让写入者在退出方法后检查这个标志。这样最后一个离开大楼的线程就可以关灯了。
总体思路:您可以将此信息编码在 AtomicLong 中。例如。每次有人进入 write 时,计数器加 2,每次线程离开 write 方法时,它都会将计数器减 2。
如果一个线程要进入写入,并且看到计数器是奇数,那么它可以return或者抛出异常。随心所欲
如果一个线程退出write,它可以将计数器减2,如果还剩1,这意味着这个线程是最后一个离开write方法,可以完成关闭。
如果一个线程要关闭,并且计数器是偶数且大于0,则将计数器加1。如果计数器已经是奇数,那么其他人已经调用了close方法,不需要进一步的操作(确保你不再递增 1!)。如果计数器为零,则没有编写器,您可以从调用线程完成关闭(确保将值设置为 1 以防止竞争条件)。
原子地执行这些状态传输以防止任何竞争条件
我有一个自定义编写器如下:
public class MyWriter{
public void write(StuffToWrite stuff){
/*
do the write here
*/
}
public void close(){
/*
do the close here
*/
}
}
这将被多个并发线程访问,如果写入方法调用concurrentlty 没问题。 如果 middle/or 中有任何线程刚开始写入并等待每个已经开始的写入完成,我想确保关闭方法永远不会执行。如何实现?
P.S。一个建议是使用一个 AtomicInteger 标志,它在写入时 increased/decreased 增加 2,在 close 方法中增加 1,如下所示:
public class MyWriter
{
AtomicInteger counter = new AtomicInterger (0);
public void write (StuffToWrite stuff)
{
counterValue = counter.getAndAdd (2);
if (counterValue % 2 != 0)
{
throw new RuntimeException ("Already closed");
}
else
{
/*
do the write here
*/
counterValue = counter.getAndAdd (-2);
if (counterValue == 1)
{
doClose ();
}
}
}
public void close ()
{
int counterValue = counter.get ();
if (counterValue % 2 != 0)
return; //some thread already closed it
else if (counterValue > 0)
{ //eben and > 0, so in middle of write
counter.getAndAdd (1);
}
else
{ // == 1, so no pending writes and should be closed
counter.getAndAdd (1);
doClose ();
}
}
private void doClose ()
{
/*
do the close here
*/
}
}
PS2: 目前这个解决方案(blocking/non-blocking 算法的组合)对我有用:
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class MyWriter {
Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();
AtomicBoolean isClosed1 = new AtomicBoolean(false);
AtomicBoolean isClosed2 = new AtomicBoolean(false);
AtomicInteger counter = new AtomicInteger(0);
public void write(List<String> toBeWrote) {
for (; ; ) {// a non atomic check-then-modify.
int old_state = counter.get();
if (isOdd(old_state)) {
throw new ClosedGracefullyException("gracefully closed");
}
int new_state = old_state + 2;
if (counter.compareAndSet(old_state, new_state)) break;
}
doTheWrite(toBeWrote);
int counterValue = counter.getAndAdd(-2);
if (counterValue == 1) {
doClose();
}
}
private boolean isOdd(int old_state) {
return old_state % 2 != 0;
}
private boolean isEven(int old_state) {
return old_state % 2 == 0;
}
private void doTheWrite(List<String> toBeWrote) {
for (String s : toBeWrote) {
globalQueue.add(s);
if (isFullyClosed()) {
throw new ClosedUnexpectedlyException("this writer closed!!");
}
}
}
public void synchronized close() {
int old_state = counter.get();
if (isOdd(old_state)) {
return;
} else if (counter.compareAndSet(0, 1)) {
doClose();
} else { //even and > 0, so in middle of write
counter.getAndAdd(1);
}
}
private void doClose() {
isClosed1.getAndSet(true);
isClosed2.getAndSet(true);
}
private boolean isFullyClosed() {
return isClosed1.get() && isClosed2.get();
}
public List<String> getContents() {
return Arrays.asList(globalQueue.toArray(new String[0]));
}
}
我使用这个单元测试来验证解决方案:
@Test
public void testConcurrency() throws Exception {
int numberOfWriters = 10;
int numberOfClosers = 3;
final int maxSleepMillis = 10;
int numberOfRuns = 500;
long sumDuration = 0L;
long start = System.currentTimeMillis();
for (int k = 0; k < numberOfRuns; k++) {
myWriter = new MyWriter();
Thread writers[] = new Thread[numberOfWriters];
Thread closers[] = new Thread[numberOfClosers];
final boolean[] exceptionHappened = {false};
final int[] closedExceptionCount = {0};
for (int i = 0; i < numberOfWriters; i++) {
final int finalI = i;
writers[i] = new Thread(new Runnable() {
public void run() {
sleep(maxSleepMillis);
try {
myWriter.write(Arrays.asList("test"+ finalI));
} catch (ClosedUnexpectedlyException e){
exceptionHappened[0] = true;
} catch (ClosedGracefullyException cwe){
//OK
closedExceptionCount[0]++;
}
}
});
}
for (int i = 0; i < numberOfClosers; i++) {
final int finalI = i;
closers[i] = new Thread(new Runnable() {
public void run() {
sleep(maxSleepMillis);
myWriter.close();
}
});
}
for (Thread writer : writers) {
writer.start();
}
for (Thread closer : closers) {
closer.start();
}
for (Thread writer : writers) {
writer.join();
}
for (Thread closer : closers) {
closer.join();
}
long avgDuration = (System.currentTimeMillis() - start) / (k+1);
System.out.println(String.format("run %d, closed Ex: %d , avgDuration: %d" , k, closedExceptionCount[0], avgDuration));
assertFalse(exceptionHappened[0]);
}
}
private void sleep(int maxSleepMillis) {
try {
Thread.sleep((long) (Math.random() * maxSleepMillis));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class MyWriter{
private final static Lock LOCK = new ReentrantLock();
public void write(StuffToWrite stuff){
LOCK.lock();
/*
do the write here
*/
}
public void close(){
/*
do the close here
*/
LOCK.unlock();
}
}
最简单的事情就是使所有方法同步。这将防止任何并发关闭与写入。这也将防止并发写入 btw.
如果你真的想要并发写入,那么只要设置一个标志,让写入者在退出方法后检查这个标志。这样最后一个离开大楼的线程就可以关灯了。
总体思路:您可以将此信息编码在 AtomicLong 中。例如。每次有人进入 write 时,计数器加 2,每次线程离开 write 方法时,它都会将计数器减 2。
如果一个线程要进入写入,并且看到计数器是奇数,那么它可以return或者抛出异常。随心所欲
如果一个线程退出write,它可以将计数器减2,如果还剩1,这意味着这个线程是最后一个离开write方法,可以完成关闭。
如果一个线程要关闭,并且计数器是偶数且大于0,则将计数器加1。如果计数器已经是奇数,那么其他人已经调用了close方法,不需要进一步的操作(确保你不再递增 1!)。如果计数器为零,则没有编写器,您可以从调用线程完成关闭(确保将值设置为 1 以防止竞争条件)。
原子地执行这些状态传输以防止任何竞争条件