Java编程生产者消费者实现的四种方法
目录
- 实现生产者消费者的四种方式
- 一、最基础的
- 二、java.util.concurrent.lock 中的 Lock 框架
- 三、阻塞队列BlockingQueue的实现
- Blockqueue 接口的一些方法
- 四、信号量 Semaphore 的实现
实现生产者消费者的四种方式
一、最基础的
利用 wait() 和 notify() 方法实现,当缓冲区满或为空时都调用 wait() 方法等待,当生产者生产了一个产品或消费者消费了一个产品后会唤醒所有线程;
package com.practice; public class testMain {privatestaticInteger count = 0; privatestaticfinal Integer FULL = 10; privatestaticString LOCK = "lock"; public static void main(String[] args) {testMain testMain = new testMain(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); }class Producer implementsRunnable{@Overridepublic void run(){for (int i = 0; i < 10; i++) {try{Thread.sleep(3000); }catch (Exception e){e.printStackTrace(); }synchronized (LOCK){while(count == FULL){//缓存空间满了try{LOCK.wait(); //线程阻塞}catch (Exception e){e.printStackTrace(); }}count++; //生产者System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有"+count); LOCK.notifyAll(); //唤醒所有线程}}}}class Consumer implements Runnable{@Overridepublic void run(){for (int i = 0; i < 10; i++) {try{Thread.sleep(3000); }catch (InterruptedException e){e.printStackTrace(); }synchronized (LOCK){while(count == 0){try{LOCK.wait(); }catch (Exception e){}}count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有 "+count); LOCK.notifyAll(); //唤醒所有线程}}}}}
二、java.util.concurrent.lock 中的 Lock 框架
通过对 lock 的 lock() 方法和 unlock() 方法实现对锁的显示控制,而
synchronize()
则是对锁的隐形控制,可重入锁也叫做递归锁,指的是同一个线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响;简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获计数器就加1,函数调用结束计数器就减1,然后锁需要释放两次才能获得真正释放,已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest {privatestaticInteger count = 0; privatestaticInteger FULL = 10; //创建一个锁对象private Lock lock = new ReentrantLock(); //创建两个条件变量,一个为缓冲非满,一个缓冲区非空privatefinalCondition notFull = lock.newCondition(); privatefinalCondition notEmpty = lock.newCondition(); public static void main(String[] args){ReentrantLockTest testMain = new ReentrantLockTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); }class Producer implements Runnable{@Overridepublic void run(){for (int i = 0; i <10; i++) {try {Thread.sleep(3000); } catch (Exception e) {e.printStackTrace(); }// 获取锁lock.lock(); try {while (count == FULL) {try{notFull.await(); }catch(InterruptedException e){e.printStackTrace(); }}count++; System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count); }finally {lock.unlock(); }}}}class Consumer implements Runnable{@Overridepublic void run(){for (int i = 0; i <10; i++) {try{Thread.sleep(3000); }catch (Exception e){e.printStackTrace(); }lock.lock(); try{while(count==0){try{notEmpty.await(); }catch (InterruptedException e){e.printStackTrace(); }}count--; System.out.println(Thread.currentThread().getName() +"消费者消费,目前总共有 " + count); }finally {lock.unlock(); //解锁}}}}}
三、阻塞队列BlockingQueue的实现
被阻塞的情况主要分为如下两种,BlockingQueue 是线程安全的
1,当队列满了的时候进行入队操作;
2,当队列空的时候进行出队操作
Blockqueue 接口的一些方法
文章图片
四类方法分别对应于:
1,ThrowsException,如果操作不能马上进行,则抛出异常;
2,SpecialValue 如果操作不能马上进行,将会返回一个特殊的值,true或false;
3,Blocks 操作被阻塞;
4,TimeOut 指定时间未执行返回一个特殊值 true 或 false
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 使用 BlockQueue 实现生产者消费模型 */public class BlockQueueTest {public staticInteger count = 0; //创建一个阻塞队列final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); public static void main(String[] args) {BlockQueueTest testMain = new BlockQueueTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); }class Producer implementsRunnable{@Overridepublicvoid run(){for (int i = 0; i <10; i++) {try{Thread.sleep(3000); }catch (Exception e){e.printStackTrace(); }try{blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有 " + count); }catch (InterruptedException e){e.printStackTrace(); }}}}class Consumer implements Runnable{@Overridepublic void run(){for (int i = 0; i <10; i++) {try{Thread.sleep(3000); }catch (InterruptedException e){e.printStackTrace(); }try{blockingQueue.take(); //消费count--; System.out.println(Thread.currentThread().getName() +" 消费者消费,目前总共有 "+ count); }catch (InterruptedException e){e.printStackTrace(); }}}}}
四、信号量 Semaphore 的实现
Semaphore (信号量) 用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Java中的 Semaphone 维护了一个许可集,一开始设定这个许可集的数量,使用
acquire()
方法获得一个许可,当许可不足时会被阻塞,release()
添加一个许可。下面代码中,还加入了
mutex
信号量,维护消费者和生产者之间的同步关系,保证生产者消费者之间的交替进行import java.util.concurrent.Semaphore; public class SemaphoreTest {privatestaticInteger count = 0; //创建三个信号量final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1); //互斥锁,控制共享数据的互斥访问public static void main(String[] args) {SemaphoreTest testMain = new SemaphoreTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); }class Producer implements Runnable{@Overridepublic void run(){for (int i = 0; i <10; i++) {try{Thread.sleep(3000); }catch (InterruptedException e){e.printStackTrace(); }try{notFull.acquire(); //获取一个信号量mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() +"生产者生产,目前总共有 "+count); } catch (InterruptedException e){e.printStackTrace(); } finally {mutex.release(); //添加notEmpty.release(); }}}}class Consumer implementsRunnable{@Overridepublic void run(){for (int i = 0; i <10; i++) {try{Thread.sleep(3000); }catch(InterruptedException e){e.printStackTrace(); }try{notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() +"消费者消费,目前总共有"+count); }catch (InterruptedException e){e.printStackTrace(); }finally {mutex.release(); notFull.release(); }}}}}
Reference
https://juejin.cn/post/6844903486895865864#comment
【Java编程生产者消费者实现的四种方法】以上就是Java编程生产者消费者实现的四种方法的详细内容,更多关于java实现生产消费者的资料请关注脚本之家其它相关文章!
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 事件代理
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 数组常用方法一
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- python青少年编程比赛_第十一届蓝桥杯大赛青少年创意编程组比赛细则
- Java|Java基础——数组
- RxJava|RxJava 在Android项目中的使用(一)
- java之static、static|java之static、static final、final的区别与应用