前言
PriorityBlockingQueue是BlockingQueue接口的实现类,它是一种优先级阻塞队列,每次出队都返回优先级最高或最低的元素,其外部是用均衡二叉树堆实现的。这里的优先级指的是元素类必须实现Comparable接口,而后用compareTo()办法比拟元素的优先级大小,当然也可指定自定义的比拟器comparator。
实现原理
先来看看它的重要属性:
// 队列默认容量为11 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 队列最大容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 寄存元素的数组 private transient Object[] queue; // 队列长度 private transient int size; // 自定义比拟器 private transient Comparator<? super E> comparator; // 操作元素数组的互斥锁 private final ReentrantLock lock; // 数组非空条件 private final Condition notEmpty; // 数组扩容操作的自璇锁,1示意正在扩容,0示意没有在扩容 private transient volatile int allocationSpinLock; // 优先级队列 private PriorityQueue<E> q;
再来看它的几个构造函数:
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }
public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); }
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { <i style="color:transparent">来源gaodai$ma#com搞$$代**码网</i> if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
再来看重要办法:
put():
public void put(E e) { offer(e); // never need to block }
offer():
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 队列以后长度>=队列容量时,进行扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 未指定比拟器时,则应用默认的compareTo()来计算插入元素的地位 if (cmp == null) siftUpComparable(n, e, array); else // 指定了时,则应用指定的比拟器计算地位 siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 唤醒某个期待在notEmpty条件的线程 notEmpty.signal(); } finally { lock.unlock(); } return true; }
其中tryGrow()办法如下:
private void tryGrow(Object[] array, int oldCap) { // 先开释操作数组的互斥锁,去尝试获取扩容的自璇锁 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // 尝试获取扩容的锁 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 计算扩容后的新容量 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 新容量超出最大容量时,则取最大容量 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow // 旧容量加1依然溢出时,抛内存溢出异样 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 其余线程抢到了扩容锁并正在扩容时,以后线程则让出CPU调度权 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 获取操作数组的互斥锁 lock.lock(); // 扩容操作胜利时,将旧数组元素拷贝到扩容后的新数组 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
siftUpComparable()办法如下:
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; }
siftUpUsingComparator()办法如下:
private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
take():
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { // 队列元素为空时,阻塞期待 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }
poll():
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 非阻塞,队列为空时返回null return dequeue(); } finally { lock.unlock(); } }
其中dequeue()办法如下:
private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
其中siftDownComparable()办法如下:
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
siftDownUsingComparator()办法如下:
private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
先睡了,今天再剖析优先级的具体代码。晚安全世界!