阻塞队列实现原理

大雄只看了ArrayBlockingQueue, 感觉应该是差不多的。如有错误,欢迎指出。

一句话可以说明白,ArrayBlockingQueue是基于ReentrantLock及其Condition实现的。说到这里,你可能已经知道怎么搞了。

看一下这个构造

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition(); // 划重点
    notFull =  lock.newCondition(); // 划重点
}

似曾相识的感觉,我们在第二章第5节实现的BoundList就是这么搞的

看看put

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 这个while是必要的
        while (count == items.length)
            // 满了就放到notFull的等待队列
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    // 这是个循环数组
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 通知等待的消费者,有元素可以消费了
    notEmpty.signal();
}

看看take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
// 就是做了一个put相反的操作

DelayQueue

DelayQueue是有他的特殊之处的,所以看一看

看他的入队

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 这个q是他内部维护的一个优先级队列,private final PriorityQueue<E> q = new PriorityQueue<E>();
        // 可见他的排序是借助优先级队列做的
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            // 如果入队的元素排在了第一个,就通知等待着的消费者
            // 这里也是用了Condition
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

再看出队

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                // 没有元素的话,等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);

                // <=0证明到时间了,可以消费
                if (delay <= 0)
                    return q.poll();
                // 这个是个优化
                first = null; // don't retain ref while waiting

                // 变量leader是一个等待获取队列头部元素的线程。如果leader不等于空,表示已经有线程在等待获取队列的头元素。所以,使用await()方法让当前线程等待信号
                if (leader != null)
                    available.await();
                else {
                    // 如果leader等于空,则把当前线程设置成leader,并使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

SynchronousQueue

这个初看了下,没太明白,后续再看看吧

TODO: 待补充

results matching ""

    No results matching ""