深入剖析java并发之阻塞队列 LinkedBlockingQueue 和实现“生产者/消费者”模式

avatar 2018年03月14日18:22:40 0 243 views
上一篇文章介绍了使用 wait/notify 实现 生产者消费者,先看下面这个例子
  1. public class ProductTest3 {
  2.     public static void main(String args[]) {
  3.         BlockingQueue blockingQueue = new LinkedBlockingQueue(5);//定义一个大小为5的盒子,表示可以阻塞几个线程
  4.         new Thread(new ProducerQueue(blockingQueue)).start();
  5.         new Thread(new ConsumerQueue(blockingQueue)).start();
  6.     }
  7.     static class ProducerQueue implements Runnable {
  8.         private BlockingQueue producerQueue;
  9.         public ProducerQueue(BlockingQueue producerQueue) {
  10.             this.producerQueue = producerQueue;
  11.         }
  12.         @Override
  13.         public void run() {
  14.             for (int i = 0; i < 10; i++) {
  15.                 try {
  16.                     System.out.println("生产者生产了第 " + i + " 个产品");
  17.                     producerQueue.put(i);
  18.                 } catch (InterruptedException e) {
  19.                     e.printStackTrace();
  20.                 }
  21.             }
  22.         }
  23.     }
  24.     static class ConsumerQueue implements Runnable {
  25.         private BlockingQueue consumerQueue;
  26.         public ConsumerQueue(BlockingQueue consumerQueue) {
  27.             this.consumerQueue = consumerQueue;
  28.         }
  29.         @Override
  30.         public void run() {
  31.             for (int i = 0; i < 10; i++) {
  32.                 try {
  33.                     System.out.println("消费者消费了第 " + i + " 个产品");
  34.                     consumerQueue.take();
  35.                 } catch (InterruptedException e) {
  36.                     e.printStackTrace();
  37.                 }
  38.             }
  39.         }
  40.     }
  41. }
   

阻塞队列概要

阻塞队列与我们平常接触的普通队列(LinkedList或ArrayList等)的最大不同点,在于阻塞队列支出阻塞添加和阻塞删除方法。
  • 阻塞添加 所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。
  • 阻塞删除 阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)
由于Java中的阻塞队列接口BlockingQueue继承自Queue接口,因此先来看看阻塞队列接口为我们提供的主要方法
  1. public interface BlockingQueue<E> extends Queue<E> {
  2.     //将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
  3.     //在成功时返回 true,如果此队列已满,则抛IllegalStateException。 
  4.     boolean add(E e);
  5.     //将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量) 
  6.     // 将指定的元素插入此队列的尾部,如果该队列已满, 
  7.     //则在到达指定的等待时间之前等待可用的空间,该方法可中断 
  8.     boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
  9.     //将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。 
  10.     void put(E e) throws InterruptedException;
  11.     //获取并移除此队列的头部,如果没有元素则等待(阻塞), 
  12.     //直到有元素将唤醒等待线程执行该操作 
  13.     E take() throws InterruptedException;
  14.     //获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束
  15.     E poll(long timeout, TimeUnit unit) throws InterruptedException;
  16.     //从此队列中移除指定元素的单个实例(如果存在)。 
  17.     boolean remove(Object o);
  18. }
  19.     //除了上述方法还有继承自Queue接口的方法 
  20.     //获取但不移除此队列的头元素,没有则跑异常NoSuchElementException 
  21.     E element();
  22.     //获取但不移除此队列的头;如果此队列为空,则返回 null。 
  23.     E peek();
  24.     //获取并移除此队列的头,如果此队列为空,则返回 null。 
  25.     E poll();
这里我们把上述操作进行分类
  • 插入方法
    • add(E e) : 添加成功返回true,失败抛IllegalStateException异常
    • offer(E e) : 成功返回 true,如果此队列已满,则返回 false。
    • put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞
  • 删除方法:
    • remove(Object o) :移除指定元素,成功返回true,失败返回false
    • poll() : 获取并移除此队列的头元素,若队列为空,则返回 null
    • take():获取并移除此队列头元素,若没有元素则一直阻塞。
  • 检查方法
    • element() :获取但不移除此队列的头元素,没有元素则抛异常
    • peek() :获取但不移除此队列的头;若队列为空,则返回 null。
阻塞队列的对元素的增删查操作主要就是上述的三类方法,通常情况下我们都是通过这3类方法操作阻塞队列,了解完阻塞队列的基本方法后,下面我们将分析阻塞队列中的两个实现类ArrayBlockingQueue和LinkedBlockingQueue的简单使用和实现原理,其中实现原理是这篇文章重点分析的内容。  

LinkedBlockingQueue的基本概要

LinkedBlockingQueue是一个由链表实现的有界队列阻塞队列,但大小默认值为Integer.MAX_VALUE,所以我们在使用LinkedBlockingQueue时建议手动传值,为其提供我们所需的大小,避免队列过大造成机器负载或者内存爆满等情况。其构造函数如下
  1. //默认大小为Integer.MAX_VALUE
  2. public LinkedBlockingQueue() {
  3.        this(Integer.MAX_VALUE);
  4. }
  5. //创建指定大小为capacity的阻塞队列
  6. public LinkedBlockingQueue(int capacity) {
  7.      if (capacity <= 0throw new IllegalArgumentException();
  8.      this.capacity = capacity;
  9.      last = head = new Node<E>(null);
  10.  }
  11. //创建大小默认值为Integer.MAX_VALUE的阻塞队列并添加c中的元素到阻塞队列
  12. public LinkedBlockingQueue(Collection<? extends E> c) {
  13.      this(Integer.MAX_VALUE);
  14.      final ReentrantLock putLock = this.putLock;
  15.      putLock.lock(); // Never contended, but necessary for visibility
  16.      try {
  17.          int n = 0;
  18.          for (E e : c) {
  19.              if (e == null)
  20.                  throw new NullPointerException();
  21.              if (n == capacity)
  22.                  throw new IllegalStateException("Queue full");
  23.              enqueue(new Node<E>(e));
  24.              ++n;
  25.          }
  26.          count.set(n);
  27.      } finally {
  28.          putLock.unlock();
  29.      }
  30.  }
 

LinkedBlockingQueue的实现原理剖析

原理概论

LinkedBlockingQueue是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,实际上我们对LinkedBlockingQueue的API操作都是间接操作该数据队列,这里我们先看看LinkedBlockingQueue的内部成员变量
  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  2.         implements BlockingQueue<E>, java.io.Serializable {
  3.     /**
  4.      * 节点类,用于存储数据
  5.      */
  6.     static class Node<E> {
  7.         E item;
  8.         /**
  9.          * One of:
  10.          * - the real successor Node
  11.          * - this Node, meaning the successor is head.next
  12.          * - null, meaning there is no successor (this is the last node)
  13.          */
  14.         Node<E> next;
  15.         Node(E x) { item = x; }
  16.     }
  17.     /** 阻塞队列的大小,默认为Integer.MAX_VALUE */
  18.     private final int capacity;
  19.     /** 当前阻塞队列中的元素个数 */
  20.     private final AtomicInteger count = new AtomicInteger();
  21.     /**
  22.      * 阻塞队列的头结点
  23.      */
  24.     transient Node<E> head;
  25.     /**
  26.      * 阻塞队列的尾节点
  27.      */
  28.     private transient Node<E> last;
  29.     /** 获取并移除元素时使用的锁,如take, poll, etc */
  30.     private final ReentrantLock takeLock = new ReentrantLock();
  31.     /** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
  32.     private final Condition notEmpty = takeLock.newCondition();
  33.     /** 添加元素时使用的锁如 put, offer, etc */
  34.     private final ReentrantLock putLock = new ReentrantLock();
  35.     /** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
  36.     private final Condition notFull = putLock.newCondition();
  37. }
  从上述可看成,每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。这里再次强调如果没有给LinkedBlockingQueue指定容量大小,其默认值将是Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。至于LinkedBlockingQueue的实现原理图与ArrayBlockingQueue是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的Condition条件对象作为等待队列,用于挂起take线程和put线程。 更多内容:点此     原文地址:http://blog.csdn.net/javazejian/article/details/77410889
  • 微信
  • 交流学习,有偿服务
  • weinxin
  • 博客/Java交流群
  • 资源分享,问题解决,技术交流。群号:590480292
  • weinxin
avatar
已通过评论:0   待审核评论数:0