Concurrent Queue in Java 只保留每个子线程的最后一项

Concurrent Queue in Java that only retains the last item of each child thread

我有 1 个启动 n 个子线程的主线程。这些子线程中的每一个都不断地产生一个新事件并将其添加到共享队列中。该事件表示子线程上复杂计算的最新状态。

主线程使用共享队列,但它对处理仍在队列中的所有事件不感兴趣:如果单个子线程将 3 个事件发布到队列,主线程只对最后一个感兴趣。可以丢弃该子线程的旧事件(一旦子线程添加了较新的事件)。

例如:

childThread A adds event A1
mainThread removes event A1

childThread B adds event B1
childThread B adds event B2 // => B1 should be discarded
childThread B adds event B3 // => B2 should be discarded
mainThread removes event B3

childThread A adds event A2
childThread A adds event A3 // => A2 should be discarded
childThread B adds event B4
mainThread removes event A3
mainThread removes event B4

childThread B adds event B5
childThread A adds event A4
childThread A adds event A5 // => A4 should be discarded
childThread B adds event B6 // => B5 should be discarded
childThread A adds event A6 // => A5 should be discarded
mainThread removes event B6 // do B6 first because B5 was before A4
mainThread removes event A6

可选要求:主线程确实希望尽可能多地循环处理子线程的事件,但如果 none 个子线程正在生成,则主线程仍会阻塞。

使用常规 BlockingQueue,但队列中包含的节点对象有一个名为 wasConsumed 的同步标志。它以 false 开始,主要消费者在它开始工作时将其设置为 true

每个线程保留它入队的最后一个项目。如果线程有更新并且最后一个项目没有被消耗,它会使用与以前相同的同步更新排队的项目。否则,它会排入一个新作业。

class UpdateableNode<T>
{
  private boolean wasConsumed;
  private T task;

  // main consumer calls this BEFORE processing the task
  synchronized void startConsuming()
  {
    wasConsumed=true;
  }

  // producer tries to update the task if it wasn't consumed
  synchronized boolean tryUpdate(T newTask)
  {
    if (wasConsumed)
      return false;
    task = newTask;
    return true;
  }
}

更新:

理解了前面的概念后,同样的事情可以用AtomicReference来管理,而不是我们粗暴的UpdatableNode

消费者代码:

  1. BlockingQueue
  2. 弹出一个 AtomicReference<Task> ref
  3. 通过调用 Task t = ref.getAndSet(null) 获取任务,这将从 ref 中清除任务并 return 它以原子方式(在大多数平台上是无锁的)。

生产者代码:

  1. 保留最后一个任务Task lastTask
  2. 保留最后一个参考 AtomicReference<Task> lastRef
  3. 当新任务到达时尝试更新 lastRef.compareAndSet(lastTask, newTask) 只有当旧任务仍在 ref 中且未被消耗时它才会工作。
  4. 如果失败,创建一个新的AtomicReference<Task>,将其入队并保存在lastRef中。
  5. 在这两种情况下,将 newTask 保存在 lastTask

我会使用两个 DataStructures:1 个用于 "triggers" 的 BlockingQueue 和 1 个用于引用最新事件的 Map。

EventSource 将:

  1. 在新事件上为地图创建一个条目(当键已经存在时更新旧的)键 = "this" (EventSource),值 = 最新事件
  2. 在新事件中,使用 "this"(EventSource) 向 BlockingQueue 中添加一个条目,但前提是其中还没有条目。

最后一部分是可选的。它只是减少了对地图的不成功查找。您也可以只在每个新事件上添加触发器。每次查找对地图中的键是否定的时,请忽略它。

消费者将:

  1. 等待阻塞队列非空
  2. 删除顶部,将其用作事件映射的键 ...
  3. 未找到密钥:继续下一个队列项目。
  4. 找到键:删除条目和过程值(即事件)
  5. 循环到 1。

您可以使用 CorrurentMap 接受线程作为键,事件作为值。因为对于地图,每个键只能有一个值,这正是您要寻找的。并且由于它由许多线程共享,因此使用并发版本是理想的。

查看有关 CorrurentMap 实现之一的文档,ConcurrentHashMaphttp://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html

编辑:

经过一番思考,我想到了一个可能的解决方案。很简单,但希望对你有所帮助:

public class EventHandler implements Runnable {

    private final BlockingQueue<Thread> blockingQueue;
    private final ConcurrentMap<Thread, Event> eventMap;
    private final AtomicReference<Boolean> machineState;

    public EventHandler() {
        int threadNumber = 3;
        blockingQueue = new ArrayBlockingQueue<Thread>( threadNumber );
        eventMap = new ConcurrentHashMap<Thread, Event>();
        machineState = new AtomicReference( Boolean.TRUE );
    }

    @Override
    public void run() {
        while ( machineState.get() ) {
            Event toConsume = null;

            //We don't want Threads to medle in while we are removing it from the map AND the queue;
            synchronized ( this ) { 
                Thread eventParent = null;
                try {
                    eventParent = blockingQueue.take();
                }
                catch ( InterruptedException ex ) {
                    //Exception Handling;
                }

                toConsume = eventMap.remove( eventParent );
            }

            runEvent( toConsume );
        }
    }

    private void runEvent( Event toConsume ) {
        //Event Handling
    }

    //Notice that this method is syncronized
    public synchronized void addEvent( Event event ) {
        Thread thisThread = Thread.currentThread();
        eventMap.put( thisThread, event );

        //Now checking and removing old unpoped Events, if any

        for ( Iterator<Thread> iterator = blockingQueue.iterator() ; iterator.hasNext() ; ) {
            Thread next = iterator.next();
            if( next == thisThread ) {
                iterator.remove();
                break;
            }
        }
        blockingQueue.offer( thisThread );
    }

    //Other methods...

}

希望对您有所帮助。

祝你有愉快的一天。 :)

您可以使用原子引用队列。引用包含最新的项目。当消费者消费一个项目时,它也会通过 getAndSet 原子地清除引用。因此,保留引用的生产线程可以检查该项目是否已被消费。如果不是,它将通过 compareAndSet 更新引用,否则,它将使引用入队。

喜欢

ExecutorService es=Executors.newCachedThreadPool();
BlockingQueue<AtomicReference<String>> queue=new ArrayBlockingQueue<>(20);
es.execute(() -> { // consumer
    while(!Thread.interrupted()) try {
        String item=queue.take().getAndSet(null);
        System.out.println("consuming "+item);
        Thread.sleep(10);// simulate workload
    } catch(InterruptedException ex) { break; }
});
for(int i=0; i<20; i++) { // schedule multiple producers
    String name="T"+i;
    es.execute(() -> { // producer
        AtomicReference<String> lastItem=new AtomicReference<>();
        for(int item=0; item<100; item++) try {
            String current=name+" - item "+item;
            String last=lastItem.get();
            if(last==null || !lastItem.compareAndSet(last, current)) {
                lastItem.set(current);
                queue.put(lastItem);
            }
            Thread.sleep(5);// simulate workload
        }
        catch(InterruptedException ex) { 
            System.err.println(name+" interrupted");
            break;
        }
    });
}

这具有一定程度的“公平性”,因为在一个原子引用被消耗后,它将被放在队列的末尾。如果您将 fair 参数设置为 true 来构造队列,它确实会表现出类似循环的行为,一旦所有线程都第一次将其引用排入队列。但是,如果一个线程启动得太慢以至于它无法在另一个线程完成它的最后一个项目之前将它的第一个项目排入队列,这将不是完全公平的,但我想,你不想在下面强制执行“公平”反正这样的条件。