自定义 LinkedBlockingQueue 死锁

Custom LinkedBlockingQueue deadlock

我一直在 ThreadExecutorPool 中使用自定义阻塞队列,但有时任务工作人员不接受任务并且调度程序线程不会将新任务放入队列中。

我想知道以下自定义阻塞队列实现会导致死锁。这段代码有什么问题吗? 对于 add()take() 方法,synchronized 块更好。

import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

import com.ttech.utils.alarm.Alarm;
import com.ttech.utils.alarm.AlarmInterface;
import com.ttech.utils.counter.Counter;
import com.ttech.utils.counter.SNMPAgent;

public class WorkerQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 1L;

    public Integer lowThreshold;

    public Integer highThreshold;

    public Integer capacity;

    public String name;

    public String type;

    public Counter counter = null;

    public boolean writeAlarmLog;

    public static final Logger logger = Logger.getLogger(WorkerQueue.class);

    public static Alarm HighThresholdAlarm = null;
    public static Alarm CapacityAlarm = null;

    // Check the size here and clear capacity and high threshold alarms in case
    public E take() throws InterruptedException {
        E data = super.take();
        counter.setNewValue(super.size());
        if (super.size() == lowThreshold) {            
            if(!this.writeAlarmLog) {
                HighThresholdAlarm.clear(name);
                CapacityAlarm.clear(name);
            } else {
                HighThresholdAlarm.clearLog(name, "Queue High Threshold");
                CapacityAlarm.clearLog(name, "Queue Capacity Overload");
            }
        }
        return data;
    }

    public E poll() {
        E data = super.poll();
        counter.setNewValue(super.size());
        if (super.size() == lowThreshold) {
            if(!this.writeAlarmLog) {
                HighThresholdAlarm.clear(name);
                CapacityAlarm.clear(name);
            } else {
                HighThresholdAlarm.clearLog(name, "Queue High Threshold");
                CapacityAlarm.clearLog(name, "Queue Capacity Overload");
            }
        }
        return data;
    }


    public int drainTo(Collection<? super E> c, int maxElements){
       int size = super.drainTo(c,maxElements);       
       counter.setNewValue(super.size());       
       return size;
    }

    // During adding the data to queue check capacity and high threshold raise alarm in case
    public boolean add(E data) {
        Boolean rc = true;

        if (capacity > 0) {
            if (this.size() >= capacity) {
                logger.error("Queue " + name + " is over capacity");
                if(!this.writeAlarmLog)
                    CapacityAlarm.raise(name);
                else
                    CapacityAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue Capacity Overload");
                return false;
            }
        }

        if (!super.add(data)) {
            logger.error("Cannot add data to queue:" + name);
            rc = false;
        } else {
            counter.setNewValue(super.size());
        }

        if (highThreshold == super.size()) {


            if(!this.writeAlarmLog)
                HighThresholdAlarm.raise(name);
            else
                HighThresholdAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue High Threshold");
        }

        return rc;
    }
}

ThreadPoolExecutor 没有 add 任务到它的工作队列。它 offers them and if not accepted passes them to the configured RejectedExecutionHandler. By default this is the abort policy handler,导致抛出 RejectedExecutionException

永远不会调用自定义队列中的 add 方法。

如果您想跟踪您拥有的正在进行的任务数量的变化,我建议重写执行程序本身的 beforeExecuteafterExecute 方法。活动任务数可以从getActiveCount.

获取