JUC之Semaphore源码分析
Semaphore是一个用于限制同时执行某段代码的线程数量的并发工具类,其内部通过permit(许可)的数量来定义允许并发执行的数量。
/**
 * Creates a semaphore backed by the non-fair synchronizer.
 *
 * @param permits number of permits handed to the {@code NonfairSync} constructor
 */
public KSemaphore(int permits) {
    this.sync = new NonfairSync(permits);
}

/**
 * Creates a semaphore backed by the synchronizer variant selected by {@code fair}.
 *
 * @param permits number of permits handed to the synchronizer constructor
 * @param fair    {@code true} selects {@code FairSync}, {@code false} selects {@code NonfairSync}
 */
public KSemaphore(int permits, boolean fair) {
    if (fair) {
        this.sync = new FairSync(permits);
    } else {
        this.sync = new NonfairSync(permits);
    }
}
/**AQS的子类主要定义获取释放lock*/ abstractstaticclassSyncextendsKAbstractQueuedSynchronizer{ privatestaticfinallongserialVersionUID=1192457210091910933L; /** *指定permit初始化Semaphore */ Sync(intpermits){ setState(permits); } /** *返回剩余permit */ finalintgetPermits(){ returngetState(); } /** *获取permit */ finalintnonfairTryAcquireShared(intacquires){ for(;;){ intavailable=getState(); intremaining=available-acquires;//判断获取acquires的剩余permit数目 if(remaining<0|| compareAndSetState(available,remaining)){//cas改变state returnremaining; } } } /** *释放lock */ protectedfinalbooleantryReleaseShared(intreleases){ for(;;){ intcurrent=getState(); intnext=current+releases; if(nextcurrent){//underflow thrownewError("Permitcountunderflow"); } if(compareAndSetState(current,next)){ return; } } } /**将permit置为0*/ finalintdrainPermits(){ for(;;){ intcurrent=getState(); if(current==0||compareAndSetState(current,0)){ returncurrent; } } } }
/** *调用acquireSharedInterruptibly响应中断的方式获取permit */ publicvoidacquire()throwsInterruptedException{ sync.acquireSharedInterruptibly(1); } /** *调用acquireUninterruptibly非响应中断的方式获取permit */ publicvoidacquireUninterruptibly(){ sync.acquireShared(1); } /** *尝试获取permit */ publicbooleantryAcquire(){ returnsync.nonfairTryAcquireShared(1)>=0; } /** *尝试的获取permit,支持超时与中断 */ publicbooleantryAcquire(longtimeout,TimeUnitunit)throwsInterruptedException{ returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout)); } /** *支持中断的获取permit */ publicvoidacquire(intpermits)throwsInterruptedException{ if(permits<0){ thrownewIllegalArgumentException(); } sync.acquireSharedInterruptibly(permits); } /** *不响应中断的获取permit */ publicvoidacquireUninterruptibly(intpermits){ if(permits<0)thrownewIllegalArgumentException(); sync.acquireShared(permits); } /** *尝试获取permit */ publicbooleantryAcquire(intpermits){ if(permits<0)thrownewIllegalArgumentException(); returnsync.nonfairTryAcquireShared(permits)>=0; } /** *尝试支持超时机制,支持中断的获取permit */ publicbooleantryAcquire(intpermits,longtimout,TimeUnitunit)throwsInterruptedException{ if(permits<0)thrownewIllegalArgumentException(); returnsync.tryAcquireSharedNanos(permits,unit.toNanos(timout)); }
/** *释放permit */ publicvoidrelease(){ sync.releaseShared(1); } /** *释放permit */ publicvoidrelease(intpermits){ if(permits<0)thrownewIllegalArgumentException(); sync.releaseShared(permits); }
/** *返回可用的permit */ publicintavailablePermits(){ returnsync.getPermits(); } /** *消耗光permit */ publicintdrainPermits(){ returnsync.drainPermits(); } /** *减少reduction个permit */ protectedvoidreducePermits(intreduction){ if(reduction<0)thrownewIllegalArgumentException(); sync.reducePermits(reduction); } /** *判断是否是公平版本 */ publicbooleanisFair(){ returnsyncinstanceofFairSync; } /** *返回AQS中SyncQueue里面的等待线程 */ publicfinalbooleanhasQueuedThreads(){ returnsync.hasQueuedThreads(); } /** *返回AQS中SyncQueue里面的等待线程长度 */ publicfinalintgetQueueLength(){ returnsync.getQueueLength(); } /** *返回AQS中SyncQueue里面的等待线程 */ protectedCollectiongetQueueThreads(){ returnsync.getQueuedThreads(); }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。