java|Java并发编程—生产者消费者Java实现


目录

  • 1 什么是生产者消费者模型
  • 2 通过BlockingQueue实现生产者消费者模型
  • 3 通过Condition实现
  • 4 通过监视器锁对象的方法实现

1 什么是生产者消费者模型 简单来说,生产者向一块固定的空间生产资源,消费者消费这个空间的资源,具体过程如下:
  • 生产者向一定大小的区域S生产资源,当S满时生产者阻塞
  • 消费者从S中获取资源,当S为空时,消费者被阻塞
  • 消费者在消费资源后会唤醒因S满而被阻塞的生产者
  • 生产者在生产资源后会唤醒因S空而被阻塞的消费者
2 通过BlockingQueue实现生产者消费者模型 这是实现生产者消费者模型最简单的方法,代码如下
public static void main(String[] args) { BlockingQueue queue = new ArrayBlockingQueue<>(10); //生产任务 Runnable pro = () -> { for (int i = 0; i < 100; i++) { try { queue.put(i); System.out.println("生产者 " + Thread.currentThread().getName() + " 生产了" + i); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("生产者线程 " + Thread.currentThread().getName() + " 结束"); }; //消费任务 Runnable con = () -> { try { for (int i = 0; i < 200; i++) { Thread.sleep(1000); System.out.println("消费者 " + Thread.currentThread().getName() + " 消费了" + queue.take()); } System.out.println("消费者线程结束"); } catch (InterruptedException e) { e.printStackTrace(); } }; //开启两个生产者线程 new Thread(pro).start(); new Thread(pro).start(); //开启一个消费者线程 new Thread(con).start(); }

3 通过Condition实现 这种实现方法其实就是自己实现一个BlockingQueue,然后用自己实现的BlockingQueue代替第一种实现方法中的BlockingQueue即可,BlockingQueue的实现方法如下:
/** * 基于Condition实现的BlockingQueue */ public class MyBlockingQueue {private Queue queue = new LinkedList<>(); private int size; private ReentrantLock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public MyBlockingQueue(int size) { this.size = size; }public void put(T t) { try { //加锁,实现queue操作的线程安全性 lock.lock(); //判断当前queue是否已经满了,满了则将生产者挂到notFull的条件队列上 while (queue.size() == size) { notFull.await(); } //生产元素 queue.offer(t); //唤醒因queue空而被挂起的消费者线程 notEmpty.signal(); } catch (Exception e) { e.printStackTrace(); } finally { //释放锁 lock.unlock(); } }public T take() { T t = null; try { lock.lock(); //判断当前queue是否为空,为空则将当前线程挂到notEmpty的条件队列上 while (queue.size() == 0) { notEmpty.await(); } //消费者消费元素 t = queue.poll(); //消费元素后唤醒因queue满而被挂起的生产者线程 notFull.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } }

对queue大小的判断必选要放在while循环中,不能用if,我们考虑以下情况:
在queue为空的时候:
  • 消费者A进入判断语句,发现队列为空,被挂起并释放锁
  • 消费者B拿到锁进入判断语句,发现队列卫康,被挂起并释放锁
生产者生产了一个元素:
  • 消费者A抢到锁,消费一个元素释放锁
  • 消费者B抢到A释放的锁,此时队列为空,但是由于是if判断,消费者B会直接往下执行,当执行poll()时,返回的是null空值,这显然不符合消费者消费元素的特点,若这里面有其他业务逻辑,拿到null值后进行其他操作将会报空指针异常
4 通过监视器锁对象的方法实现 【java|Java并发编程—生产者消费者Java实现】这种实现和第二种差不多,就是利用监视器锁来实现BlockingQueue,然后再直接在第一种方法的代码中调用,用监视器锁对象实现BlockingQueue的代码如下
public class MyBlockQueue {private Queue queue = new LinkedList<>(); private int size; public MyBlockQueue (int size) { this.size = size; }public synchronized void put(E e) { while (queue.size() == size) { try { wait(); } catch (InterruptedException ex) { ex.printStackTrace(); } } queue.offer(e); notifyAll(); }public synchronized E take() { while (queue.size() == 0) { try { wait(); } catch (InterruptedException ex) { ex.printStackTrace(); } } E e = queue.poll(); notifyAll(); return e; }}

    推荐阅读