关于线程 thread (5)线程池

线程池及新特性 sun在java5中,对java线程中的类库做了大量的扩展,其中线程池就是java5的新特性之一,除了线程池之外还有很多线多线程相关的内容,为多线程的编程带来了极大的便利,为了编写稳定可靠的多线程程序,线程部分的新增内容显得尤为重要。
有关java5线程新特性的内容全部在java.util.concurrent下面,里面包含了数目众多的接口和类,熟悉这部分API特性是一项艰难的学习过程。
线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面放了众多未死亡的线程,池中线程执行调度由池管理器来处理。当有线程任务时, 从池中取出一个,执行完后线程对象归池,这样就可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。
在java5之前,要实现一个线程池是相当有难度的,现在java5位我们做好了一切,我们只需要按照提供的API来使用,即可享受线程池给我们带来的极大便利。
线程池分好多种,固定尺寸线程池,可变尺寸线程池
在使用线程池之前,必须知道如何去创建一个线程池,需要了解的是java.util.concurrent.Executors类的API,这个类提供了大量的创建线程池的静态方法。
固定大小的线程池

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolTest { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); MyThread3 t1 = new MyThread3(); MyThread3 t2 = new MyThread3(); MyThread3 t3 = new MyThread3(); MyThread3 t4 = new MyThread3(); MyThread3 t5 = new MyThread3(); MyThread3 t6 = new MyThread3(); MyThread3 t7 = new MyThread3(); pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); pool.execute(t7); } }class MyThread3 extends Thread{ public void run() { System.out.println(Thread.currentThread() + "......"); } }

关于线程 thread (5)线程池
文章图片

单任务线程池 改个方法就行了
关于线程 thread (5)线程池
文章图片

关于线程 thread (5)线程池
文章图片

对于以上两种线程池,大小都是固定的,当要加入的池的线程或者任务超过池的最大尺寸的时候,则入次线程需要排队等待。一旦线程池中有线程完毕,则排队等待的某个线程就会入池执行。
可变尺寸的线程池 关于线程 thread (5)线程池
文章图片

关于线程 thread (5)线程池
文章图片

延迟线程池 关于线程 thread (5)线程池
文章图片

关于线程 thread (5)线程池
文章图片

单任务延时连接池 关于线程 thread (5)线程池
文章图片

关于线程 thread (5)线程池
文章图片

额,,上面的例子发现忘了调用 pool.shutdown(); 了,,不调用的话程序是不会结束的。
新定义,有返回值的线程 在java5之前,线程是没有返回值的,常常为了有返回值而费尽周折,并且代码很不好写,或者干脆绕过这道坎,走别的路了。现在java终于有可返回值得任务了。可返回值的任务必须实现callable接口,类似的,无返回值的任务必须Runnable接口,执行完Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到callable任务返回的object了。
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallableDemo { public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(2); MyCallable callable1 = new MyCallable("A"); MyCallable callable2 = new MyCallable("B"); Future fu1 = service.submit(callable1); Future fu2 = service.submit(callable2); try { System.out.println(fu1.get().toString()); System.out.println(fu2.get().toString()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); }service.shutdown(); } }class MyCallable implements Callable{ private String s = ""; public MyCallable (String s) { this.s = s; } public Object call() throws Exception { return s; } }

关于线程 thread (5)线程池
文章图片

锁类 在java5中,专门提供了锁对象,利用锁可以方便的实现资源的封锁,(我靠它要锁谁,封锁的是谁啊?),用来控制对竞争资源并发的访问的控制,这些内容主要集中在Java.util.concurrent.lock包下面,里面有三个重要的接口,Condition, Lock, ReadWriteLock。
Condition:它将object类的监视器方法(wait(), notify(), notifyAll())分解成截然不同的对象,以便通过这些对象与任意Lock实现组合使用, 为每个对象提供多个等待,set(wait-ser)。
Lock:Lock实现提供了比使用synchronized方法和语句可获得更广泛的锁定操作。
ReadWriteLock: 维护了一对相关的锁定,一个用于只读操作,一个用于写入操作。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockDemo1 { public static void main(String[] args) { Money m = new Money(); Lock lock = new ReentrantLock(); ControlMoneyThread thread1 = new ControlMoneyThread(m, 100, lock); ControlMoneyThread thread2 = new ControlMoneyThread(m, 1000, lock); ControlMoneyThread thread3 = new ControlMoneyThread(m, -345, lock); ControlMoneyThread thread4 = new ControlMoneyThread(m, -456, lock); ControlMoneyThread thread5 = new ControlMoneyThread(m, 321, lock); ControlMoneyThread thread6 = new ControlMoneyThread(m, 100, lock); ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(thread1); pool.execute(thread2); pool.execute(thread3); pool.execute(thread4); pool.execute(thread5); pool.execute(thread6); pool.shutdown(); } }class Money{ private int money = 0; private int getMoney(){ return money; } public void addMoney(int m) { money += m; System.out.println(Thread.currentThread() + " 涉及金额=" + m + " 当前余额是: " + money); } }class ControlMoneyThread extends Thread { private Money m = null; private int change = 0; private Lock lock = null; public ControlMoneyThread(Money m, int changeCount, Lock lock) { // TODO Auto-generated constructor stub this.m = m; change = changeCount; this.lock = lock; } public void run() { // TODO Auto-generated method stub lock.lock(); m.addMoney(change); lock.unlock(); } }

关于线程 thread (5)线程池
文章图片

一定要注意的是,在获取了锁的对象之后,用完之后一定要尽快的释放锁!以便于等待该锁的线程有机会执行。
在上文中提到了Lock接口及对象,使用它,可以很优雅的控制竞争资源的安全访问,但是这种锁是不区分读写的,这种锁我们称之为普通锁。为了提高性能,java提供了读写锁,在读的地方使用读锁,在写的地方使用写锁,灵活控制,在一定程度上提高了程序的执行效率。
java并发编程系列之ReadWriteLock读写锁的使用
java中有读写锁有个接口java.util.concurrent.locks.ReadWriteLock, 也有具体的实现ReentrantReadWriteLock,详细的API可以查看API文档
public class ReadWriteLockDemo1 { public static void main(String[] args) { final ReadWriteLockDemo1 demo = new ReadWriteLockDemo1(); Thread thread = new Thread(){ public void run() { demo.get(Thread.currentThread()); }; }; Thread thread1 = new Thread(){ public void run() { demo.get(Thread.currentThread()); }; }; thread.start(); thread1.start(); } public synchronized void get (Thread thread) { System.out.println("start time = " + System.currentTimeMillis()); for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (Exception e) { // TODO: handle exception } System.out.println(thread.getName() + "正在进行读操作......"); } System.out.println(thread.getName() + "读操作完毕"); System.out.println("end time:" + System.currentTimeMillis()); } }

关于线程 thread (5)线程池
文章图片

我们可以看到,即使是在读取文件,加了synchronized关键字之后,读与读之间也是互斥的,也就是说必须等到Thread-1读完以后,才会轮到Thread-0执行,而无法做到同时读文件,这种情况下如果存在大量的线程需要同时读文件的话,读写锁的效率就体现出来了,它的效率要明显高于synchronized关键字的实现。
import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo1 { //添加读写锁 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockDemo1 demo = new ReadWriteLockDemo1(); Thread thread = new Thread(){ public void run() { demo.get(Thread.currentThread()); }; }; Thread thread1 = new Thread(){ public void run() { demo.get(Thread.currentThread()); }; }; thread.start(); thread1.start(); } public void get (Thread thread) { //添加锁 lock.readLock().lock(); try { System.out.println("start time = " + System.currentTimeMillis()); for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (Exception e) { // TODO: handle exception } System.out.println(thread.getName() + "正在进行读操作......"); } System.out.println(thread.getName() + "读操作完毕"); System.out.println("end time:" + System.currentTimeMillis()); } catch (Exception e) { // TODO: handle exception } finally { //释放锁 lock.readLock().unlock(); } } }

关于线程 thread (5)线程池
文章图片

可以看到效率高了近一倍
不过要注意的是,如果有一个线程已经占用了读锁,则此时其他的线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁,如果一个线程已经占用了写锁,则其他的线程如果申请写锁或者读锁,则申请的线程会一直等待当前写锁的释放,读锁和写锁是互斥的,写锁和写锁也是互斥的,但是读锁和读锁不是互斥的,
import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo1 { //添加读写锁 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockDemo1 demo = new ReadWriteLockDemo1(); Thread thread = new Thread(){ public void run() { demo.get(Thread.currentThread()); }; }; Thread thread1 = new Thread(){ public void run() { demo.get(Thread.currentThread()); }; }; Thread thread3 = new Thread() { public void run() { demo.add(Thread.currentThread()); } }; thread.start(); thread1.start(); thread3.start(); } public void get (Thread thread) { //添加锁 lock.readLock().lock(); try { System.out.println("read start time = " + System.currentTimeMillis()); for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (Exception e) { // TODO: handle exception } System.out.println(thread.getName() + "正在进行读操作......"); } System.out.println(thread.getName() + "读操作完毕"); System.out.println("read end time:" + System.currentTimeMillis()); } catch (Exception e) { // TODO: handle exception } finally { //释放锁 lock.readLock().unlock(); } } public void add (Thread thread) { //添加锁 lock.writeLock().lock(); try { System.out.println("write start time = " + System.currentTimeMillis()); for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (Exception e) { // TODO: handle exception } System.out.println(thread.getName() + "正在进行写操作......"); } System.out.println(thread.getName() + "写操作完毕"); System.out.println("write end time:" + System.currentTimeMillis()); } catch (Exception e) { // TODO: handle exception } finally { //释放锁 lock.writeLock().unlock(); } } }

关于线程 thread (5)线程池
文章图片

可重入锁(Reentrant):
如果锁具备可重入性,则称之为可重入锁。像syncheonized和ReenTranLock都是可重入锁,可重入性在我们看来实际上表明了锁的分配机制,基于线程的分配,而不是基于方法调用的分配。举个例子,当一个线程执行到synchronized方法时,比如method1,而在method1中会调用同一个类的另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2,,因为如果不这样搞得话,逻辑根本执行不下去,如果不具有可重入性,这样走,不相当于死锁么。
可中断锁:
可中断锁,顾名思义,就是可以相应中断的锁。在java中,synchronized就不是可中断的,但是Lock是可中断的锁。
如果某一线程A在执行同步代码,另一线程B正在等待正在获取锁,可能由于等待时间太长,线程B不想再等待了,想先处理其他的事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。
公平锁:
公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待最久的线程(也就是最先请求的线程)会获得该锁,这就是公平锁。
那么与之对立的就是非公平锁,非公平锁既无法保证锁的获取是按照请求顺序进行的,这样就可能导致某个或者一些线程永远获取不到锁。 在java中,synchronized就是非公平锁,它无法保证等待的线程获取到锁的顺序。而对于,ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁,设置方法如下 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
读写锁:前面有。
新特性-信号量 java的信号量实际上是一个功能完毕的计数器,对控制一定资源的消费与回收有着重要的意义,信号量常常用于多线程的代码中,并能监控有多少数目的线程等待获取资源,并且通过信号量可以得知可用资源的数目等,这里总是在强调数目二字,但是不能指出是那、哪些在等待,哪些资源可用。如果信号量类能够返回数目还能知道哪些对象在等待,哪些资源可用,那就非常完美了,仅仅拿到这些概括性的数字,对精确控制的意义并不是很大。
新特性-阻塞队列 阻塞队列是java5线程新特性中的内容,java定义了阻塞队列的接口,java.util.concurrent,BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位位置。同样,当队列为空的时候,请求队列元素的操作同样会阻塞等待,直到有元素可用为止。
有了这样的功能,就为多线程的排队等候的模型开辟了便捷通道,非常有用。java.util.concurrent.BlockingQueue继承了java.Queue接口
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueDemo1 { public static void main(String[] args) { BlockingQueue queque = new ArrayBlockingQueue(8); for (int i = 0; i < 30; i++) { try { queque.put(new Integer(i)); System.out.println("向阻塞队列加了一个新元素:" + i); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }System.out.println("全部添加完毕。。。"); } }

关于线程 thread (5)线程池
文章图片

添加了8个之后,线程就阻塞了!因为阻塞队列撑死能盛8个元素。
另外,阻塞队列还有更多实现类,来满足各种复杂的需求。ArrayBlockingQueue,DelayQueue,LinkedBlockingQueue,PriorityBlockingQueue,SunchronousQueue,具体的API差别也很小。
新特性-阻塞栈 对于阻塞栈,与阻塞队列相似,不同点是栈是后进先出的结构。每次操作的是栈顶,而队列是先进先出的结构,每次操作的是队列头。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; public class BlockingQueueDemo1 { public static void main(String[] args) { BlockingDeque deque = new LinkedBlockingDeque(8); //也就改了这一处而已 for (int i = 0; i < 30; i++) { try { deque.put(new Integer(i)); System.out.println("向阻塞队列加了一个新元素:" + i); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }System.out.println("全部添加完毕。。。"); } }

关于线程 thread (5)线程池
文章图片

从上面结果可以看出,程序并没有结束,二是线程阻塞了,原因是栈满了,后面添加元素的操作就被阻塞了。
新特性-条件变量 条件变量是java5线程中很重要的一个概念,顾名思义,条件变量就是表示条件的一种变量。但是必须说明,这里的条件是没有实际含义的,仅仅是个标志而已,并且条件的含义旺旺通过代码来赋予含义。
这里的条件和普通的条件表达式有着天壤之别:条件变量都实现了java.util.concurrent.lock.Condition接口,条件变量的实例化是通过对一个Lock对象上调用newCondition()方法来获取的,这样,条件就和一个锁对象绑起来了,因此,java中的条件变量只能配合锁使用,并控制并发程序访问竞争资源的安全。
条件变量的出现是为了更精细控制线程等待与唤醒,在java5之前,线程的等待与唤醒依靠的是object对象的wait() 和 notify()/notifyAll()方法,这样处理不够精细。
而在java5中,一个锁可以有多个条件,每个条件上可以有多个线程等待,通过调用await()方法,可以让线程在该条件下等待,当调用signalAll()方法,又可以唤醒该条件下等待的线程。
条件变量是比较抽象的,原因是他不是自然语言中的条件概念,而是程序控制的一种手段。
以下以银行存取款的模拟程序来揭开java多线程条件变量的神秘面纱:
有一个账户,多个用户在同时操作这个账户,有的存款有的取款,存款随便存,取款有限制,不能透支,任何试图透支的操作都将等待里面有足够的存款时才执行操作。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TiaojianBianLiangDemo { public static void main(String[] args) { MyCount count = new MyCount("123456788947243746", 1000); ControlThread thread1 = new ControlThread(count, "张三", 10000, ControlThread.TYPE_SAVE); ControlThread thread2 = new ControlThread(count, "张三的老婆", 1000, ControlThread.TYPE_DRAW); ControlThread thread3 = new ControlThread(count, "张三的儿子", 1000, ControlThread.TYPE_DRAW); ControlThread thread4 = new ControlThread(count, "张三的闺女", 1000, ControlThread.TYPE_DRAW); ControlThread thread5 = new ControlThread(count, "张三的父亲", 1000, ControlThread.TYPE_DRAW); ControlThread thread6 = new ControlThread(count, "张三的母亲", 1000, ControlThread.TYPE_DRAW); ControlThread thread7 = new ControlThread(count, "张三的岳父", 1000, ControlThread.TYPE_DRAW); ControlThread thread8 = new ControlThread(count, "张三的岳母", 1000, ControlThread.TYPE_DRAW); ControlThread thread9 = new ControlThread(count, "张三的哥哥", 1000, ControlThread.TYPE_DRAW); ControlThread thread10 = new ControlThread(count, "张三的弟弟", 1000, ControlThread.TYPE_DRAW); ControlThread thread11 = new ControlThread(count, "张三的妹妹", 1000, ControlThread.TYPE_DRAW); //张三好惨。。。ExecutorService pool = Executors.newFixedThreadPool(11); pool.execute(thread11); pool.execute(thread10); pool.execute(thread9); pool.execute(thread8); pool.execute(thread7); pool.execute(thread6); pool.execute(thread5); pool.execute(thread4); pool.execute(thread3); pool.execute(thread2); pool.execute(thread1); pool.shutdown(); } }class MyCount{ private String mOid; //账号 private int mCash; //账户余额 private Lock mLock = new ReentrantLock(); //账户锁 private Condition mSave_Condition = mLock.newCondition(); //存款条件 private Condition mDraw_Condition = mLock.newCondition(); //取款条件 public MyCount(String id, int cash) { this.mOid = id; this.mCash = cash; } /** * 存款 */ public void saving(int x, String name) { mLock.lock(); if (x > 0) { mCash += x; System.out.println("存款完毕! 执行人:" + name + " 金额:" + x + "目前余额为:" + mCash); } mDraw_Condition.signalAll(); //唤醒所有取款线程 mLock.unlock(); } public void drawing (int x, String name) { mLock.lock(); try { if (mCash - x < 0) { System.out.println("余额不足," + name + "要取" + x + "元, 余额为:" + mCash + "元!"); mDraw_Condition.await(); //等待 } // 进行取款处理 mCash -= x; System.out.println("取款完毕! 执行人:" + name + " 金额:" + x); //mSave_Condition.signalAll(); //为什么有这一步??} catch (Exception e) { e.printStackTrace(); } finally { mLock.unlock(); } } }class ControlThread extends Thread { public static final int TYPE_SAVE = 0x01; public static final int TYPE_DRAW = 0x02; private MyCount mMyCount; private String mName; private int mMoney = 0; private int mType = 0; public ControlThread (MyCount count, String name, int number, int type) { this.mMyCount = count; this.mName = name; this.mMoney = number; this.mType = type; } public void run() { if (mType == TYPE_SAVE) { mMyCount.saving(mMoney, mName); } else if (mType == TYPE_DRAW) { mMyCount.drawing(mMoney, mName); } } }

关于线程 thread (5)线程池
文章图片

新特性-障碍器 哎,一堆生词。Java5中,添加了障碍器,为了适应一种新的设计需求,比如一个大型任务,常常需要分配好多子任务去执行,只有当所有的子任务完成之后,才能执行主任务,这时候就可以用障碍器了,不明觉厉!感觉很有用!
import java.util.concurrent.CyclicBarrier; public class CylicBarrierDemo { public static void main(String[] args) { CyclicBarrier cb = new CyclicBarrier(7, new MainTask()); new HuluwaThread("红娃大力士", cb).start(); new HuluwaThread("橙娃千里眼", cb).start(); new HuluwaThread("黄娃铜头铁臂", cb).start(); new HuluwaThread("绿娃喷火娃", cb).start(); new HuluwaThread("青娃水娃", cb).start(); new HuluwaThread("蓝娃隐身娃", cb).start(); new HuluwaThread("紫娃吸妖娃", cb).start(); } }class MainTask implements Runnable{ public void run() { System.out.println("妖怪,还我爷爷,饶你不死!"); } }class HuluwaThread extends Thread { private String name; private CyclicBarrier cb; public HuluwaThread(String name, CyclicBarrier cb) { this.name = name; this.cb = cb; } public void run() { try { System.out.println(name + "合体完毕!"); cb.await(); } catch (Exception e) { // TODO: handle exception } finally {} } }

【关于线程 thread (5)线程池】关于线程 thread (5)线程池
文章图片

    推荐阅读