Java concurrency之Condition条件_动力节点Java学院整理
Condition介绍
Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
Condition函数列表
//造成当前线程在接到信号或被中断之前一直处于等待状态。 voidawait() //造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 booleanawait(longtime,TimeUnitunit) //造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 longawaitNanos(longnanosTimeout) //造成当前线程在接到信号之前一直处于等待状态。 voidawaitUninterruptibly() //造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。 booleanawaitUntil(Datedeadline) //唤醒一个等待线程。 voidsignal() //唤醒所有等待线程。 voidsignalAll()
Condition示例
示例1是通过Object的wait(),notify()来演示线程的休眠/唤醒功能。
示例2是通过Condition的await(),signal()来演示线程的休眠/唤醒功能。
示例3是通过Condition的高级功能。
示例1
publicclassWaitTest1{
publicstaticvoidmain(String[]args){
ThreadAta=newThreadA("ta");
synchronized(ta){//通过synchronized(ta)获取“对象ta的同步锁”
try{
System.out.println(Thread.currentThread().getName()+"startta");
ta.start();
System.out.println(Thread.currentThread().getName()+"block");
ta.wait();//等待
System.out.println(Thread.currentThread().getName()+"continue");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
staticclassThreadAextendsThread{
publicThreadA(Stringname){
super(name);
}
publicvoidrun(){
synchronized(this){//通过synchronized(this)获取“当前对象的同步锁”
System.out.println(Thread.currentThread().getName()+"wakupothers");
notify();//唤醒“当前对象上的等待线程”
}
}
}
}
示例2
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.ReentrantLock;
publicclassConditionTest1{
privatestaticLocklock=newReentrantLock();
privatestaticConditioncondition=lock.newCondition();
publicstaticvoidmain(String[]args){
ThreadAta=newThreadA("ta");
lock.lock();//获取锁
try{
System.out.println(Thread.currentThread().getName()+"startta");
ta.start();
System.out.println(Thread.currentThread().getName()+"block");
condition.await();//等待
System.out.println(Thread.currentThread().getName()+"continue");
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
lock.unlock();//释放锁
}
}
staticclassThreadAextendsThread{
publicThreadA(Stringname){
super(name);
}
publicvoidrun(){
lock.lock();//获取锁
try{
System.out.println(Thread.currentThread().getName()+"wakupothers");
condition.signal();//唤醒“condition所在锁上的其它线程”
}finally{
lock.unlock();//释放锁
}
}
}
}
运行结果:
mainstartta mainblock tawakupothers maincontinue
通过“示例1”和“示例2”,我们知道Condition和Object的方法有一下对应关系:
Object Condition
休眠 wait await
唤醒个线程 notify signal
唤醒所有线程 notifyAll signalAll
Condition除了支持上面的功能之外,它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。 如果采用Object类中的wait(),notify(),notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。
看看下面的示例3,可能对这个概念有更深刻的理解。
示例3
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.ReentrantLock;
classBoundedBuffer{
finalLocklock=newReentrantLock();
finalConditionnotFull=lock.newCondition();
finalConditionnotEmpty=lock.newCondition();
finalObject[]items=newObject[5];
intputptr,takeptr,count;
publicvoidput(Objectx)throwsInterruptedException{
lock.lock();//获取锁
try{
//如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
while(count==items.length)
notFull.await();
//将x添加到缓冲中
items[putptr]=x;
//将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
if(++putptr==items.length)putptr=0;
//将“缓冲”数量+1
++count;
//唤醒take线程,因为take线程通过notEmpty.await()等待
notEmpty.signal();
//打印写入的数据
System.out.println(Thread.currentThread().getName()+"put"+(Integer)x);
}finally{
lock.unlock();//释放锁
}
}
publicObjecttake()throwsInterruptedException{
lock.lock();//获取锁
try{
//如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
while(count==0)
notEmpty.await();
//将x从缓冲中取出
Objectx=items[takeptr];
//将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
if(++takeptr==items.length)takeptr=0;
//将“缓冲”数量-1
--count;
//唤醒put线程,因为put线程通过notFull.await()等待
notFull.signal();
//打印取出的数据
System.out.println(Thread.currentThread().getName()+"take"+(Integer)x);
returnx;
}finally{
lock.unlock();//释放锁
}
}
}
publicclassConditionTest2{
privatestaticBoundedBufferbb=newBoundedBuffer();
publicstaticvoidmain(String[]args){
//启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
//启动10个“读线程”,从BoundedBuffer中不断的读数据。
for(inti=0;i<10;i++){
newPutThread("p"+i,i).start();
newTakeThread("t"+i).start();
}
}
staticclassPutThreadextendsThread{
privateintnum;
publicPutThread(Stringname,intnum){
super(name);
this.num=num;
}
publicvoidrun(){
try{
Thread.sleep(1);//线程休眠1ms
bb.put(num);//向BoundedBuffer中写入数据
}catch(InterruptedExceptione){
}
}
}
staticclassTakeThreadextendsThread{
publicTakeThread(Stringname){
super(name);
}
publicvoidrun(){
try{
Thread.sleep(10);//线程休眠1ms
Integernum=(Integer)bb.take();//从BoundedBuffer中取出数据
}catch(InterruptedExceptione){
}
}
}
}
(某一次)运行结果:
p1put 1
p4put 4
p5put 5
p0put 0
p2put 2
t0take1
p3put 3
t1take4
p6put 6
t2take5
p7put 7
t3take0
p8put 8
t4take2
p9put 9
t5take3
t6take6
t7take7
t8take8
t9take9
结果说明:
(01)BoundedBuffer是容量为5的缓冲,缓冲中存储的是Object对象,支持多线程的读/写缓冲。多个线程操作“一个BoundedBuffer对象”时,它们通过互斥锁lock对缓冲区items进行互斥访问;而且同一个BoundedBuffer对象下的全部线程共用“notFull”和“notEmpty”这两个Condition。
notFull用于控制写缓冲,notEmpty用于控制读缓冲。当缓冲已满的时候,调用put的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count+1,最后,调用notEmpty.signal()缓冲notEmpty上的等待线程(调用notEmpty.await的线程)。简言之,notFull控制“缓冲区的写入”,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程。
同理,notEmpty控制“缓冲区的读取”,当读取了缓冲区数据之后会唤醒notFull上的等待线程。
(02)在ConditionTest2的main函数中,启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);同时,也启动10个“读线程”,从BoundedBuffer中不断的读数据。
(03)简单分析一下运行结果。
1,p1线程向缓冲中写入1。 此时,缓冲区数据: |1| | | | |
2,p4线程向缓冲中写入4。 此时,缓冲区数据: |1|4| | | |
3,p5线程向缓冲中写入5。 此时,缓冲区数据: |1|4|5| | |
4,p0线程向缓冲中写入0。 此时,缓冲区数据: |1|4|5|0| |
5,p2线程向缓冲中写入2。 此时,缓冲区数据: |1|4|5|0|2|
此时,缓冲区容量为5;缓冲区已满!如果此时,还有“写线程”想往缓冲中写入数据,会调用put中的notFull.await()等待,直接缓冲区非满状态,才能继续运行。
6,t0线程从缓冲中取出数据1。此时,缓冲区数据: | |4|5|0|2|
7,p3线程向缓冲中写入3。 此时,缓冲区数据: |3|4|5|0|2|
8,t1线程从缓冲中取出数据4。此时,缓冲区数据: |3| |5|0|2|
9,p6线程向缓冲中写入6。 此时,缓冲区数据: |3|6|5|0|2|
...