JUC

java.util.concurrent并发编程包,这个包下都是Java处理线程相关的类
虚假唤醒 多个线程中使用wait方法的时候应始终定义在while中,wait在哪里睡就在哪里醒,会继续往下判断,如果使用的是if只会执行一次
现在有四个线程,AB做加法,CD做减法:

public class Test { public static void main(String[] args) { TestDemo testDemo = new TestDemo(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { testDemo.incr(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { testDemo.incr(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { testDemo.decr(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { testDemo.decr(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } }class TestDemo { private int number = 0; public synchronized void incr() throws InterruptedException { if (number != 0) { this.wait(); } number++; System.out.println(Thread.currentThread().getName() + " : " + number); this.notifyAll(); }public synchronized void decr() throws InterruptedException { if (number == 0) { this.wait(); } number--; System.out.println(Thread.currentThread().getName() + " : " + number); this.notifyAll(); } }

JUC
文章图片

上面的代码会出现虚假唤醒的情况,我们来试着分析一下为什么?
假设: A获取锁执行++; A再次获取锁判断number!=0,这时候阻塞; C获取锁执行--; B获取锁执行++; A获取锁,从当前位置醒来继续往下执行,又对number进行了++操作,所以得到2 ...

为了解决这种情况的发生,我们应该在每次醒来时都进行判断,将if改为while即可:
while (number != 0) { this.wait(); }

Lock实现案例 【JUC】Locksynchronized的区别 →Lock是接口而synchronized是关键字,Lock有着比synchronized更广泛的锁的操作
// 创建Lock private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void incr() throws InterruptedException { lock.lock(); try { while (number != 0) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + " : " + number); condition.signalAll(); } finally { lock.unlock(); } }

Condition它用来替代传统的Object的wait ()notify ()实现线程间的协作,依赖于Lock接口,需注意:传统的wait方法会自动释放锁,而使用lock需手动释放
线程集合不安全 集合本身的方法上并没有synchronized 关键字,所以是不安全的,看源码:
public boolean add(E var1) { this.ensureCapacityInternal(this.size + 1); this.elementData[this.size++] = var1; return true; }

示例代码:
List list = new ArrayList<>(); for (int i = 0; i < 30; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 8)); System.out.println(list); }, String.valueOf(i)).start(); }

执行上面的代码会得到一个ConcurrentModificationException 异常,由于集合中的方法并不是同步的,所以在多个线程同时写的时候就会抛出异常,如何解决呢?
方案一:使用Vector解决并发修改异常
List list = new Vector<>();

方案二:使用Collections解决并发修改异常
List list = Collections.synchronizedList(new ArrayList<>());

方案三:使用CopyOnWriteArrayList解决并发修改异常
前面两种方法其实并不常用,一般都是通过写时复制技术来解决,那何为写时复制呢?
集合在每次写的时候都会将元素复制一份出来,在新的集合中写,然后再合并,这样就实现了单写多读的操作
List list = new CopyOnWriteArrayList();

HashSet和HashMap线程不安全 跟集合一样,方法也没有synchronized关键字,也会得到并发修改异常,所以要通过写时复制技术来单写多读
HashSet:
// 通过CopyOnWriteArraySet解决 Set set = new CopyOnWriteArraySet<>();

HashMap:
// 通过ConcurrentHashMap解决 Map map = new ConcurrentHashMap<>();

多线程锁 公平锁和非公平锁
公平锁:多个线程都能得到执行
非公平锁:谁先抢到谁就执行,其他线程不能执行
ReentrantLock 来配置公平锁或非公平锁:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

可以看到源码中通过truefalse来配置锁
可重入锁
synchronizedLock都是可重入锁,可重入锁即可多次获得该锁
就比如我们回家,用钥匙开门之后就能随意进出房间了
JUC
文章图片

Object o = new Object(); new Thread(() -> { synchronized (o) { System.out.println(Thread.currentThread().getName() + " 外层"); synchronized (o) { System.out.println(Thread.currentThread().getName() + " 中层"); synchronized (o) { System.out.println(Thread.currentThread().getName() + " 内层"); } } } }, "t1").start();

ReentrantLock lock = new ReentrantLock(); new Thread(() -> { try { lock.lock(); System.out.println(Thread.currentThread().getName() + " 外层"); try { lock.lock(); System.out.println(Thread.currentThread().getName() + " 内层"); } finally { lock.unlock(); } } finally { lock.unlock(); } }, "t1").start();

死锁
两个或两个以上线程,因争夺资源造成互相等待的现象,需外力干涉来避免死锁
产生死锁的原因:
  • 资源系统不足
  • 进程运行推进顺序不合适
  • 资源分配不当
JUC
文章图片

public static void main(String[] args) { new Thread(() -> { synchronized (a) { try { System.out.println(Thread.currentThread().getName() + " waiting..."); TimeUnit.SECONDS.sleep(2); synchronized (b) { System.out.println(Thread.currentThread().getName() + " get b"); } } catch (InterruptedException e) { e.printStackTrace(); } } }, "线程A").start(); new Thread(() -> { synchronized (b) { try { System.out.println(Thread.currentThread().getName() + " waiting..."); TimeUnit.SECONDS.sleep(2); synchronized (a) { System.out.println(Thread.currentThread().getName() + " get a"); } } catch (InterruptedException e) { e.printStackTrace(); } } }, "线程B").start(); }

两个线程都在尝试获取对方线程资源,就造成了死锁,这是通过代码输出来判断是否为死锁,JDK中有一个堆栈跟踪工具,可以通过命令查看是否为死锁
JUC
文章图片

JUC
文章图片

Callable Runnable接口缺失了一项功能,当线程终止时,无法获得线程返回的结果,为了支持此功能,Java中提供了Callable接口
这两个接口之间的区别主要是:
  1. 是否有返回值
  2. 是否抛出异常
  3. 实现方法名称不同,一个是run,一个是call
class Demo implements Callable { @Override public String call() throws Exception { System.out.println("test callable..."); return "hello"; } }

使用Callable 就不能直接用Thread来创建线程了,需要使用FutureTask
FutureTask task = new FutureTask<>(new Demo()); new Thread(task, "callable").start(); System.out.println(task.get()); // 获取call()中的返回值

强大的辅助类 CountDownLatch减少计数:
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "号同学离开"); // 计数器-1 countDownLatch.countDown(); }, String.valueOf(i)).start(); }// 当计数器没有变为0时就会一直等待 countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "班长锁门离开了"); } }

班长总是在最后一个才离开,这就是CountDownLatch 的作用
CyclicBarrier循环栅栏
public class CyclicBarrierDemo {private static final intNUMBER= 7; public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> { System.out.println("恭喜你集齐七颗龙珠"); }); for (int i = 1; i <= 7; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "颗龙珠"); // 等待 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } } }

只有在集齐七颗龙珠后才会执行CyclicBarrier 中的方法
Semaphore信号灯
public class SemaphoreDemo { public static void main(String[] args) { // 设置许可数量,只有三个车位 Semaphore semaphore = new Semaphore(3); // 模拟六辆汽车 for (int i = 1; i <= 6; i++) { new Thread(() -> { try { // 抢占车位 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "号车抢到了车位"); // 设置随机停车时间 TimeUnit.SECONDS.sleep(new Random().nextInt(5)); System.out.println(Thread.currentThread().getName() + "号车离开了车位"); } catch (Exception e) { e.printStackTrace(); } finally { // 释放车位 semaphore.release(); } }, String.valueOf(i)).start(); } } }

用信号灯模拟停车的场景,只有三个车位,只有当某个车位的车离开了之后,其他的车才能抢占车位
读写锁 在多线程环境下对资源进行读写操作的时候,是可能会发生死锁的,需要用Java提供的读写锁来上锁和解锁,读写锁在读的时候是不能进行写操作的。
写锁:独占锁(一次只能一个线程进行写操作),读锁:共享锁(可多个线程进行读操作)
class Resource { private Map map = new HashMap<>(); private ReadWriteLock lock = new ReentrantReadWriteLock(); public void put(String key, Object value) { // 添加写锁 lock.writeLock().lock(); System.out.println(Thread.currentThread().getName() + "正在写操作" + key); try { TimeUnit.MICROSECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 释放锁 lock.writeLock().unlock(); } map.put(key, value); System.out.println(Thread.currentThread().getName() + "写完了" + key); }public Object get(String key) { // 添加读锁 lock.readLock().lock(); Object result = null; System.out.println(Thread.currentThread().getName() + "正在读操作" + key); try { TimeUnit.MICROSECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.readLock().unlock(); } result = map.get(key); System.out.println(Thread.currentThread().getName() + "读完了" + key); return result; } }

锁降级:
读写锁在读的时候是不能进行写操作的。我们可以将写锁降为读锁,读锁不能升级为写锁
public class DowngradeDemo { public static void main(String[] args) { ReadWriteLock lock = new ReentrantReadWriteLock(); Lock writeLock = lock.writeLock(); Lock readLock = lock.readLock(); // 锁降级 // 1.获取写锁 writeLock.lock(); System.out.println("write"); // 2.获取读锁 readLock.lock(); System.out.println("read"); // 3.释放写锁和读锁 writeLock.unlock(); readLock.unlock(); } }

阻塞队列 当队列为空时,获取元素将阻塞,直到插入新的元素,当队列满时,添加元素将阻塞
使用阻塞队列的好处就是,我们不需要关心什么时候阻塞线程,什么时候唤醒线程,这些操作都交给BlockingQueue来做
// 创建阻塞队列 BlockingQueue queue = new ArrayBlockingQueue<>(3); queue.add("a") queue.add("b") queue.add("c") // Queue full queue.add("d")
线程池 一种线程使用模式,维护着多个线程,等待着监督管理,避免了频繁创建与销毁线程的代价,不仅能保证内核的充分利用,还能防止过分调度
JUC
文章图片

线程池使用方式
通过Executors 工具类来创建线程
Executors.newFixedThreadPool(): 一池N线程
Executors.newSingleThreadExecutor(): 一池一线程
Executors.newCachedThreadPool(): 根据需求创建线程,可扩容
public class ThreadPoolDemo { public static void main(String[] args) { // 一池N线程 ExecutorService threadPool1 = Executors.newFixedThreadPool(5); // 一池一线程 ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); // 一池可扩容线程 ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 10个客户请求 for (int i = 1; i <=10 ; i++) { // 执行 threadPool3.execute(()->{ System.out.println(Thread.currentThread().getName()+"正在办理业务"); }); }threadPool3.shutdown(); } }

查看源码可以发现Executors调用的方法底层都使用了ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

构造方法中有7个参数,分别是什么意思呢?
corePoolSize :核心(常驻)的线程数量,比如一个银行有10个窗口,平时只开放5个窗口
maximumPoolSize:最大线程数量,就好比银行一共有10个窗口
keepAliveTime :线程存活时间
unit :搭配keepAliveTime 设置线程存活时间
workQueue :阻塞队列
threadFactory :用于创建线程
handler :拒绝策略(多种)
线程池的工作流程和拒绝策略
JUC
文章图片

上面的流程图即为线程池的工作流程:首先通过execute()来创建一个池子,核心线程数为2,如果要创建第三个线程,就会放到workQueue中等待,当workQueue满时就会创建新的线程直到
maximumPoolSize满,当maximumPoolSize满时就会执行拒绝策略。
JDK内置的拒绝策略:
AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。
CallerRunsPolicy :“调用者运行”一种调节机制,该策略不会抛弃任务和异常,而是将某些任务回退到调用者,降低新任务的流量。
DiscardPolicy :抛弃队列中等待最久的任务,然后把当前任务添加到队列中,尝试再次提交当前任务。
DiscardOldestPolicy :该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,那这是最好的一种策略。
自定义线程
一般都是用自定义线程,在阿里巴巴开发手册中线程池不允许用Executors去创建,而是通过ThreadPoolExecutor 的方式,这样的处理方式让写的人更加明确线程池的运行规则,规避资源耗尽的风险。
public class CustomThreadPoolDemo { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); // 10个客户请求 for (int i = 1; i <=10 ; i++) { // 执行 threadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+"正在办理业务"); }); }threadPoolExecutor.shutdown(); } }

分支合并框架(Fork/Join) 可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果
public class ForkJoinDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { MyTask task = new MyTask(1, 100); // 创建分支合并池对象 ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask submit = forkJoinPool.submit(task); // 获取最终合并之后的结果 System.out.println(submit.get()); forkJoinPool.shutdown(); } }class MyTask extends RecursiveTask {// 拆分时差值不能大于10 private static final Integer VALUE= https://www.it610.com/article/10; private int begin; private int end; private int result; public MyTask(int begin, int end) { this.begin = begin; this.end = end; }// 拆分和合并的过程 @Override protected Integer compute() { if (end - begin <=VALUE) { // 相加 for (int i = begin; i <= end; i++) { result = result + i; } } else { // 进一步做拆分 // 获取中间值 int middle = (begin + end) / 2; // 拆分左边 MyTask task1 = new MyTask(begin, middle); // 拆分右边 MyTask task2 = new MyTask(middle + 1, end); task1.fork(); task2.fork(); // 合并结果 result = task1.join() + task2.join(); } return result; } }

异步回调
public class AsynchronousCallbackDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 异步调用,无返回值 CompletableFuture completableFuture1 = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + "completableFuture1"); }); completableFuture1.get(); // 异步调用,有返回值 CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "completableFuture2"); return 1024; }); completableFuture2.whenComplete((result, exception) -> { System.out.println("--t--" + result); // 方法返回值 System.out.println("--u--" + exception); // 异常信息 }).get(); } }

    推荐阅读