如何同步并发写入和关闭

How to synchronize concurrent writes and close

我有一个自定义编写器如下:

public class MyWriter{
public void write(StuffToWrite stuff){
/*
do the write here
*/
}

public void close(){
/*
do the close here
*/
}
}

这将被多个并发线程访问,如果写入方法调用concurrentlty 没问题。 如果 middle/or 中有任何线程刚开始写入并等待每个已经开始的写入完成,我想确保关闭方法永远不会执行。如何实现?

P.S。一个建议是使用一个 AtomicInteger 标志,它在写入时 increased/decreased 增加 2,在 close 方法中增加 1,如下所示:

   public class MyWriter
{

  AtomicInteger counter = new AtomicInterger (0);

  public void write (StuffToWrite stuff)
  {
    counterValue = counter.getAndAdd (2);
    if (counterValue % 2 != 0)
      {
    throw new RuntimeException ("Already closed");
      }
    else
      {
    /*
       do the write here
     */
    counterValue = counter.getAndAdd (-2);
    if (counterValue == 1)
      {
        doClose ();
      }
      }
  }

  public void close ()
  {
    int counterValue = counter.get ();
    if (counterValue % 2 != 0)
      return;           //some thread already closed it
    else if (counterValue > 0)
      {             //eben and > 0, so in middle of write
    counter.getAndAdd (1);
      }
    else
      {             // == 1, so no pending writes and should be closed
    counter.getAndAdd (1);
    doClose ();
      }
  }

  private void doClose ()
  {
    /*
       do the close here
     */
  }

}

PS2: 目前这个解决方案(blocking/non-blocking 算法的组合)对我有用:

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MyWriter {

    Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();

    AtomicBoolean isClosed1 = new AtomicBoolean(false);
    AtomicBoolean isClosed2 = new AtomicBoolean(false);


    AtomicInteger counter = new AtomicInteger(0);


    public void write(List<String> toBeWrote) {
        for (; ; ) {// a non atomic check-then-modify.
            int old_state = counter.get();
            if (isOdd(old_state)) {
                throw new ClosedGracefullyException("gracefully closed");
            }
            int new_state = old_state + 2;
            if (counter.compareAndSet(old_state, new_state)) break;
        }
        doTheWrite(toBeWrote);
        int counterValue = counter.getAndAdd(-2);
        if (counterValue == 1) {
            doClose();
        }
    }

    private boolean isOdd(int old_state) {
        return old_state % 2 != 0;
    }

    private boolean isEven(int old_state) {
        return old_state % 2 == 0;
    }

    private void doTheWrite(List<String> toBeWrote) {
        for (String s : toBeWrote) {
            globalQueue.add(s);
            if (isFullyClosed()) {
                throw new ClosedUnexpectedlyException("this writer closed!!");
            }
        }
    }

    public void synchronized close() {
        int old_state = counter.get();
        if (isOdd(old_state)) {
            return;
        } else if (counter.compareAndSet(0, 1)) {
            doClose();
        } else { //even and > 0, so in middle of write
            counter.getAndAdd(1);
        }
    }

    private void doClose() {
        isClosed1.getAndSet(true);
        isClosed2.getAndSet(true);
    }

    private boolean isFullyClosed() {
        return isClosed1.get() && isClosed2.get();
    }

    public List<String> getContents() {
        return Arrays.asList(globalQueue.toArray(new String[0]));
    }
}

我使用这个单元测试来验证解决方案:

 @Test
    public void testConcurrency() throws Exception {

        int numberOfWriters = 10;
        int numberOfClosers = 3;
        final int maxSleepMillis = 10;
        int numberOfRuns = 500;

        long sumDuration = 0L;
        long start = System.currentTimeMillis();
        for (int k = 0; k < numberOfRuns; k++) {
            myWriter = new MyWriter();
            Thread writers[] = new Thread[numberOfWriters];
            Thread closers[] = new Thread[numberOfClosers];

            final boolean[] exceptionHappened = {false};
            final int[] closedExceptionCount = {0};

            for (int i = 0; i < numberOfWriters; i++) {
                final int finalI = i;
                writers[i] = new Thread(new Runnable() {
                    public void run() {
                        sleep(maxSleepMillis);
                        try {
                            myWriter.write(Arrays.asList("test"+ finalI));
                        } catch (ClosedUnexpectedlyException e){
                            exceptionHappened[0] = true;
                        } catch (ClosedGracefullyException cwe){
                            //OK
                            closedExceptionCount[0]++;
                        }
                    }
                });
            }

            for (int i = 0; i < numberOfClosers; i++) {
                final int finalI = i;
                closers[i] = new Thread(new Runnable() {
                    public void run() {
                        sleep(maxSleepMillis);
                        myWriter.close();
                    }
                });
            }

            for (Thread writer : writers) {
                writer.start();
            }

            for (Thread closer : closers) {
                closer.start();
            }

            for (Thread writer : writers) {
                writer.join();
            }

            for (Thread closer : closers) {
                closer.join();
            }
            long avgDuration = (System.currentTimeMillis() - start) / (k+1);
            System.out.println(String.format("run %d, closed Ex: %d , avgDuration: %d" , k, closedExceptionCount[0], avgDuration));
            assertFalse(exceptionHappened[0]);
        }


    }

    private void sleep(int maxSleepMillis) {
        try {
            Thread.sleep((long) (Math.random() * maxSleepMillis));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public class MyWriter{
    private final static Lock LOCK = new ReentrantLock();
    public void write(StuffToWrite stuff){
        LOCK.lock();
        /*
        do the write here
        */
    }

    public void close(){
        /*
        do the close here
        */
        LOCK.unlock();
    }
}

最简单的事情就是使所有方法同步。这将防止任何并发关闭与写入。这也将防止并发写入 btw.

如果你真的想要并发写入,那么只要设置一个标志,让写入者在退出方法后检查这个标志。这样最后一个离开大楼的线程就可以关灯了。

总体思路:您可以将此信息编码在 AtomicLong 中。例如。每次有人进入 write 时,计数器加 2,每次线程离开 write 方法时,它都会将计数器减 2。

如果一个线程要进入写入,并且看到计数器是奇数,那么它可以return或者抛出异常。随心所欲

如果一个线程退出write,它可以将计数器减2,如果还剩1,这意味着这个线程是最后一个离开write方法,可以完成关闭。

如果一个线程要关闭,并且计数器是偶数且大于0,则将计数器加1。如果计数器已经是奇数,那么其他人已经调用了close方法,不需要进一步的操作(确保你不再递增 1!)。如果计数器为零,则没有编写器,您可以从调用线程完成关闭(确保将值设置为 1 以防止竞争条件)。

您可以使用 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#compareAndSet-long-long-

原子地执行这些状态传输以防止任何竞争条件