JAVA_Concurrent

切面编程

【JAVA_Concurrent】面向切面编程,也可以说是面向方面编程
  • 定义
    所谓切面就是说贯穿到系统的各个模块中,系统的一个功能就是一个方面(切面)。比如日志系统,权限检查,统一的异常检查等
  • 好处
    可以动态的添加和删除在切面上的逻辑而不影响原来的执行代码。
  • 使用
    把功能模块对应的对象作为切面嵌入到原来的各个系统模块中,采用代理技术,代理调用目标,同时把切面对象加入进来。
线程并发
对java.util.concurrent包中的类进行总结
  • threadLocal
    1. 本质就是一个HashMap,key就是当前的线程。
    2. 存的时候是与当前线程相关,取得时候也是与当前线程相关。
  • AtomicInteger
    • 线程安全的整数类
    • AtomicLong,AtomicIntegerArray,AtomicIntegerFieldUpdater(对类中的整型数据进行原子操作)
    • 【共享变量】如果是Integer,那么可以使用该对象替换
  • 线程池
    1. 固定数量的
      Executors.newFixedThreadPool(3)
    2. 不固定数量(动态变化的)
      Executors.newCachedThreadPool()
    3. 一个线程
      • Executors.newSingleThreadExecutor();
      • 与new Thread区别:线程死了以后会自动创建一个新的,保证线程池里面始终有一个线程(如何实现线程死掉以后重新启动?)
    4. 定时器
      Executors.newScheduledThreadPool(3).schedule(runnable,delay,unit)
  • Callable&Future
  1. 一个任务的情况
    线程池去submit一个callable,callable任务可以返回一个结果,结果存在future中,然后将来某个时间从future中去拿.(前提是callable执行结束,否则会一直等待)
    • 为何不什么时候需要什么时候执行
      避免主线程做耗时操作。
    • Future取得的结果类型必须和Callable返回的结果类型一致,通过泛型实现的(泛型的使用)
ExecutorService pool = Executors.newSingleThreadExecutor(); Future future = pool.submit(new Callable() {@Override public String call() throws Exception { return "future result"; } }); try { System.out.print(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }

  1. 一组任务的情况
    提交一组任务Callable,先结束的任务可以取得结果
ExecutorService pool = Executors.newFixedThreadPool(10); CompletionService service = new ExecutorCompletionService(pool); for (int i = 0; i < 10; i++) { final s = i; service.submit(new Callable() {@Override public String call() throws Exception { return ""+s; } }); } for (int i = 0; i < 10; i++) { try { System.out.print(service.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }


  • 读写锁
多个读锁不互斥
读锁与写锁互斥
写锁与写锁互斥
final Queue3 queue3 = new Queue3(); for (int i=0; i< 3; i++){ new Thread(){ @Override public void run() { super.run(); while (true){ queue3.get(); } } }.start(); new Thread(){ @Override public void run() { super.run(); while (true){ queue3.put(new Random().nextInt(1000)); } } }.start(); }

class Queue3 { private Object data = https://www.it610.com/article/null; ReadWriteLock rwl = new ReentrantReadWriteLock(); public void get(){ rwl.readLock().lock(); try { System.out.print("begin read"); Thread.sleep((long) (Math.random()*1000)); System.out.print(Thread.currentThread().getName()); System.out.print("stop read"); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwl.readLock().unlock(); } }public void put(Object object){ rwl.writeLock().lock(); try { System.out.print("begin put"); Thread.sleep((long) (Math.random()*1000)); this.data = https://www.it610.com/article/object; System.out.print("stop put"); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwl.writeLock().unlock(); } } }

  • Condition
  1. 类似传统线程里面的object.wait object.notify
  2. 一个锁内部可以有多个Condition,Lock和Condition可以实现可阻塞队列
  3. 存在的意义就是可以用在多线程条件下
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull= lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } }public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }

  • Semaphore
  1. 维护当前访问自身线程的个数,并提供同步机制
  2. 可以控制同时访问资源的线程个数
  3. 实现一个文件允许的并发访问数
  4. 单个信号灯可以实现互斥锁,这可应用于 死锁 恢复的一些场合
  5. 可以控制等待进入线程的执行顺序,构造方法参数
public static class SemaphoreTest{ public static void main(){ ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for(int i=0 ; i< 10; i++){ Runnable runnable = new Runnable() { @Override public void run() { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); }System.out.print("线程进入:"+ Thread.currentThread().getName()+ " 剩余信号灯:"+semaphore.availablePermits()); try { Thread.sleep((long) (Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); }System.out.print("线程:"+ Thread.currentThread().getName() +"离开"); semaphore.release(); } }; executorService.execute(runnable); } } }

  • CountDownLatch(倒计时计数器)
  1. 调用countDown方法,计数器减一,当减到0的时候,多个等待者或者单个等待者执行
  2. 可以实现一个人通知多个人(裁判通知所有运动员可以开始各就各位预备跑)也可以多个人通知一个人(所有运动员跑完全程以后,裁判才可以公布成绩)
public static class CountDownLatchTest{ public static void main(){ ExecutorService executorService = Executors.newCachedThreadPool(); //吹口哨计数器 一个人通知多个人 final CountDownLatch countDownLatch = new CountDownLatch(1); //公布成绩计数器多个人通知一个人 final CountDownLatch countDownLatch1 = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System.out.print("线程"+Thread.currentThread().getName()+"正在准备接受命令"); //子线程wait countDownLatch.await(); System.out.print("线程"+Thread.currentThread().getName()+"已接受命令"); Thread.sleep((long) (Math.random()*1000)); System.out.print("线程"+Thread.currentThread().getName()+"回应处理结果"); countDownLatch1.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; executorService.execute(runnable); }try { System.out.print("即将发布奔跑命令"); //主线程去countdown countDownLatch.countDown(); System.out.print("已经发布奔跑命令,等待结果"); countDownLatch1.await(); System.out.print("线程已经收到所有响应结果,裁判公布成绩"); } catch (InterruptedException e) { e.printStackTrace(); } } }

  • Exchanger
  1. 用于两个线程间的数据交换
  2. 类似毒品交易(第一个拿出数据的人将等待第二个人拿着数据到来时,才能进行数据交换)
//线程1和线程2互换数据 public static class ExchangerTest{ public static void main(){ ExecutorService executorService = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); executorService.execute(new Runnable() { @Override public void run() { try { String data1 = "xiaoming"; System.out.print("线程"+Thread.currentThread().getName()+"正要把数据:"+data1+"换出去"); Thread.sleep((long) (Math.random()*10000)); String data2 = (String) exchanger.exchange(data1); System.out.print("线程"+Thread.currentThread().getName()+"换回的数据:"+data2); } catch (InterruptedException e) { e.printStackTrace(); } } }); executorService.execute(new Runnable() { @Override public void run() { try { String data1 = "xiaoli"; System.out.print("线程"+Thread.currentThread().getName()+"正要把数据:"+data1+"换出去"); Thread.sleep((long) (Math.random()*10000)); String data2 = (String) exchanger.exchange(data1); System.out.print("线程"+Thread.currentThread().getName()+"换回的数据:"+data2); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }

  • BlockingQueue
与普通队列区别:
  1. 正常队列满了以后再添加就报错了
    阻塞队列满了以后再添加会等待 阻塞住
  2. 正常队列空了以后再取数据就报错
    阻塞队列空了以后再取数据会等待 阻塞住
  3. 阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制
JAVA_Concurrent
文章图片
4 组不同的方法用于插入、移除以及对队列中的元素进行检查
final BlockingQueue queue = new ArrayBlockingQueue(3); for (int i=0; i< 2; i++){ new Thread(){ @Override public void run() { while (true){ try { Thread.sleep((long) (Math.random()*1000)); Log.i("TEST",Thread.currentThread().getName() + "准备放数据"); queue.put(1); Log.i("TEST",Thread.currentThread().getName() + "已经放了数据"+"队列目前有"+queue.size()+"个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); new Thread(){ @Override public void run() { while (true){ try { Thread.sleep(1000); Log.i("TEST",Thread.currentThread().getName()+"准备取数据"); queue.take(); Log.i("TEST",Thread.currentThread().getName()+"已经取走数据"+"队列目前有"+queue.size()+"个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); }

生产者消费者
public class BlockingQueueTest { //生产者 public static class Producer implements Runnable{ private final BlockingQueue blockingQueue; private volatile boolean flag; private Random random; public Producer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; flag=false; random=new Random(); } public void run() { while(!flag){ int info=random.nextInt(100); try { blockingQueue.put(info); System.out.println(Thread.currentThread().getName()+" produce "+info); Thread.sleep(50); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void shutDown(){ flag=true; } } //消费者 public static class Consumer implements Runnable{ private final BlockingQueue blockingQueue; private volatile boolean flag; public Consumer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } public void run() { while(!flag){ int info; try { info = blockingQueue.take(); System.out.println(Thread.currentThread().getName()+" consumer "+info); Thread.sleep(50); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void shutDown(){ flag=true; } } public static void main(String[] args){ BlockingQueue blockingQueue = new LinkedBlockingQueue(10); Producer producer=new Producer(blockingQueue); Consumer consumer=new Consumer(blockingQueue); //创建5个生产者,5个消费者 for(int i=0; i< 10; i++){ if(i< 5){ new Thread(producer,"producer"+i).start(); }else{ new Thread(consumer,"consumer"+(i-5)).start(); } }try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.shutDown(); consumer.shutDown(); } }

主线程执行十次子任务,然后子线程执行十次子任务 循环50次 ==================================================== final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { for (int i=0; i< =50; i++){ business.sub(i); } } }).start(); for (int i=0; i< =50; i++){ business.main(i); } ============================================================== static class Business { BlockingQueue blockingQueue1 = new ArrayBlockingQueue< > (1); BlockingQueue blockingQueue2 = new ArrayBlockingQueue< > (1); //无参构造,只要调用构造方法,都会调用该方法 { try { blockingQueue2.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }public void main(int i){ try { blockingQueue1.put(1); } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 0; j < 10; j++) { Log.i("TEST","sub thread sequece of "+j+"loop of "+i); } try { blockingQueue2.take(); } catch (InterruptedException e) { e.printStackTrace(); } }public void sub(int i){ try { blockingQueue2.put(1); } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 0; j < 10; j++) { Log.i("TEST","main thread sequece of "+j+"loop of "+i); } try { blockingQueue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }

  • 同步集合
  • 传统方式下用Collections工具类提供的synchronizedCollection方法获取同步集合
    • synchronizedCollection 通过代理模式创建一个同步集合继承自map,实现map的增删改实际是调用真实的map的增删改,只是增加synchronized关键字,以实现同步效果。
  • java.util.concurrent下
    • ConcurrentHashMap
    • CopyOnWriteArrayList
    • CopyOnWriteArraySet
  • 传统集合迭代过程中不可以修改集合会抛出ConcurrentModificationException,可以使用同步集合
  • sample
实现效果:十个线程顺序执行doSome方法 key: 1. semaphore与lock效果相同,控制并发访问量 2. SynchronousQueue只有其他线程取数据take,才能存数据put。 > Ain which each insert operation must wait for a corresponding remove operation by another thread public class TestThread {/** * @param args */ public static void main(String[] args) {System.out.println("begin:"+(System.currentTimeMillis()/1000)); //final Semaphore semaphore = new Semaphore(1); final Lock lock = new ReentrantLock(); final SynchronousQueue queue = new SynchronousQueue(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() {public void run() { try { //semaphore.acquire(); lock.lock(); System.out.println("take"); String output = TestDo.doSome(queue.take()); System.out.println(Thread.currentThread().getName() + ":"+output); //semaphore.release(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }for (int i = 0; i < 10; i++) { try { System.out.println("put"); queue.put(i+""); } catch (InterruptedException e) { e.printStackTrace(); } } }}class TestDo { public static String doSome(String input){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String output = input + ":"+ (System.currentTimeMillis()/1000); return output; } }

    推荐阅读