《实战java高并发程序设计》笔记(三)
写在前面
前1、2章中,我们对于 并发编程中一些基本的概念和原理有了一定的了解。
在第3章中,我们将学习一些 JDK 内部提供的使用的API 和框架。主要分为3个部分:同步控制的工具、线程池的支持、支持并发的容器
第三章 JDK并发包
文章图片
第三章知识框架图 3.1.1 重入锁 ReentrantLock
重入锁使用 java.util.concurrent.locks.ReentrantLock 类实现。之所以叫重入是因为其允许 锁反复进入,这里的反复进入限于一个线程。
// lock 是一个 ReentrantLock 对象
lock.lock();
lock.lock();
try{
i++;
}finally{
lock.unlock();
lock.unlock();
}
重入锁跟 synchronized 比较:
(1)原始构成
- Sync 是属于JVM层面的关键字。编译后会在同步块的前后分别形成 monitorenter & monitorexit 两个字节码指令。在执行 monitorenter 指令时,首先尝试获取对象锁。如果获取锁成功,则把 锁的计数器 加1;相应的在执行 monitorexit 指令时将锁的计数器减1,计数器为0,锁就被释放了。
- ReentrantLock 是API锁
(2)使用方法 - Sync 不需要手动释放锁
- ReentrantLock 需要手动释放,不然会死锁。一般写在 finally{ unlcok() }
(3)等待是否可中断
(5)绑定多个条件Condition
ReentrantLock 通过绑定多个条件可以实现精准的分组唤醒线程。而 Sync 只能随机唤醒一个或者全部唤醒。
下面看一下重入锁的重要方法:
- lock():获得锁,锁被占用则等待。
- lockInterruptibly():获得锁,但是优先响应中断
- tryLock():尝试获得锁,返回boolean,不等待
- tryLock(long time, TimeUnit unit):给定时间内获得锁
- unlock():释放锁
1. 等待可中断
就是说,在等待锁的同时,可以响应中断。通常情况下,这种机制被用于处理死锁。
package ConcurrencyTest;
import java.util.concurrent.locks.ReentrantLock;
/**
* 测试重入锁中的响应中断功能
*/public class LockInterruptilyTest {
private static ReentrantLock lock = new ReentrantLock();
private static class ReentrantLockThread implements Runnable{
@Override
public void run() {
try{
lock.lockInterruptibly();
for (int i=0;
i<3;
i++){
System.out.println(Thread.currentThread().getName()+"得到了锁 i="+i);
}
}catch (InterruptedException e){
System.out.println(Thread.currentThread().getName()+"被打断了");
}finally {
// 查询当前线程是否保持此锁
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
}public static void main(String args[]) {
ReentrantLockThread test = new ReentrantLockThread();
Thread t1 = new Thread(test,"t1");
Thread t2 = new Thread(test,"t2");
Thread t3 = new Thread(test,"t3");
t1.start();
t2.start();
t3.start();
t2.interrupt();
}
}
运行结果
t1得到了锁 i=0
t1得到了锁 i=1
t1得到了锁 i=2
t2被打断了
t3得到了锁 i=0
t3得到了锁 i=1
t3得到了锁 i=2
2. 公平锁
只需要设置 ReentrantLock(True)即可。
ReentrantLock fairLock = new ReentrantLock(True);
下面看一个公平锁的例子
package ConcurrencyTest;
import java.util.concurrent.locks.ReentrantLock;
/**
* 用 ReentrantLock 实现公平锁
*/
public class FairLockViaReentrantLock implements Runnable{
private static ReentrantLock fairLock = new ReentrantLock(true);
@Override
public void run() {
int i = 0;
while (i<10){
try{
fairLock.lock();
System.out.println(Thread.currentThread().getName()+"获得锁");
}finally {
fairLock.unlock();
}
i++;
}
}public static void main(String args[]){
FairLockViaReentrantLock f1 = new FairLockViaReentrantLock();
Thread t1 = new Thread(f1,"Thread_t1");
Thread t2 = new Thread(f1,"Thread_t2");
t1.start();
t2.start();
}
}
由于线程是由公平锁实现的,执行结果两个线程交替获得锁。
Thread_t1获得锁
Thread_t2获得锁
Thread_t1获得锁
Thread_t2获得锁
3. 绑定多个条件实现精确唤醒
在这里例子里面,就是通过3个Condition条件,来实现锁的精确唤醒顺序。比如这个 线程执行为 A--B--C 这样的 。
/**
* 锁绑定多个条件Condition
* 题目:多线程之间按顺序执行,实现A->B->C三个线程启动,要求如下: A打印5次,B打印10次,C打印15次,
* 紧接着 A打印5次,B打印10次,C打印15次, . . . 循环执行10轮
*/
public class LockConditionDemo {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 0;
i < 10;
i++) {
shareResource.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 0;
i < 10;
i++) {
shareResource.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 0;
i < 10;
i++) {
shareResource.print15();
}
}, "C").start();
}
}/**
* 共享资源类
*/
class ShareResource {
// A:1 B:2 C:3
private int num = 1;
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
// 循环打印5次
public void print5() {
// 1、获取锁资源
lock.lock();
try {
// 2、判断是否可以执行业务
while (num != 1) {
// 阻塞等待
conditionA.await();
}
// 模拟业务执行
for (int i = 0;
i < 5;
i++) {
System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
}
// 3、通知其他线程,通过signal()方法唤醒线程
num = 2;
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}// 循环打印10次
public void print10() {
// 1、获取锁资源
lock.lock();
try {
// 2、判断是否可以执行业务
while (num != 2) {
conditionB.await();
}
// 模拟业务执行
for (int i = 0;
i < 10;
i++) {
System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
}
// 3、通知其他线程
num = 3;
conditionC.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}// 循环打印15次
public void print15() {
// 1、获取锁资源
lock.lock();
try {
// 2、判断是否可以执行业务
while (num != 3) {
conditionC.await();
}
// 模拟业务执行
for (int i = 0;
i < 15;
i++) {
System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
}
// 3、通知其他线程
num = 1;
conditionA.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
执行结果
A1
A2
A3
A4
A5
B1
B2
B3
B4
B5
B6
B7
B8
B9
B10
C1
C2
C3
C4
C5
C6
C7
C8
C9
C10
C11
C12
C13
C14
C15
......// 后面循环10次
3.1.2 Condition 条件 Condition对象和 wait() & notify() 方法作用基本是类似的。但是 wait() 和 notiry() 是和 sync 关键字结合使用的,而 Condition 是跟重入锁关联的。
通过 Lock 接口(重入锁就是实现了这个接口)的 Condition newCondition() 方法,生成一个与当前重入锁绑定的 Condition 实例。利用 Condition 独享,就可以让线程在合适的时间等待,或者通知线程继续执行。
Condition 接口提供的基本方法:
- await():让当前线程等待,同时释放锁,这个等待是响应中断的
- awaitUninterruptibly():
- awaitNanos()
- await(long time, TimiUnit unit):带时间的等待
- awaitUntil(Date deadline)
- void signal():唤醒
- void signalAll()
package ConcurrencyTest;
import java.util.concurrent.locks.*;
/**
* 测试Condition条件相关的等待和唤醒功能
*/
public class ConditionTest {
private static ReentrantLock lock = new ReentrantLock();
// 通过一个lock对象,生成 与 对象绑定的 Condition 对象
private static Condition condition = lock.newCondition();
public static class ConditionThread implements Runnable{
@Override
public void run() {
try{
lock.lock();
condition.await();
System.out.println("等待完成,线程继续执行");
}catch (InterruptedException e){
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}public static void main(String args[]) throws InterruptedException{
ConditionThread conditionThread = new ConditionThread();
Thread thread = new Thread(conditionThread,"t1");
thread.start();
Thread.sleep(2000);
// 这里还是给 signal() 方法加锁了
lock.lock();
condition.signal();
lock.unlock();
}
}
3.1.3 信号量Semaphore:允许多个线程同时访问
- 信号量用于指定同时可以有多少个线程访问某一个资源。是对锁的一种扩展
- Semaphore 有两个构造参数,第一个传入一个整数,表示某段代码最多有 n 个线程可以访问;另外一个参数表示是否公平
- acquire() 用来获取一个许可,会等待直到获得许可
- acquireUninterruptily()
- boolean tryAcquire()
- boolean tryAcquire(long timeout, TimeUnit unit)
- release() 用来释放许可。释放前必须先获得
package ConcurrencyTest;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* 用来测试信号量
*/public class SemaphoreTest {
// 声明了包含3个许可的信号量
private static Semaphore semaphore = new Semaphore(3);
private static class SemaphoreThread implements Runnable{
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"获取一个许可");
Thread.sleep(1);
System.out.println(Thread.currentThread().getName()+"完成,开始释放许可");
semaphore.release();
}catch (InterruptedException e) {
e.printStackTrace();
}
}public static void main(String args[]){
SemaphoreThread semaphoreThread = new SemaphoreThread();
ExecutorService executorService = Executors.newFixedThreadPool(10);
for(int i=0;
i<12;
i++){
executorService.submit(semaphoreThread);
}
}
}
}
运行结果会以3个线程为1组,来获取许可,最后释放许可。
3.1.4 ReadWriteLock 读写锁
- 读-写互斥:读阻塞写,写也阻塞读。写-写互斥。
- 在读次数远大于写次数时使用 读写分离锁,可以有效减少锁竞争。
- 获得读锁:reentrantReadWriteLock.readLock();获得写锁 reentrantReadWriteLock.writeLock()
书上给了一个例子,说明用读写分离锁的时候,读线程之间并行,写线程阻塞。而用重入锁,所有读写线程之间要相互等待,程序执行时间比较长。
3.1.5 倒计数器 CountDownLatch
- CountDownLatch 让某个线程一直 等待到 倒计时结束再开始执行。作用类似于 join,join 是让当前线程等待 join线程结束。join 的实现原理是,检查 join 线程是否存活,如果存活,则一直 wait
- CountDownLatch 构造方法接受一个整数,表示当前计数器的计数个数(线程数)
- CountDownLatch.countDown() 可以让计数器减一, countDownLatch.await() 让当前线程一直阻塞到计数器为0
package ConcurrencyTest;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 测试倒计数器 CountDownLatch
*/public class CountDonwLatchTest implements Runnable {
static final CountDownLatch end = new CountDownLatch(10);
static final CountDonwLatchTest test = new CountDonwLatchTest();
@Override
public void run() {
try {
// 模拟火箭发射
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println("检查完成");
// 一个线程已经完成任务,计数器减一
end.countDown();
}catch (InterruptedException e){
e.printStackTrace();
}
}public static void main(String args[]) throws InterruptedException{
Executor exec = Executors.newFixedThreadPool(10);
for(int i=0;
i<10;
i++){
((ExecutorService) exec).submit(test);
}
// 等待检查
end.await();
// 发射火箭
System.out.println("发射");
((ExecutorService) exec).shutdown();
}
}
这里注意,计数是针对线程数。每个线程执行完,运行 countDown() 方法,减一个计数。 await() 方法要等所有计数清零。3.1.6 循环栅栏 CyclicBarrier
- CyclicBarrier 是可以反复使用的计数器,构造函数中第一个参数int parties 指定计数器总数,第二个参数Runnable barrierAction 指定每次计数结束后要执行的动作(这个动作一般也是一个 多线程类,看下面例子中)。
- CyclicBarrier 的 await() 在所有参与线程都在此 barrier 上调用 await() 方法之前,将一直等待。
可能抛出两个异常 Interruptedexception 和 BrokenBarrierException
package ConcurrencyTest;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 循环栅栏 CyclicBarrier 测试类
*/
public class CyclicBarrierTest {
public static class Soldier implements Runnable {
private String soldierName;
private final CyclicBarrier cyclic;
Soldier(CyclicBarrier cyclic, String soldierName) {
this.cyclic = cyclic;
this.soldierName = soldierName;
}@Override
public void run() {
try {
//等待其他士兵到齐
cyclic.await();
doWork();
//等待所有士兵完成工作
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}void doWork() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldierName + " :任务完成");
}
}public static class BarrierRun implements Runnable {
boolean flag;
int N;
public BarrierRun(boolean flag, int N) {
this.flag = flag;
this.N = N;
}@Override
public void run() {
if (flag) {
System.out.println("司令:[士兵" + N + "个,任务完成!]");
} else {
System.out.println("司令:[士兵" + N + "个,集合完毕!]");
flag = true;
}
}
}public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
//设置屏障点,主要是为了执行这个方法
System.out.println("集合队伍!");
for (int i = 0;
i < N;
i++) {
System.out.println("士兵" + i + " 报道!");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
allSoldier[i].start();
}
}
}
输出如下。程序两次到达屏障点,每次到达都会调用 BarrierRun 的 run() 方法,由于 flag 的设置,两次输出不同。关于倒计时器和循环栅栏的区别
集合队伍!
士兵0 报道!
士兵1 报道!
士兵2 报道!
士兵3 报道!
士兵4 报道!
士兵5 报道!
士兵6 报道!
士兵7 报道!
士兵8 报道!
士兵9 报道!
司令:[士兵10个,集合完毕!]
士兵7 :任务完成
士兵9 :任务完成
士兵5 :任务完成
士兵6 :任务完成
士兵0 :任务完成
士兵4 :任务完成
士兵3 :任务完成
士兵8 :任务完成
士兵1 :任务完成
士兵2 :任务完成
司令:[士兵10个,任务完成!]
3.1.7 线程阻塞工具类:LockSupport
- LockSupport 可以在任意位置让线程阻塞。其静态方法 park()。和 Object.wait() 相比,park 方法不需要获取对象的锁。
- 和 suspend 比,不会造成死锁,因为 LockSupport 内部是基与信号量 Semaphore实现的,为每一个线程都准备了一个许可,如果许可可用,park() 函数会立即返回。
- LockSupport 支持中断影响,但是不抛出中断异常,需要 Thread.interrupted() 进行判断。
/**
* LockSupport 线程阻塞工具类测试
*/public class LockSupportTest {
private static class LockSupportThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"线程阻塞");
LockSupport.park();
for(int i=0;
i<5;
i++){
System.out.println(i);
}
}
}public static void main(String args[]) throws InterruptedException{
LockSupportThread lockSupportThread = new LockSupportThread();
Thread thread = new Thread(lockSupportThread,"t1");
thread.start();
thread.sleep(1000);
// 唤醒
System.out.println("main唤醒阻塞线程");
LockSupport.unpark(thread);
}
}
t1线程阻塞
main唤醒阻塞线程
0
1
2
3
4
3.2.1 线程池 线程池的概念:
- 线程池中有几个活跃的线程,需要使用线程的时候,从池子中随便拿一个空闲线程;完成工作后,将线程放回池子中。
- 创建线程的工作变成了从池子中获取活跃线程,销毁线程变成了向线程池归还线程。
- 大量的线程的创建和销毁会消耗很多时间
- 线程本身也要占据内存
- 线程回收给GC带来负担
文章图片
Executor 框架结构图
对 线程池 Executor 框架更详细的介绍
文章图片
Executor 框架成员
- Executors 类是一个工厂类,用于配置线程池,主要有下面的工厂方法
- public static ExecutorService newFixedThreadPool(int nThreads)
- public static ExecutorService newSingleThreadExecutor()
- public static ExecutorService newCachedThreadPool()
- public static ScheduledExecutorService newSingleThreadScheduledExecutor()
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
注意上面的方法的返回类型。接下来对上面提到的方法逐个解析:
- newFixedThreadPool() 方法。返回一个固定线程数量的线程池。线程数量不变,没有空闲线程时候,任务被暂存到任务队列。
public static void main(String args[]){
TestThread testThread = new TestThread();
// testThread 是一个多线程类
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 配置线程池
for(int i=0;
i<5;
i++){
executorService.submit(testThread);
// submit() 方法,用于向线程池提交任务。
}
}
- new SingleThreadExecutor():返回只有一个线程的线程池,多余任务会等待,FIFO执行等待任务。
- new CachedThreadPool():返回可调整线程数的线程池。多余的任务会创建新的线程处理任务。
public static void main(String[] args) {
CachedThreadPoolTask cachedThreadPoolTask = new CachedThreadPoolTask();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0;
i < 10;
i++) {
executorService.submit(cachedThreadPoolTask);
}
}
}
运行结果会创建10个线程。
- newScheduledThreadPool() 计划任务:返回 ScheduledExecutorService对象,可以根据时间对线程进行调度。ScheduledExecutorService 接口在 ExecutorService 接口上扩展了在给定时间执行某个任务的功能,比如在固定的延迟之后执行或者周期性执行某个任务。
这里给出了3个方法, schedule() 方法在给定的时间对任务进行一次调度。scheduleAtFixedRate() 以固定的频率调度,上一次任务开始执行之后 period 时间调度下一次任务,shceduleWithFixedDelay() 在上一次任务结束后,经过 delay 时间进行任务调度。
/**
* 用来测试 计划任务
* newScheduledThreadPool 线程池
*/public class ScheduledExecutorServiceDemo {
public static void main(String args[]){
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
},0,2,TimeUnit.SECONDS);
}
}
执行结果是以3s的间隔调度任务。如果是 scheduleAtFixedRate() 方法,任务时间设置为1s,执行间隔设置为2s,那么任务调度间隔最后就是2s;5s,3s,就是5s,因为 ScheduledExecutorService 不会出现任务堆叠,就是说任务执行周期太短,就会在任务结束后立即调用下一个任务。
- newSingleThreadScheduledExecutor():返回 ScheduledExecutorService对象,线程池大小为1。
跟上面唯一的区别就在于线程池size的区别,深入的还需要进一步学习。
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
【《实战java高并发程序设计》笔记(三)】参数含义如下:
- int corePoolSize 指定了线程池中的线程数量
- int maximumPoolSize 指定了线程池中最大线程数量
- long keepAliveTime 指定了线程池线程超过了 corePoolSize 时,多余空闲线程的存活时间(理解成多余线程的存活时间)
- BlockingQueue
workQueue 指定了 任务队列,提交但还未完成的任务。 - ThreadFactory threadFactory 指定了线程工厂,用于创建线程,一般使用默认的即可
- RejectedExecutionHandler handler 指定了 拒绝策略,当任务很多来不及处理的时候,如何拒绝任务。
workQueue 是一个 BlockingQueue 对象,用于存放 Runnable 对象。在 ThreadPoolExecutor 的构造函数中,可以使用下面几种 BlockingQueue 接口:
- 直接提交的队列:直接提交队列功能由 SynchronousQueue 对象提供,该对象没有容量,插入和删除需要互相等待,提交的任务不会保存,没有空闲线程则创建线程来执行,线程数量超过最大线程数执行拒绝策略。
下面的 newCachedThreadPool() 方法就是用直接提交的队列实现的。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
- 有界的任务队列:使用 ArrayBlockingQueue 实现,必须带容量,表示队列最大容量。
- 无界的任务队列:通过 LinkedBlockingQueue 实现。由于是无界的,所以除非系统资源耗尽,则不存在入队失败的情况
newFixedThreadPool() 产生固定大小线程池就是用 无界任务队列来实现的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
- 优先任务队列:通过 PriorityBlockingQueue 实现,是一个特殊的无界队列,可以根据任务自身的优先级执行任务。相比之下,无论是 有界队列ArrayBlockingQueue 类 还是 指定大小的 LinkedBlockingQueue 类,都只是按照FIFO规则来执行任务的。
- 线程池核心调度逻辑:任务提交数量 < 线程池线程数量corePoolSize ,直接分配线程执行任务; 大于,提交到等待队列,提交成功则等待执行;提交失败(比如说有界队列达到上限,或者说是用了 SynchronousQueue类),则要重新提交向线程池,判断是否 达到 最大线程数(maximumPoolSize),大于就执行拒绝策略。
文章图片
ThreadPoolExecutor 类的任务调度逻辑
- AbortPolicy(中止策略):直接拒绝,抛出异常,阻止系统工作
- CallerRunsPolicy(调用者运行策略):直接在调用者的县城中运行当前被丢弃的任务
- DiscardOldestPolicy(抛弃最老策略):尝试丢弃最老的一个请求也就是即将被执行的一个任务,然后尝试再次提交(妥协重试的思想)
- DiscardPolicy(抛弃策略):直接丢弃,不处理。(注意这个和 终止策略 的区别在于,抛弃策略不会停止程序运行。)
Thread newThread(Runnable r);
3.2.3 小节中提到过,ThreadPoolExecutor 的构造方法中,倒数第二个参数就是 ThreadFactory 对象,通常情况下,使用默认的,不用指定。
书上给出了一个重写 ThreadFactory 对象的例子,说明通过自定义 ThreadFactory,我们可以灵活地创建线程池中的线程。
public static void main(String args[]) throws InterruptedException{
ThreadFactoryTask threadFactoryTask = new ThreadFactoryTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
System.out.println("creat"+ thread);
return thread;
}
});
for(int i=0;
i<5;
i++){
es.submit(threadFactoryTask);
}
Thread.sleep(2000);
}
由于这里将线程池中的线程都设置为 后台线程,所以在 main 线程结束后,整个程序结束。
3.2.6 扩展线程池 ThreadPoolExecutor 其实也是一个可以扩展的线程池。其提供了 beforeExecute()、afterExecute() 、terminated() 三个方法来对线程池进行监控。
/**
* 测试 ThreadPoolExecutor 的可扩展性
*/public class ThreadPoolExecutorTest {
public static class ThreadPoolExecutorTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":"+System.currentTimeMillis());
}
}public static void main(String args[]) throws InterruptedException{
ExecutorService es = new ThreadPoolExecutor(5,5,
0L,TimeUnit.MILLISECONDS,
new SynchronousQueue()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行" + Thread.currentThread().getName());
}@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行结束" + Thread.currentThread().getName());
}@Override
protected void terminated() {
System.out.println("线程退出");
}
};
ThreadPoolExecutorTask threadPoolExecutorTask = new ThreadPoolExecutorTask();
for(int i=0;
i<5;
i++){
es.submit(threadPoolExecutorTask);
Thread.sleep(1000);
}
es.shutdown();
}
}
执行结果
准备执行pool-1-thread-1
pool-1-thread-1:1551530510144
执行结束pool-1-thread-1
准备执行pool-1-thread-2
pool-1-thread-2:1551530510160
执行结束pool-1-thread-2
准备执行pool-1-thread-4
pool-1-thread-4:1551530510160
执行结束pool-1-thread-4
准备执行pool-1-thread-5
准备执行pool-1-thread-3
pool-1-thread-3:1551530510160
执行结束pool-1-thread-3
pool-1-thread-5:1551530510160
执行结束pool-1-thread-5
线程退出
3.2.9 Fork/Join 框架
- 使用 fork() 方法后系统多一个执行分支,所以要 join 到这个分支执行完成之后,才能得到结果。如下图,将主任务分解成为若干个子任务,等待(join)所有子任务都有结果后,才能得到主任务的结果。
文章图片
Fork&Join 框架 - JDK 提供了 ForkJoinPool 线程池来节省资源,可以向线程池来提交任务 ForkJoinTask。而 ForkJoinTask 模板类有两个子类,有返回值的 RecursiveTask 和无返回值的 RecursiveAction
3.3.1 并发集合简介
- ConcurrentHashMap :线程安全的 HashMap
- CopyOnWriteArrayList:在 读多写少的场合,性能非常好
- ConcurrentLinkedQueue:高效的并发队列,使用链表实现。看成是一个线程安全的 LinkedList
- BlockingQueue:阻塞队列。JDK 内部通过 链表、数组实现。适合作为数据共享的通道。
- ConcurrentSkipListMap:跳表的实现。是一个 Map,使用跳表的数据结构进行快速查找
3.3.2 ConcurrentHashMap
- Collections.synchronizedMap() 方法可以用来包装任意 Map,从而生成一个并发安全的 Map。但是这种在高并发情境下,等待问题较为严重。
- 实际上,更为专业的方法是使用 ConcurrentHashMap
- ConcurrentHashMap 在 jdk1.7中,通过减小锁粒度来提高性能。
ConcurrentHashMap较为详细的解释
3.3.3 有关List的线程安全 跟上面提到过的一样,我们可以用 Collections.SynchronizedList() 方法来包装 list。获得一个线程安全的 List。
List l = Collections.synchronizedList(new LinkedList());
3.3.4 ConcurrentLinkedQueue 高效的并发队列(只需要记住是性能最好的并发环境下的队列即可),基于链表实现,可以看成是线程安全的 LinkedList。
3.3.5 CopyOnWriteArrayList
- 读多写少的场合,这个 List 性能非常好,因为其读取不加锁,写入不阻塞,只有 写入和写入之间同步.
- 写入操作使用锁来用于 写-写 的情况,然后通过内部复制生成一个新的数组,再用新数组替换老数组,修改完成后,读线程会察觉到 volatile 修饰的 array 被修改了,但是整个修改过程是不影响读的。所以从这个角度来说,是不用读取加锁的。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
3.3.6 BlockingQueue
- 是一个接口。适合用作数据共享的通道。
- 队列为空,take 操作时,当前线程会等待(在 notEmpty上),直到新的元素插入;队列满,put 操作会等待,直到有空位。
- 跳表可以进行快速查找。数据结构与平衡树类似,但是插入和删除操作无需和平衡树一样进行全局调整。因此在并发情况下并不需要对全局加锁,只需要部分加锁。
- 跳表维持了多个链表,最底层链表维持了所有元素,上一层是下一层的子集,采用空间换时间的算法来提高查找速度。
文章图片
跳表结构示意 比如在上图这个结构中,查找元素 7。 顶层元素最少,在顶层找,没有;下一层;找到8,;下一层,确定6--8之间。即可以快速查找。
推荐阅读
- 慢慢的美丽
- 《真与假的困惑》???|《真与假的困惑》??? ——致良知是一种伟大的力量
- 《跨界歌手》:亲情永远比爱情更有泪点
- 诗歌:|诗歌: 《让我们举起世界杯,干了!》
- 期刊|期刊 | 国内核心期刊之(北大核心)
- 《魔法科高中的劣等生》第26卷(Invasion篇)发售
- 人间词话的智慧
- 《一代诗人》37期,生活,江南j,拨动心潭的一泓秋水
- 广角叙述|广角叙述 展众生群像——试析鲁迅《示众》的展示艺术
- 书评——《小行星》