面试官:阻塞队列的底层实现有了解过吗
前言
本节以ArrayBlockingQueue为例, 带大家看下阻塞队列是如何实现,一起来看下吧!
ArrayBlockingQueue 源码分析
构造函数
同样的,我们先从它的构造函数看起。
public ArrayBlockingQueue(int capacity) { this(capacity, false); }
- capacity 固定容量大小。
- false,这个字段名称其实是fair默认下它是false,非公平锁。
上节我们使用的就是它的默认用法,公平锁和非公平锁我们之前讲过,可以查看以往文章(ReentrantLock源码分析)。下面我们接着看:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
从上面的代码来看,可知capacity > 0,第一个构造函数的this()其实就是调的这个构造函数,我们可以通过它来指定容量和访问策略(fair 和 nofair)的ArrayBlockingQueue。
再接着看最后一个构造函数。
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; // 锁用于可见性 lock.lock(); try { int i = 0; try { // 迭代集合 for (E e : c) { checkNotNull(e); items[i++] = e; } // 捕获异常 越界 } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
从代码来看,对比上一个多了个Collection,这是干嘛的呢?它允许我们在创建的时候初始化一个集合进去,按迭代顺序添加到容器,从它的内部我们也可以看出来。
内部变量
// 队列的元素 final Object[] items; // 获取下一个元素时的索引 int takeIndex; // 下一个添加元素时的索引 int putIndex; // 队列的元素数量 int count; // 全局锁 final ReentrantLock lock; // 等待条件 private final Condition notEmpty; private final Condition notFull; // 迭代器的共享状态 transient Itrs itrs = null;
内部方法
add() & offer()
我们看下add方法,这个方法用于向队列中添加元素。
public boolean add(E e) { return super.add(e); }
内部调用了父类的方法,它继承了AbstractQueue。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {....}
接着看AbstractQueue的add()。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
可以看到内部调用了offer(), 如果添加成功就返回true,失败就抛出异常, 这符合我们上节使用时它的特点。
但是,我们发现在它的内部并没有offer方法,所以实现不在AbstractQueue,实现还是在ArrayBlockingQueue。
来看下ArrayBlockingQueue的offer()方法。
public boolean offer(E e) { // 判断元素 e 是否为空,空抛出 NullPointerException 异常 checkNotNull(e); final ReentrantLock lock = this.lock; // 需要持锁 lock.lock(); try { // 如果元素已满 返回false, 对标 add 就会抛出异常 if (count == items.length) return false; else { // 添加到队列中 enqueue(e); return true; } } finally { lock.unlock(); } }
看下enqueue:
private void enqueue(E x) { final Object[] items = this.items; // 将元素添加到预期的索引位置 items[putIndex] = x; // 如果下个索引值等于容器数量值 将putIndex归0 if (++putIndex == items.length) putIndex = 0; // 容器元素数量+1 count++; // 唤醒等待的线程 notEmpty.signal(); }
remove & poll
先看第一个 remove(), 同样的这个方法存在于 AbstractQueue内部,如果被移除的元素为null则抛出异常。
public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); }
poll()的实现在ArrayBlockingQueue,内部实现方式跟add很像。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
private E dequeue() { final Object[] items = this.items; // 这个注解用于忽略一些警告 这不是重点 @SuppressWarnings("unchecked") // 取出元素 takeIndex 按照 FIFO E x = (E) items[takeIndex]; // 元素取出时 置为空 items[takeIndex] = null; // 判断下一个元素的位置 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 唤醒等待的线程 notFull.signal(); // 返回元素 return x; }
第二个remove(e), 这个实现在ArrayBlockingQueue的内部,可以移除指定元素。
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { // 遍历移除指定元素 if (o.equals(items[i])) { // 移除指定元素 并更新对应的索引位置 removeAt(i); return true; } // 防止越界 if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
take
take会造成线程阻塞下面我看下它的内部实现。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁,当线程中断会直接返回 lock.lockInterruptibly(); try { // 如果元素内部为空 会进入阻塞,意思是没有元素可拿了,进入等待 while (count == 0) // 使当前线程等待 notEmpty.await(); // 否则出列 return dequeue(); } finally { lock.unlock(); } }
put
该方法实现跟 take类似, 也会阻塞线程。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果元素满了 就阻塞 while (count == items.length) notFull.await(); // 否则入列 enqueue(e); } finally { lock.unlock(); } }