并发容器 – 各种队列

要学好Java并发编程,最重要的还是要理解JMM中并发问题的原理、Volatile+CAS的实现、Synchronized对象锁,几乎里面所有的东西都是围绕这几个东西来实现的。

探讨Java并发包中的各种队列。Java并发包提供了丰富的队列类,可以简单分为:

  • 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque
  • 普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque
  • 优先级阻塞队列:PriorityBlockingQueue
  • 延时阻塞队列:DelayQueue
  • 其他阻塞队列:SynchronousQueue和LinkedTransferQueue

无锁非阻塞是这些队列不使用锁,所有操作总是可以立即执行,主要通过循环CAS实现并发安全,阻塞队列是指这些队列使用锁和条件,很多操作都需要先获取锁或满足特定条件,获取不到锁或等待条件时,会等待(即阻塞),获取到锁或条件满足再返回。

这些队列迭代都不会抛出ConcurrentModificationException,都是弱一致的,后面就不单独强调了。下面,我们来简要探讨每类队列的用途、用法和基本实现原理。

无锁非阻塞并发队列

有两个无锁非阻塞队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque,它们适用于多个线程并发使用一个队列的场合,都是基于链表实现的,都没有限制大小,是无界的,与ConcurrentSkipListMap类似,它们的size方法不是一个常量运算,不过这个方法在并发应用中用处也不大。

ConcurrentLinkedQueue实现了Queue接口,表示一个先进先出的队列,从尾部入队,从头部出队,内部是一个单向链表。ConcurrentLinkedDeque实现了Deque接口,表示一个双端队列,在两端都可以入队和出队,内部是一个双向链表。它们的用法类似于LinkedList,我们就不赘述了。

这两个类最基础的原理是循环CAS,ConcurrentLinkedQueue的算法基于一篇论文:"Simple,
Fast, and Practical Non-Blocking and Blocking Concurrent Queue
Algorithms"
(

普通阻塞队列

除了刚介绍的两个队列,其他队列都是阻塞队列,都实现了接口BlockingQueue,在入队/出队时可能等待,主要方法有:

图片 1

//入队,如果队列满,等待直到队列有空间void put(E e) throws InterruptedException;//出队,如果队列空,等待直到队列不为空,返回头部元素E take() throws InterruptedException;//入队,如果队列满,最多等待指定的时间,如果超时还是满,返回falseboolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;//出队,如果队列空,最多等待指定的时间,如果超时还是空,返回nullE poll(long timeout, TimeUnit unit) throws InterruptedException;

图片 2

普通阻塞队列是常用的队列,常用于生产者/消费者模式。

ArrayBlockingQueue和LinkedBlockingQueue都是实现了Queue接口,表示先进先出的队列,尾部进,头部出,而LinkedBlockingDeque实现了Deque接口,是一个双端队列。

ArrayBlockingQueue是基于循环数组实现的,有界,创建时需要指定大小,且在运行过程中不会改变,这与我们在容器类中介绍的ArrayDeque是不同的,ArrayDeque也是基于循环数组实现的,但是是无界的,会自动扩展。

LinkedBlockingQueue是基于单向链表实现的,在创建时可以指定最大长度,也可以不指定,默认是无限的,节点都是动态创建的。LinkedBlockingDeque与LinkedBlockingQueue一样,最大长度也是在创建时可选的,默认无限,不过,它是基于双向链表实现的。

内部,它们都是使用显式锁ReentrantLock和显式条件Condition实现的。

ArrayBlockingQueue的实现很直接,有一个数组存储元素,有两个索引表示头和尾,有一个变量表示当前元素个数,有一个锁保护所有访问,有两个条件,"不满"和"不空"用于协作,成员声明如下:

图片 3

final Object[] items;int takeIndex; // 头int putIndex; //尾int count; //元素个数final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;

图片 4

实现思路与我们在72节实现的类似,就不赘述了。

与ArrayBlockingQueue类似,LinkedBlockingDeque也是使用一个锁和两个条件,使用锁保护所有操作,使用"不满"和"不空"两个条件,LinkedBlockingQueue稍微不同,因为它使用链表,且只从头部出队、从尾部入队,它做了一些优化,使用了两个锁,一个保护头部,一个保护尾部,每个锁关联一个条件。

优先级阻塞队列

普通阻塞队列是先进先出的,而优先级队列是按优先级出队的,优先级高的先出,我们在容器类中介绍过优先级队列PriorityQueue及其背后的数据结构堆。

PriorityBlockingQueue是PriorityQueue的并发版本,与PriorityQueue一样,它没有大小限制,是无界的,内部的数组大小会动态扩展,要求元素要么实现Comparable接口,要么创建PriorityBlockingQueue时提供一个Comparator对象。

与PriorityQueue的区别是,PriorityBlockingQueue实现了BlockingQueue接口,在队列为空时,take方法会阻塞等待。

另外,PriorityBlockingQueue是线程安全的,它的基本实现原理与PriorityQueue是一样的,也是基于堆,但它使用了一个锁ReentrantLock保护所有访问,使用了一个条件协调阻塞等待。

延时阻塞队列

延时阻塞队列DelayQueue是一种特殊的优先级队列,它也是无界的,它要求每个元素都实现Delayed接口,该接口的声明为:

public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit);}

Delayed扩展了Comparable接口,也就是说,DelayQueue的每个元素都是可比较的,它有一个额外方法getDelay返回一个给定时间单位unit的整数,表示再延迟多长时间,如果小于等于0,表示不再延迟。

DelayQueue也是优先级队列,它按元素的延时时间出队,它的特殊之处在于,只有当元素的延时过期之后才能被从队列中拿走,也就是说,take方法总是返回第一个过期的元素,如果没有,则阻塞等待。

DelayQueue可以用于实现定时任务,我们看段简单的示例代码:

图片 5

public class DelayedQueueDemo { private static final AtomicLong taskSequencer = new AtomicLong(0); static class DelayedTask implements Delayed { private long runTime; private long sequence; private Runnable task; public DelayedTask(int delayedSeconds, Runnable task) { this.runTime = System.currentTimeMillis() + delayedSeconds * 1000; this.sequence = taskSequencer.getAndIncrement(); this.task = task; } @Override public int compareTo(Delayed o) { if (o == this) { return 0; } if (o instanceof DelayedTask) { DelayedTask other = (DelayedTask) o; if (runTime < other.runTime) { return -1; } else if (runTime > other.runTime) { return 1; } else if (sequence < other.sequence) { return -1; } else { return 1; } } throw new IllegalArgumentException(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(runTime - System.currentTimeMillis(), TimeUnit.MICROSECONDS); } public Runnable getTask() { return task; } } public static void main(String[] args) throws InterruptedException { DelayQueue<DelayedTask> tasks = new DelayQueue<>(); tasks.put(new DelayedTask(2, new Runnable() { @Override public void run() { System.out.println("execute delayed task"); } })); DelayedTask task = tasks.take(); task.getTask().run(); }}

图片 6

DelayedTask表示延时任务,只有延时过期后任务才会执行,任务按延时时间排序,延时一样的按照入队顺序排序。

内部,DelayQueue是基于PriorityQueue实现的,它使用一个锁ReentrantLock保护所有访问,使用一个条件available表示头部是否有元素,当头部元素的延时未到时,take操作会根据延时计算需睡眠的时间,然后睡眠,如果在此过程中有新的元素入队,且成为头部元素,则阻塞睡眠的线程会被提前唤醒然后重新检查。以上是基本思路,DelayQueue的实现有一些优化,以减少不必要的唤醒,具体我们就不探讨了。

其他阻塞队列

Java并发包中还有两个特殊的阻塞队列,SynchronousQueue和LinkedTransferQueue。

SynchronousQueue

SynchronousQueue与一般的队列不同,它不算一种真正的队列,它没有存储元素的空间,存储一个元素的空间都没有。它的入队操作要等待另一个线程的出队操作,反之亦然。如果没有其他线程在等待从队列中接收元素,put操作就会等待。take操作需要等待其他线程往队列中放元素,如果没有,也会等待。SynchronousQueue适用于两个线程之间直接传递信息、事件或任务。

LinkedTransferQueue

LinkedTransferQueue实现了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增加了一些额外功能,生产者在往队列中放元素时,可以等待消费者接收后再返回,适用于一些消息传递类型的应用中。TransferQueue的接口定义为:

图片 7

public interface TransferQueue<E> extends BlockingQueue<E> { //如果有消费者在等待(执行take或限时的poll),直接转给消费者, //返回true,否则返回false,不入队 boolean tryTransfer(E e); //如果有消费者在等待,直接转给消费者, //否则入队,阻塞等待直到被消费者接收后再返回 void transfer(E e) throws InterruptedException; //如果有消费者在等待,直接转给消费者,返回true //否则入队,阻塞等待限定的时间,如果最后被消费者接收,返回true boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; //是否有消费者在等待 boolean hasWaitingConsumer(); //等待的消费者个数 int getWaitingConsumerCount();}

图片 8

LinkedTransferQueue是基于链表实现的、无界的TransferQueue,具体实现比较复杂,我们就不探讨了。

class ConcurrentHashMap:/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode; int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = & hash)) == null) { //如果当前位置Node为空,通过cas操作设置 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer; else { V oldVal = null; //锁定当前位置Node链表 synchronized { if (tabAt == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin; if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }

Java中的阻塞队列实现原理都是通过上篇文章中提到的ReentrantLock来实现的,所有操作方法都必须先获取内部的ReentrantLock才能继续,否则返回false/阻塞/抛出异常,常用的阻塞队列有以下几个:ArrayBlockingQueue:由数组实现的有界阻塞队列LinkedBlockingQueue:由链表实现的有界阻塞队列PriorityBlockingQueue:由数组实现的支持优先级的无界阻塞队列SynchronousBlockingQueue:不储存元素的阻塞队列,所有的入队列操作都将阻塞,直到被出队列唤醒,反之亦然,newCachedThreadPool中的阻塞队列就是这个DelayBlockingQueue:基于PriorityQueue实现的延时阻塞队列下面就以ArrayBlockingQueue为例摸一遍源码吧,其他也都差不多的套路:

核心方法是enqueue、dequeue,因为是private并不包含任何同步和长度判断,只是简单的在数组中插入和删除元素罢了,真正的同步实在对外暴露的put、take等方法,首先获取Lock,同时会判断长度问题决定是否需要通过Condition等待队列非空/非满。关于BlockingQueue可以思考以下细节问题:队列长度问题:ArrayBlockingQueue通过count来记录长度,为什么不需要加volatile呢?上篇文章有讲到AQS中已经通过volatile来避免state的可见性问题,BlockingQueue中获取count之前已经获取锁,肯定不会有可见性问题的了。LinkedBlockingQueue中直接用了AtomicInteger来记录长度,简单粗暴,在获取size等方法时都不需要加锁。无界队列如PriorityBlockingQueue用数组实现最小堆结构,这个需要注意数据扩容导致的性能问题。Deque:采用双端队列的结构,其实主要原理还是一样,只不过加了尾部进出队列的方法。

 /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put throws InterruptedException { checkNotNull; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private void enqueue { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

相关文章

Comment ()
评论是一种美德,说点什么吧,否则我会恨你的。。。