阻塞队列实现原理
大雄只看了ArrayBlockingQueue, 感觉应该是差不多的。如有错误,欢迎指出。
一句话可以说明白,ArrayBlockingQueue是基于ReentrantLock及其Condition实现的。说到这里,你可能已经知道怎么搞了。
看一下这个构造
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition(); // 划重点
notFull = lock.newCondition(); // 划重点
}
似曾相识的感觉,我们在第二章第5节实现的BoundList就是这么搞的
看看put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 这个while是必要的
while (count == items.length)
// 满了就放到notFull的等待队列
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 这是个循环数组
if (++putIndex == items.length)
putIndex = 0;
count++;
// 通知等待的消费者,有元素可以消费了
notEmpty.signal();
}
看看take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
// 就是做了一个put相反的操作
DelayQueue
DelayQueue是有他的特殊之处的,所以看一看
看他的入队
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 这个q是他内部维护的一个优先级队列,private final PriorityQueue<E> q = new PriorityQueue<E>();
// 可见他的排序是借助优先级队列做的
q.offer(e);
if (q.peek() == e) {
leader = null;
// 如果入队的元素排在了第一个,就通知等待着的消费者
// 这里也是用了Condition
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
再看出队
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
// 没有元素的话,等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// <=0证明到时间了,可以消费
if (delay <= 0)
return q.poll();
// 这个是个优化
first = null; // don't retain ref while waiting
// 变量leader是一个等待获取队列头部元素的线程。如果leader不等于空,表示已经有线程在等待获取队列的头元素。所以,使用await()方法让当前线程等待信号
if (leader != null)
available.await();
else {
// 如果leader等于空,则把当前线程设置成leader,并使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
SynchronousQueue
这个初看了下,没太明白,后续再看看吧
TODO: 待补充