多线程
JUC JUC是java.util.concurrent工具包的简称,这是一个处理线程的工具包,从JDK1.5开始出现。
进程与线程 进程Process:是系统进行资源分配和调度的基本单位,是操作系统结构的技术。进程是线程的容器,程序是指令、数据及组织形式的描述,进程是程序的实体。
线程Thread:是操作系统能够进行运算掉的最小单位,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
线程的状态 线程状态枚举类
public enum State {
NEW,(新建)
RUNNABLE,(准备就绪),
BLOCKED,(阻塞)
WAITING,(等待:不见不散,约个人五点到,五点不到会继续等)
TIMED_WAITING,(等待:过时不候,约定时间未到直接走人)
TERMINATED;
(终结)
}
wait,sleep区别
- sleep()是Thread的静态方法;wait()是Object的方法,任何对象实例都能调用。
- sleep()执行时不会释放锁,也不需要占用锁;wait会释放锁,但是调用的前提是当前线程占有锁(即代码要在synchronized中)。
- 都可以被interrupted()方法中断,都是在哪里睡哪里醒。
JVM同步基于进入加锁和退出解锁,使用管程对象实现的,每个Java对象都一个管程对象,随着java对象一同创建和销毁。
用户线程、守护线程 用户线程:平时用到的普通线程,如自定义线程。当主线程结束后,用户线程还在运行,JVM存活。
守护线程:运行在后台的特殊线程,比如垃圾回收。如果没有用户线程了,只剩下守护线程,jvm结束。守护线程的设置
setDaemon(true)
要在线程运行之前。Synchronized
synchronized
是Java中的关键字,是一种同步锁,它修饰的对象有以下几种:- 修饰一个代码块,作用的对象是对用这个代码块的对象;
synchronized(this){
//修饰的是大括号中的代码
}
- 修饰一个方法,被修饰的方法称为同步方法,其作用范围是整个方法,作用对象是调用这个方法的对象。
- 修饰一个静态方法,作用范围是整个静态方法,作用的对象是这个类的所有对象。
- 修饰一个类;
- 创建资源类,在资源类创建属性和操作方法(高内聚,低耦合:资源有什么就调用什么,没有就不调用)。
- 创建多个线程,调用资源类的操作方法。
Ticket
有属性number
记录票数目,方法sale()
进行卖票并且通过synchronized加锁,package com.jiayuleng.jucstudy.sync;
class Ticket {
//票数
private int number = 30;
//操作方法:买票
public synchronized void sale() {
//判断:是否有票;
if(number > 0) {
System.out.println(Thread.currentThread().getName() + ": 卖出:" + (number--) + " 剩下:" + number);
}
}}public class SaleTicket {
//第二步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;
i < 40;
i++) {
ticket.sale();
}
}
}, "AA").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;
i < 40;
i++) {
ticket.sale();
}
}
}, "bb").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;
i < 40;
i++) {
ticket.sale();
}
}
}, "cc").start();
}
}
文章图片
如果不加锁,会出现线程不同步的情况。
文章图片
多线程的创建 一、继承于Thread类
- 创建一个继承于Thread类的子类。
- 重写Thread类的
run()
方法,将此线程执行的操作声明在run()中。 - 创建Thread类的子类的对象。
- 通过此类调用
start()
方法。
package com.jiayuleng.javabasestudy.threadstudy.threadcreate;
//1.创建一个继承于Thread类的子类。
//2.重写Thread类的run()方法,将此线程执行的操作声明在run()中。
//3.创建Thread类的子类的对象。
//4.通过此类调用start()方法。//例子:遍历100以内的所有的偶数/**
* @author ColdRain
*/
class MyThread extends Thread {
@Override
public void run() {
for(int i = 0;
i < 100;
i++) {
if(i % 2 == 0) {
System.out.println(i);
}
}
}
}public class ThreadTest {
public static void main(String[] args) {
MyThread t1 = new MyThread();
t1.start();
for(int i = 0;
i < 100;
i++) {
if(i % 2 == 0) {
System.out.println(i + "******main*******");
}
}
}
}
主线程与子线程交替运行如下图
文章图片
问题一:我们不能通过直接调用
run()
的方式启动线程,start()
方法会先启动当前线程,再调用当前线程的run()
方法。问题二:同一个线程对象不可以启动
start()
方法两次,会报IllegalThreadStateException
异常;实现Runnable接口
- 创建一个实现了Runnable接口的类
- 实现类去实现Runnable中的抽象方法:run()
- 创建实现类的对象
- 将此对象作为参数传递到Thread类的构造器中,创建Thread类的对象
- 通过Thread类的对象调用start()方法
package com.jiayuleng.javabasestudy.threadstudy.threadcreate2;
//1. 创建一个实现了Runnable接口的类
class Mythread implements Runnable {
//2. 实现类去实现Runnable中的抽象方法:run()
@Override
public void run() {
for(int i = 0;
i < 100;
i++) {
if(i % 2 == 0) {
System.out.println(i);
}
}
}
}public class ThreadTest2 {
public static void main(String[] args) {
//3. 创建实现类的对象
Mythread mythread = new Mythread();
//4. 将此对象作为参数传递到Thread类的构造器中,创建Thread类的对象
Thread t1 = new Thread(mythread);
//5. 通过Thread类的对象调用start()方法
t1.start();
// 也可以通过匿名内部类的形式实现Runnable创建Thread,效果是一样的
new Thread(new Runnable() {
@Override
public void run() {
...
}
}, "t1").start();
// 更进一步精简的效果
new Thread(() -> {
...
}, "t1").start();
}
}
比较创建两种线程方式: 开发中优先选择实现Runnable接口的方式
原因:
- 实现的方式没有类的单继承的局限性。
- 实现的方式更适合来处理多个线程有共享数据的情况。
联系:Thread类中的run也是实现了Runnable接口,从而重写run()
方法;
public class Thread implements Runnable
ticket
记录票量,但是ticket一定要加上static
声明为静态。如果不加限定的话,每新建一个窗口线程变量都会有有一个自己的ticket
属性,票对于三个窗口不是公共的而是分别计算的了。package com.jiayuleng.javabasestudy.threadstudy.threadcreate;
class Window extends Thread {
private static int ticket = 30;
@Override
public void run() {
while(true) {
if(ticket > 0) {
System.out.println(this.getName() + ": sale tickets, ticketid is : " +ticket--);
} else {
break;
}
}
}
}public class SaleTicket1 {
public static void main(String[] args) {
Window window1 = new Window();
Window window2 = new Window();
Window window3 = new Window();
window1.start();
window2.start();
window3.start();
}
}
如果不加
static
的效果:文章图片
而通过实现Runnable接口的方式,由于传入各线程线程中的窗口类对象是同一个,所以共用同一个ticket属性,不需要单加static。
package com.jiayuleng.javabasestudy.threadstudy.threadcreate;
class Window2 implements Runnable {
private int ticket = 30;
@Override
public void run() {
while(true) {
if(ticket > 0) {
System.out.println(Thread.currentThread().getName() + ": sale tickets, ticketid is " + ticket--);
} else {
break;
}
}
}
}public class SaleTicket2 {
public static void main(String[] args) {
Window2 window2 = new Window2();
Thread t1 = new Thread(window2);
Thread t2 = new Thread(window2);
Thread t3 = new Thread(window2);
t1.setName("t1");
t2.setName("t2");
t3.setName("t3");
t1.start();
t2.start();
t3.start();
}
}
Thread中的常用方法
- start():启动当前线程,调用当前线程的run()。
- run():通常需要重写Thread类中的此方法,将创建的线程要执行的操作声明在此方法中。
- currentThread():静态方法,返回执行当前代码的线程。
- getName():获取当前线程的名字。
- setName():设置当前线程的名字。
- yield():释放当前cpu的执行权。
- join():在线程a中调用线程b的join(),此时线程a进入阻塞状态,直到线程b完全执行完以后,线程a才结束阻塞状态。
- stop():已过时。当执行此方法时,强制结束当前线程。
- sleep(long millitime):让当前线程“睡眠”指定的millitime毫秒。在指定的millitime毫秒时间内,当前线程是阻塞状态。
- isAlive(): 判断当前线程是否存活。
文章图片
如何获取和设置当前线程的优先级:
getPriority()
:获取线程的优先级setPriority(int p)
:设置线程的优先级说明:高优先级的线程要抢占低优先级线程cpu的执行权,但是只是从概率上讲,高优先级的线程高概率的情况下被执行。并不意味着只有当高优先级的线程执行完以后,低优先级的线程才执行。
Lock锁 Lock锁提供了更多的功能,相对于Synchronized区别:
- Lock不是Java内置的,synchronized是Java内置关键字。Lock是一个类,通过这个类可以实现同步访问。
- Lock需要手动释放锁,如果没有主动释放锁就可能导致死锁现象。而synchronized不需要用户手动释放锁,在synchronized方法或synchroniized代码执行结束后会自动让线程释放对锁的占用。在发生异常时,synchronized会自动释放线程占有锁,Lock如果没有通过unlock()主动释放锁就很可能造成死锁;
- Lock可以让等待锁的线程响应中断,而synchronized不行,使用synchronized时等待的线程会一直等待下去,无法响应中断;
- Lock可以知道是否成功获取锁,synchronized则不行
- Lock可以提高多线程进行读操作的效率,当大量线程同时竞争时Lock性能远优于synchronized;
public interface Lock {
//获取锁
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解锁
void unlock();
Condition newCondition();
}
ReentrantLock ReentrantLock,意为可重入锁,ReentrantLock是唯一实现了Lock接口的类。
下面通过
ReentrantLock
类实现资源的锁:class LTicket {
//票数
private int number = 30;
//创建可重入锁
private final ReentrantLock lock = new ReentrantLock();
//操作方法:买票
public void sale() {
try {
//上锁
lock.lock();
//判断:是否有票;
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ": 卖出:" + (number--) + " 剩下:" + number);
}} finally {
//解锁
lock.unlock();
}
}}
线程间通信 线程间通信的模型有两种:共享内存,消息传递。
多线程编程步骤:
- 创建资源类,在资源类中创建创建属性和操作方法;
- 资源类的操作方法:
(1)判断
(2)干活
(3)通知 - 创建多线程,调用资源类的操作方法;
- wait():线程释放锁,进入等待;
- notify():随机唤醒某个等待的线程继续运行;
- notifyAll():唤醒所有等待线程继续运行;
Lock接口与Condition接口线程通信 Lock接口:
- lock():获取锁
- unlock():释放锁
- await():对应wait()方法
- signal():对应notify()方法
- signalAll():对应notifyAll()方法
//首先创建资源类
class Share {
//创建资源类属性与操作方法;
//初始值
private int number = 0;
// +1的方法
public synchronizedvoid incr() throws InterruptedException {
//第二步 判断 干活 通知
//判断
if(number != 0) {
//当数值为1时,进行等待,等待数值被其他线程变为0后进行告知
this.wait();
}
//满足条件后,干活
number++;
System.out.println(Thread.currentThread().getName() + " :: " + number);
//干完活通知其他线程
this.notifyAll();
}//-1的方法
public synchronizedvoid decr() throws InterruptedException {
if(number != 1) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + " :: " + number);
this.notifyAll();
}
}//调用演示
public class ThreadDemo1 {
public static void main(String[] args) {
Share share = new Share();
//创建线程
new Thread(()-> {
for(int i = 1;
i <= 10;
i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(()->{
for(int i = 1;
i < 10;
i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();
}
}
虚假通信 在上述代码的基础上如果再加入两个线程,也是分别实现+1,-1;但是此时可能会发生错乱,因为wait()方法在if判断中,在哪里睡去会在哪里醒来。当两个+1线程,彼此相互抢占线程,会从wait()醒来继续往下执行程序,这时便出现了奇怪的值,这就是虚假通信:
文章图片
wait():
对于某一个参数的版本,实现中断和虚假唤醒是可能的,而且此方法应始终在循环中使用:
文章图片
当把wait()方法放置在while中,便可解决这个问题;(举例好比登机安检,安检过后如果下机后还要重新安检)
while(number != 1) {
this.wait();
}
通过Lock接口实现线程间通信
- 创建资源类,资源属性,资源方法
- 资源方法中,先判断条件加锁,干活,通知其他线程,释放锁
class Share {
private int number = 0;
//lock创建
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
//+1
public void incr() throws InterruptedException {
lock.lock();
try {
//判断
while(number != 0) {
condition.await();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + " :: " + number);
//通知
condition.signalAll();
} finally {
//释放锁
lock.unlock();
}
}
//-1
public void decr() throws InterruptedException {
lock.lock();
try {
while(number != 1) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + " :: " + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
线程间定制化通信 要求按照顺序,A线程打印5次A后,B线程打印10次B,C线程打印15次C,整体重复几次;
实现的方案:设定通信标志位:
number
,当其为0时,打印A;当其为1时,打印B;当其为C时,打印C;资源方法中,通过标志位
number
判断此时是否应该执行,执行结束后设定下一顺序标志位,并唤醒对应锁;class ShareSource {
int number = 0;
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
void printA() {
lock.lock();
try {
while(number != 0) {
conditionA.await();
}
for(int i = 0;
i < 5;
i++) {
System.out.println(Thread.currentThread().getName() + " :: A");
}
number = 1;
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}void printB() {
lock.lock();
try {
while(number != 1) {
conditionB.await();
}
for(int i = 0;
i < 10;
i++) {
System.out.println(Thread.currentThread().getName() + " :: B");
}
number = 2;
conditionC.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}}void printC() {
lock.lock();
try {
while(number != 2) {
conditionC.await();
}
for(int i = 0;
i < 15;
i++) {
System.out.println(Thread.currentThread().getName() + " :: C");
}
number = 0;
conditionA.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}}
}
集合的线程安全 多线程在对同一个集合对象又读又写的时候,就会产生集合安全问题;
如果使用
ArrayList
集合,会产生线程安全问题;public class NotSafeDemo {
public static void main(String[] args) {
//ArrayList不是线程安全的
List list = new ArrayList<>();
//Vetor解决线程安全
//Listlist = new Vector<>();
//从集合类中获取
//List list = Collections.synchronizedList(new ArrayList<>());
//通过CopyOnWriteArrayList解决
//List list = new CopyOnWriteArrayList<>();
for(int i = 0;
i < 20;
i++) {
String num = String.valueOf(i);
new Thread(() -> {
list.add(String.valueOf(num));
System.out.println(list);
}, "线程" + i).start();
}}
}
文章图片
Vector
、Collections,synchronizedList(new ArrayList<>())
是线程安全的集合,可以解决上述异常;重点:
CopyOnWriteArrayList()
也是线程安全集合,它相当于线程安全的ArrayList,它是基于动态数组机制;CopyOnWriteArrayList
内部有个volatile数组,在“添加、修改、删除”操作时,都会新建一个拷贝数组并对新数组进行操作,之后再将操作后对数组覆盖原先的数组中。由于涉及修改数据的操作都会新建数组,所以执行修改数据操作效率很低,如果是进行遍历查找效率比较高;CopyOnWriteArrayList的add操作:
文章图片
Hashset
线程不安全,CopyOnWriteArraySet
线程安全HashMap
线程不安全,ConcurrentHashMap
线程安全;
- 一个对象里如果有多个
synchronized
方法,只要一个线程去调用了其中一个,其余线程皆需等待;换句话说,同一对象某一时刻,只能有唯一线程去执行synchronized
方法; - 线程调用普通方法不受
synchronized
方法的影响 - 不同对象使用的不是同一把锁,互相独立;
synchronized
实现同步的基础:Java中的每一个对象都可以做为锁:- 对于同步方法,锁是当前实例对象;
- 对于静态同步方法,锁是当前类的Class对象
- 对于同步方法快,锁是Synchronized括号里配置的对象;
在Java并发编程中,公平锁是保障了多线程下各线程获取锁的顺序。非公平锁无法提供这个保障;
//非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//区别重点看这里
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}//公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//hasQueuedPredecessors这个方法就是最大区别所在
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
实际中:
ReentranLock(fair: true)
传入变量为true,即为公平锁,传入false即为非公平锁;可重入锁 可重入锁就是某个线程已经获得某个锁,可以再次获取此锁而不会出现死锁!
可重入锁有:
- synchronized
- ReentrantLock
public class WhatReentrant {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized (this) {
System.out.println("第1次获取锁" + this);
int index = 1;
while(true) {
synchronized (this) {
System.out.println("第" + (++index) + "次获取锁" + this);
}
if(index == 10) {
break;
}
}
}
}
}).start();
}
}
文章图片
死锁 死锁,是指多个进程在运行过程中因争夺资源而造成的一种互相等待的现象,若没有外力干涉,他们无法再执行下去。
文章图片
死锁代码
package com.jiayuleng.jucstudy.sync;
import java.util.concurrent.TimeUnit;
public class DeadLock {
static Object a = new Object();
staticObject b = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (a) {
System.out.println(Thread.currentThread().getName() + "持有a锁,试图获取b锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (b) {
System.out.println(Thread.currentThread().getName() + "持有b锁,试图获取a锁");
}
}
}, "A").start();
new Thread(() -> {
synchronized (b) {
System.out.println(Thread.currentThread().getName() + "持有b锁,试图获取a锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (a) {
System.out.println(Thread.currentThread().getName() + "持有a锁,试图获取b锁");
}
}
}, "B").start();
}
}
文章图片
通过jsp工具验证是否是死锁
jps -l
类似于linux的ps -ef
jstack
:jvm自带堆栈跟踪器
jps -l
文章图片
jstack 11168
文章图片
B线程在等待锁<0x04f7c308>,已锁 <0x04f7c310>;
A线程在等待锁 <0x04f7c310>,已锁<0x04f7c308>;
造成死锁;
Callable接口 目前学习了两种建立创建线程的方法,一种是继承Thread类,第二种是实现Runnable接口;Runnbale接口缺少的一项功能是,当线程终止时即run()函数运行完成时,无法使线程返回结果,因为run函数是一个void类型的,为了支持返回结果功能,Java提供了Callable接口;
Callable接口的特点
- 和Runnable接口需要实现void类型的
run()
函数类似,Callable接口需要实现有返回值类型的call()
函数;
public interface Callable {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
call()
方法可以引发异常,而run()
不可以;Callable
接口和Runnable
接口不可以直接替换,Thread
类的构造方法中并没有Callable
类型;
Future
接口的意义即为未来任务,当线程call()
方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。Futre基本上是主线程可以跟踪其他线程进度以及线程结果的一种方式,要实现此接口,必须重写5种方法,下面列举其中三种重要方法:
public boolean cancel (boolean mayInterrupt)
:用于停止任务,如果任务尚未启动,它将停止任务,如果已停止,则仅在mayInterrupt
为true时中断任务;public Object get()
抛出InterruptedException
、ExecutionException
异常,当任务完成,它将立即返回结果;public boolean isDone
:如果任务完成,则返回true,否则返回false,可以看到任务运行状态;Callable
与Runnable
类似,封装了线程中运行的任务,Futre
用于存储从线程获取的结果。实际上,Future
也可以与Runnable
一起使用,Runnable
负责创建线程,Future
负责获取结果;
FutureTask
类实现了Runnable
接口,也实现了Future
接口,作为连接Thread
类与Callable
接口间的桥梁;文章图片
FutureTask
类构造函数可以传入Callable
类型: public FutureTask(Callable callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
// ensure visibility of callable
}
新建线程变量Thread可以传入
FutureTask
类型,这样就可以新建线程了;新建类实现
Callable
接口,新建类实现Runnable接口;//比较两接口
//实现Callable接口
class MyThread2 implements Callable {@Override
public Object call() throws Exception {
return 200;
}
}//实现Runnable
class MyThread1 implements Runnable {@Override
public void run() {}
}public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//Runnable接口创建线程
new Thread(new MyThread1(), "AA").start();
//Callable接口不能直接替换Runnable
//new Thread(new MyThread2(), "BB").start();
//使用FutureTask
FutureTask futureTask1 = new FutureTask<>(new MyThread2());
//lambda表达式简化
FutureTask futureTask2 = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " come in callable!");
return 1024;
});
//创建线程
new Thread(futureTask2, "lucy").start();
//调用Future的isDone()方法判断线程此时的完成状态;
while(!futureTask2.isDone()) {
System.out.println("wait...");
}//调用FutureTask的get方法,获取线程已经运行得到的结果
System.out.println(futureTask2.get());
//第二次调用get方法,依然可以直接获取结果
System.out.println(futureTask2.get());
System.out.println(Thread.currentThread().getName() + " come over!");
}
}
运行结果:
文章图片
JUC三大辅助类 JUC提供了三种常用的辅助类,通过这些辅助类可以很好地解决线程数量过多时Lock锁的频繁操作,三种辅助类如下:
CountDownLatch
:减少计数CyclicBarrier
:循环栅栏Semaphore
:信号灯
CountDownLatch
CountDownLatch
类可以设置一个计数器,通过countDown()
方法进行计数减1的操作,使用await()
方法等待计数器不大于0(减至0),然后继续执行await()
方法之后的语句;例子:班级里有7名同学,6名普通同学(其他线程),1班长(主线程),只有班长能够锁门,所以要求其他六名同学都离开后,班长才能够锁门;
public class CountDownLatchPractice {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
//六名同学线程
for(int i = 0;
i <= 6;
i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " has left");
countDownLatch.countDown();
}, "student" + i).start();
}
//班长主线程等待
System.out.println("main thread sleep");
countDownLatch.await();
//当CountDownLatch的计数器归零后,便从等待await状态跳出,继续向下执行
System.out.println("all students left, now count num is " + countDownLatch.getCount());
}
}
文章图片
循环栅栏
CycLicBarrier
CyclicBarrier
循环栅栏大概意为循环阻塞,在使用中CyclicBarrier
的构造方法第一个参数是目标障碍数,每次执行一次障碍数+1,如果达到了目标障碍数则会执行cyclicBarrier.await()
之后的语句,可以将CyclicBarrier
理解为+1操作;构造函数:
文章图片
例子:集齐7颗龙珠召唤神龙
注:这里编写代码时,遇到了一个小问题,在静态的主函数方法中调用外部变量,一定需要是静态变量的问题:面试官:为什么java中静态方法不能调用非静态方法和变量?;
public class CyclicBarrierPractice {
static int NUMBER = 7;
public static void main(String[] args) {
//新建CyclicBarrier变量,给定数量7个线程处于等待状态时启动,并会启动barrier给定的屏障操作;
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("******集齐7颗龙珠可以召唤神龙");
});
//集齐7颗龙珠的过程
for(int i = 1;
i <= 7;
i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "星龙珠被收集到了");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
信号灯
Semaphore
文章图片
Semaphore
的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。使用acquire()
方法获取许可证,release()
方法释放许可;例子:抢车位,6辆汽车停靠3个汽车位;
public class SemaphorePractice {
//抢车位,6辆汽车停靠3个汽车位;
public static void main(String[] args) {
//创建Semaphore,设置许可数量
Semaphore semaphore = new Semaphore(3);
//模拟6辆汽车
for(int i = 0;
i <= 6;
i++) {
new Thread(() -> {
//抢占
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢占到了车位");
//设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
//离开车位
System.out.println(Thread.currentThread().getName() + "----离开车位");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放
semaphore.release();
}
}, "第" + String.valueOf(i) + "车").start();
}
}
}
文章图片
读写锁 读写锁介绍 对共享资源面临读和写操作,一般写操作也没有读操作频繁。多个线程可以同时读一个资源;写操作只能有一个线程去操作共享资源,同时不允许其他线程对资源进行读和写的操作;
针对上述场景,Java并发包提供了读写锁
ReentrantReadWriteLock
,它同时表示两个锁,一个是读操作相关的锁——共享锁,一个是写相关的锁——排他锁;读写锁的实例 当不使用读写锁,多线程对一个资源类
Map
进行读写操作:import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
//resource class
class MyCache {//create map
private volatile Map map = new HashMap<>();
//put data in map
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + " is writing " + key);
//pause
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}//put data
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " finish write " + key);
}//get data
public Object get(String key) {
System.out.println(Thread.currentThread().getName() + "is reading " + key);
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}Object result = null;
result = map.get(key);
System.out.println(Thread.currentThread().getName() + " finish get " + key);
return result;
}
}public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for(int i = 0;
i < 5;
i++) {
//create thread to put data
final int num = i;
new Thread(() -> {
myCache.put(num + "", num + "");
}, String.valueOf(i)).start();
}for(int i = 0;
i < 5;
i++) {
//create thread to get data by key
final int num = i;
new Thread(() -> {
myCache.get(num + "");
}, String.valueOf(i)).start();
}
}
}
运行结果如下,读写线程显然没有在写线程完成之前便开始了工作,这是我们不允许的!
文章图片
下面是使用读写锁:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
//resource class
class MyCache {//create map
private volatile Map map = new HashMap<>();
// 在资源类中创建读写锁,读写锁是同一个类对象,使用的时候调用不同的方法进行加锁即可;
//create ReadWriteLock
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
//put data in map
public void put(String key, Object value) {
//add write lock
rwLock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName() + " is writing " + key);
//pause
TimeUnit.MICROSECONDS.sleep(300);
//put data
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " finish write " + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}//get data
public Object get(String key) {
Object result = null;
rwLock.readLock().lock();
try{
System.out.println(Thread.currentThread().getName() + "is reading " + key);
TimeUnit.MICROSECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName() + " finish reading " + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
return result;
}
}public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
MyCache myCache = new MyCache();
for(int i = 0;
i < 5;
i++) {
//create thread
final int num = i;
new Thread(() -> {
myCache.put(num + "", num + "");
}, String.valueOf(i)).start();
}TimeUnit.MICROSECONDS.sleep(300);
for(int i = 0;
i < 5;
i++) {
//create thread
final int num = i;
new Thread(() -> {
myCache.get(num + "");
}, String.valueOf(i)).start();
}
}
}
在使用读写锁后,线程再对资源进行访问就满足我们的要求了,在写线程完成后读线程才开始工作;
文章图片
锁降级 锁降级:指的是写锁降级成为读锁,且要求把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。如果当前线程拥有写锁,然后将其释放,最后在获取读锁,这种分段完成的过程不能称之为锁降级。
锁降级示例:
public void processData() {
readLock.lock();
if(!update) {
//必须先释放读锁
readLock.unlock();
//锁降级从写锁获取到开始
writeLock.lock();
try {
if(!update) {
//准备数据的流程(略)
...
upddate = true;
}
//把持住写锁的同时获取了读锁
readLock.lock();
} finally {
//释放写锁
writeLock.unlock();
}
//锁降级完成,写锁降级为读锁
}
try {
//使用数据的流程(略)
...
} finally {
readLock.unlock();
}
}
锁降级中的读锁的获取是否必要?
答案是必要!!!
主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,将设此刻另一个线程T获取了写锁修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,既遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新;
其实也不难理解,只要线程获取写锁,那么这一刻只有这一个线程可以在临界区操作,它自己写完的东西,自己的是可以看见的,所以写锁降级为读锁是非常自然的一种行为,并且几乎没有任何性能影响
为何ReentrantReadWriteLock不支持升锁(把持读锁,获取写锁,最后释放读锁)?
目的同上一样是为了保持数据的可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
是反过来就不一定行的通了,因为读锁是共享的,也就是说同一时刻有大量的读线程都在临界区读取资源,如果可以允许读锁升级为写锁,这里面就涉及一个很大的竞争问题,所有的读锁都会去竞争写锁,这样以来必然引起巨大的抢占,这是非常复杂的,因为如果竞争写锁失败,那么这些线程该如何处理?是继续还原成读锁状态,还是升级为竞争写锁状态?这一点是不好处理的,所以Java的api为了让语义更加清晰,所以只支持写锁降级为读锁,不支持读锁升级为写锁。
举个生活中的例子,在一个演唱会中,台上有一名歌手在唱歌,我们可以理解为它是写锁,只有他在唱歌,同时台下有很多观众在听歌,观众也就是读锁,现在假如歌手唱完了,它可以立马到台下很轻松的就降级为一名观众,但是反过来我们宣布一项规定,谁先登上舞台上,谁就是歌手可以演唱一首歌并获得奖金,如果真的是这样,那么所有人必然会蜂拥而上,这时候就乱了,弄不好还会出现踩踏事故,所以观众升级为歌手这件事情代价是比较大的。
阻塞队列 BlockingQueue 阻塞队列(BlockingQueue),是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会被阻塞等待队列变为非空。在队列满时,存储元素的线程会阻塞等待队列可用
文章图片
阻塞队列常用于生产者消费者场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。而阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
- 抛出异常:插入
add(e)
, 移除remove()
, 检查element()
; - 返回特殊值:插入
offer(e)
,移除poll()
, 检查peek()
,无检查; - 一直阻塞:插入
put(e)
, 移除take()
- 超时退出;插入
offer(e,time,unit)
,移除poll(time,unit)
,无检查;
文章图片
线程池的优点:
线程池要做的工作只是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等待,等其他线程执行完毕,再从队列中取出任务来执行。
主要特点:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行;
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分调优和监控;
- Java中的线程池是通过Executor框架实现的,该框架中用到了Executor、Executors、ExecutorService、ThreadPoolExecutor这几个类;
int corePoolSize
:线程池的核心线程数;int maximumPoolSize
:能容纳的最大线程数;long keepAliveTime
:空闲线程存活时间;TimeUnit unit
:存活的时间单位;BlockingQueue
:阻塞队列,存放提交单位执行任务的队列;workQueue ThreadFactory threadFactory
:创建线程的工厂类;RejectedExecutionHandler handler
:阻塞队列满后的拒绝策略;
corePoolSize
-核心线程数(最小线程数),workQueue
-阻塞队列,maximumPoolSize
-最大线程数。当提交任务数大于
corePoolSize
的时候,会优先将任务放到workQueue
阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize
最大线程配置。此时再有新的任务,就会触发线程池的拒绝策略。总结:当提交的任务数大于
workQueue.size()+maximumPool.size()
时,就会触发线程池的拒绝策略。线程池底层的运行逻辑 我们设线程池中核心线程数量为2,最大线程数量为5;
那么线程池运行的逻辑为:
- 核心线程没有被占用的时候,任务来由核心线程执行,直到核心线程占满;
- 核心线程已全被占用,这时新任务再来则放入阻塞队列,直到阻塞队列满;
- 核心线程、阻塞队列均满,这时再有新任务来则新建线程直接来执行任务,依次往复直至达到最大线程数量;
- 线程的建立已达到最大线程数量,且所有线程和阻塞队列均满,这时再有新任务到达则执行拒绝策略;
文章图片
CallerRunsPolicy
::当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程者直接运行任务(谁派来的找谁执行)。一般并发比较小、性能要求不高、不允许失败。但是由于是调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大。2.
AbortPolicy
:丢弃任务并抛出拒绝执行异常RejectedExecutionExeception
,线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续任务执行。3.
DiscardPolicy
:直接丢弃,其他什么都没有;4.
DiscardOldestPolicy
:当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列workQueue
中最老的一个任务,并将新任务加入。(谁等的时间最长丢弃谁,加入最新的)线程池的种类与创建
Executors.newFixedThreadPool(int)
一池N线程
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//一池N线程
ExecutorService executorService = Executors.newFixedThreadPool(5);
//模拟5个窗口,10个顾客
try {
for(int i = 0;
i <= 10;
i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName() + " do service");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
文章图片
Executors.newSingleThreadExecutor()
一池一线程
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//一池N线程
ExecutorService executorService = Executors.newSingleThreadExecutor();
//模拟5个窗口,10个顾客
try {
for(int i = 0;
i <= 10;
i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName() + " do service");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
文章图片
Executors.newCachedThreadPool
线程池根据需求创建线程、可扩容、遇强则强 作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。特点:
- 线程池中数量没有固定,可达到最大值(
Integer.MAX_VALUE
); - 线程池中的线程可进行缓存重复利用和回收(回收默认时间为1分钟)
- 当线程池中,没有可用线程,会重新创建一个线程;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//一池N线程
ExecutorService executorService = Executors.newCachedThreadPool();
//模拟5个窗口,10个顾客
try {
for(int i = 0;
i <= 10;
i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName() + " do service");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
文章图片
自定义线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
文章图片
文章图片
实例:
public class ThreadPoolDemo1 {
public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for(int i = 0;
i <= 10;
i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName() + " do service");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
文章图片
使用CallableFuture、Future、ExecutorService实现线程池
ExecutorService
会自动提供线程池和相关API,用于为其分配任务。ExecutorService
可以执行Runnable
和Callable
任务;
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Callable callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
List> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
【Java|Java多线程,JUC学习】创建完任务,就可以用多种方法将任务分配给
ExecutorService
,比如execute()
,还有submit()
, invokeAny()
, invokeAll()
等方法,这些方法都继承自Executor
接口。- 首先看
execute()
方法
该方法返回值为void
,因此使用该方法没有任何可以获得的任务执行结果或者检查任务的状态(任务正在运行running
还是执行完毕executed
)。 submit()
submit()
方法会将一个Callable
或Runnable
方法提交给ExecutorService
并返回给Future
类型的结果。
Future future = executorService.submit(callableTask);
invokeAny()
invokeAny()
方法将一组任务分配给ExecutorService
,使每个任务执行,并返回任意一个成功执行的任务的结果;
String result = executorService.invokeAny(callableTasks);
invokeAll()
invokeAll()
方法将一组任务分配给ExecutorService
,使每个任务执行,并以Futrue类型的对象列表的形式返回所有任务执行的结果。
List> futures = executorService.invokeAll(callableTasks);
Future接口 任务分配给
ExecutorService
执行之后,要获取运行后的结果,这里通过Future
接口实现。get()
方法Future
接口提供了一个特殊的阻塞方法get()
, 他返回Callable任务执行的实际结果,但如果是Runnable任务只会返回null。因为
get()
方法是阻塞的,如果调用get()
方法时任务仍在运行,那么调用将会一直被阻塞,知道任务正确执行完毕并结果可用时才返回。更重要的是,正在执行的任务随时都可能抛出异常或中断执行,因此我们要将get()
调用放在try catch
语块中,捕捉InterruptedException
或ExecutionException
异常。
Future future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
关闭
ExecutorService
在我们通过Future
接口的get()
方法获得了任务运行的结果后,一般情况下ExecutorService
并不会自动关闭,它会一直处于等待状态,等待我们给它分配新的工作。这种机制在某些情况下非常有用,比如:如果应用程序需要处理不定期出现的任务,或者在编译时不知道这些任务的数量。
但也有副作用,比如:即使应用程序可能已经到达它的终点,但并不会被停止,因为等待的ExecutorService会导致JVM继续运行,这样我们就需要主动关闭ExecutorService。要正确地关闭ExecutorService,可以调用实例的
shutdown()
, shutdownNow()
方法。shutdown()
shutdown()
方法并不会立即销毁ExecutorService实例,而是首先让ExecutorService停止接收新任务,并在所有正在运行的线程完成当前工作后关闭。
executorService.shutdown();
shutdownNow()
List notExecutedTasks = executorService.shutDownNow();
shutdownNow()
方法会尝试立即销毁ExecutorService实例,所以并不能保证所有正在运行的线程将同时停止。该方法会返回等待处理的任务列表,由开发人员自行决定如何处理这些任务。该方法会返回等待处理的任务列表,有开发人员自行决定如何处理这些任务。推荐阅读
- JAVA|JAVA IO - 一文搞懂字节-字符转换流 - InputStreamReader+OutputStreamWriter
- kafka|Kafka详细教程-及热点面试题
- 面试|裸辞后的两个月,我拿到了360的offer
- 遇见Golang|【Go开源宝藏】十分强大的日志库 logrus
- python|wasm转c调用实战
- springSecurity|3.spring security授权流程
- #|智能电网中需求响应研究(Matlab代码实现)
- Python创造新事物|用Python制作我的核酸检测日历
- Java|从无到有构建亿级高性能高并发高可用的亿级微服务秒杀系统