Java并发编程之线程间的通信
目录
- 一、概念简介
- 1、线程通信
- 2、等待通知机制
- 3、基础方法
- 二、等待通知原理
- 1、基本原理
- 2、实现案例
- 三、管道流通信
- 1、管道流简介
- 2、使用案例
- 四、生产消费模式
- 1、业务场景
- 2、代码实现
- 五、源代码地址
一、概念简介
1、线程通信
在操作系统中,线程是个独立的个体,但是在线程执行过程中,如果处理同一个业务逻辑,可能会产生资源争抢,导致并发问题,通常使用互斥锁来控制该逻辑。但是在还有这样一类场景,任务执行是有顺序控制的,例如常见的报表数据生成:
文章图片
- 启动数据分析任务,生成报表数据;
- 报表数据存入指定位置数据容器;
- 通知数据搬运任务,把数据写入报表库;
2、等待通知机制
如上的业务场景,如果线程A生成数据过程中,线程B一直在访问数据容器,判断该过程的数据是否已经生成,则会造成资源浪费。正常的流程应该如图,线程A和线程B同时启动,线程A开始处理数据生成任务,线程B尝试获取容器数据,数据还没过来,线程B则进入等待状态,当线程A的任务处理完成,则通知线程B去容器中获取数据,这样基于线程等待和通知的机制来协作完成任务。
3、基础方法
等待/通知机制的相关方法是Java中Object层级的基础方法,任何对象都有该方法:
- notify:随机通知一个在该对象上等待的线程,使其结束wait状态返回;
- notifyAll:唤醒在该对象上所有等待的线程,进入对象锁争抢队列中;
- wait:线程进入waiting等待状态,不会争抢锁对象,也可以设置等待时间;
二、等待通知原理
1、基本原理
等待/通知机制,该模式下指线程A在不满足任务执行的情况下调用对象wait()方法进入等待状态,线程B修改了线程A的执行条件,并调用对象notify()或者notifyAll()方法,线程A收到通知后从wait状态返回,进而执行后续操作。两个线程通过基于对象提供的wait()/notify()/notifyAll()等方法完成等待和通知间交互,提高程序的可伸缩性。
2、实现案例
通过线程通信解决上述数据生成和存储任务的解耦流程。
public class NotifyThread01 {static Object lock = new Object() ; static volatile List dataList = new ArrayList<>(); public static void main(String[] args) throws Exception {Thread saveThread = new Thread(new SaveData(),"SaveData"); saveThread.start(); TimeUnit.SECONDS.sleep(3); Thread dataThread = new Thread(new AnalyData(),"AnalyData"); dataThread.start(); }// 等待数据生成,保存static class SaveData implements Runnable {@Overridepublic void run() {synchronized (lock){while (dataList.size()==0){try {System.out.println(Thread.currentThread().getName()+"等待..."); lock.wait(); } catch (InterruptedException e) {e.printStackTrace(); }}System.out.println("SaveData .."+ dataList.get(0)+dataList.get(1)); }}}// 生成数据,通知保存static class AnalyData implements Runnable {@Overridepublic void run() {synchronized (lock){dataList.add("hello,"); dataList.add("java"); lock.notify(); System.out.println("AnalyData End..."); }}}}
注意:除了dataList满足写条件,还要在AnalyData线程执行通知操作。
三、管道流通信
1、管道流简介
基本概念
管道流主要用于在不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,进而实现不同线程间的通信。
实现分类
管道字节流:PipedInputStream和PipedOutputStream;
管道字符流:PipedWriter和PipedReader;
新IO管道流:Pipe.SinkChannel和Pipe.SourceChannel;
2、使用案例
public class NotifyThread02 {public static void main(String[] args) throws Exception {PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(); // 链接输入流和输出流pos.connect(pis); // 写数据线程new Thread(new Runnable() {public void run() {BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); // 将从键盘读取的数据写入管道流PrintStream ps = new PrintStream(pos); while (true) {try {System.out.print(Thread.currentThread().getName()); ps.println(br.readLine()); Thread.sleep(1000); } catch (Exception e) {e.printStackTrace(); }}}}, "输入数据线程:").start(); // 读数据线程new Thread(new Runnable() {public void run() {BufferedReader br = new BufferedReader(new InputStreamReader(pis)); while (true) {try {System.out.println(Thread.currentThread().getName() + br.readLine()); } catch (IOException e) {e.printStackTrace(); }}}}, "输出数据线程:").start(); }}
写线程向管道流写入数据,读线程读取数据,完成基本通信流程。
四、生产消费模式
1、业务场景
基于线程等待通知机制:实现工厂生产一件商品,通知商店卖出一件商品的业务流程。
2、代码实现
public class NotifyThread03 {public static void main(String[] args) {Product product = new Product(); ProductFactory productFactory = new ProductFactory(product); ProductShop productShop = new ProductShop(product); productFactory.start(); productShop.start(); }}// 产品class Product {public String name ; public double price ; // 产品是否生产完毕,默认没有boolean flag ; }// 产品工厂:生产class ProductFactory extends Thread {Product product ; public ProductFactory (Product product){this.product = product; }@Overridepublic void run() {int i = 0 ; while (i < 20) {synchronized (product) {if (!product.flag){if (i%2 == 0){product.name = "鼠标"; product.price = 79.99; } else {product.name = "键盘"; product.price = 89.99; }System.out.println("产品:"+product.name+"【价格:"+product.price+"】出厂..."); product.flag = true ; i++; // 通知消费者product.notifyAll(); } else {try {// 进入等待状态product.wait(); } catch (InterruptedException e) {e.printStackTrace(); }}}}}}// 产品商店:销售class ProductShop extends Thread {Product product ; public ProductShop (Product product){this.product = product ; }@Overridepublic void run() {while (true) {synchronized (product) {if (product.flag == true ){System.out.println("产品:"+product.name+"【价格"+(product.price*2)+"】卖出..."); product.flag = false ; product.notifyAll(); //唤醒生产者} else {try {product.wait(); } catch (InterruptedException e) {e.printStackTrace(); }}}}}}
流程描述:ProductFactory生成一件商品,通知商店售卖,通过flag标识判断控制是否进入等待状态,商店卖出商品后,再次通知工厂生产商品。
五、源代码地址
GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent
【Java并发编程之线程间的通信】以上就是Java并发编程之线程间的通信的详细内容,更多关于Java 线程间通信的资料请关注脚本之家其它相关文章!
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 事件代理
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 数组常用方法一
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- python青少年编程比赛_第十一届蓝桥杯大赛青少年创意编程组比赛细则
- Java|Java基础——数组
- RxJava|RxJava 在Android项目中的使用(一)
- java之static、static|java之static、static final、final的区别与应用