Java Socket编程实例(三)- TCP服务端线程池
一、服务端回传服务类:
importjava.io.IOException; importjava.io.InputStream; importjava.io.OutputStream; importjava.net.Socket; importjava.util.logging.Level; importjava.util.logging.Logger; publicclassEchoProtocolimplementsRunnable{ privatestaticfinalintBUFSIZE=32;//Size(inbytes)ofI/Obuffer privateSocketclientSocket;//Socketconnecttoclient privateLoggerlogger;//Serverlogger publicEchoProtocol(SocketclientSocket,Loggerlogger){ this.clientSocket=clientSocket; this.logger=logger; } publicstaticvoidhandleEchoClient(SocketclientSocket,Loggerlogger){ try{ //GettheinputandoutputI/Ostreamsfromsocket InputStreamin=clientSocket.getInputStream(); OutputStreamout=clientSocket.getOutputStream(); intrecvMsgSize;//Sizeofreceivedmessage inttotalBytesEchoed=0;//Bytesreceivedfromclient byte[]echoBuffer=newbyte[BUFSIZE];//ReceiveBuffer //Receiveuntilclientclosesconnection,indicatedby-1 while((recvMsgSize=in.read(echoBuffer))!=-1){ out.write(echoBuffer,0,recvMsgSize); totalBytesEchoed+=recvMsgSize; } logger.info("Client"+clientSocket.getRemoteSocketAddress()+",echoed"+totalBytesEchoed+"bytes."); }catch(IOExceptionex){ logger.log(Level.WARNING,"Exceptioninechoprotocol",ex); }finally{ try{ clientSocket.close(); }catch(IOExceptione){ } } } publicvoidrun(){ handleEchoClient(this.clientSocket,this.logger); } }
二、每个客户端请求都新启一个线程的Tcp服务端:
importjava.io.IOException; importjava.net.ServerSocket; importjava.net.Socket; importjava.util.logging.Logger; publicclassTCPEchoServerThread{ publicstaticvoidmain(String[]args)throwsIOException{ //Createaserversockettoacceptclientconnectionrequests ServerSocketservSock=newServerSocket(5500); Loggerlogger=Logger.getLogger("practical"); //Runforever,acceptingandspawningathreadforeachconnection while(true){ SocketclntSock=servSock.accept();//Blockwaitingforconnection //Spawnthreadtohandlenewconnection Threadthread=newThread(newEchoProtocol(clntSock,logger)); thread.start(); logger.info("CreatedandstartedThread"+thread.getName()); } /*NOTREACHED*/ } }
三、固定线程数的Tcp服务端:
importjava.io.IOException; importjava.net.ServerSocket; importjava.net.Socket; importjava.util.logging.Level; importjava.util.logging.Logger; publicclassTCPEchoServerPool{ publicstaticvoidmain(String[]args)throwsIOException{ intthreadPoolSize=3;//FixedThreadPoolSize finalServerSocketservSock=newServerSocket(5500); finalLoggerlogger=Logger.getLogger("practical"); //Spawnafixednumberofthreadstoserviceclients for(inti=0;i<threadPoolSize;i++){ Threadthread=newThread(){ publicvoidrun(){ while(true){ try{ SocketclntSock=servSock.accept();//Waitforaconnection EchoProtocol.handleEchoClient(clntSock,logger);//Handleit }catch(IOExceptionex){ logger.log(Level.WARNING,"Clientacceptfailed",ex); } } } }; thread.start(); logger.info("CreatedandstartedThread="+thread.getName()); } } }
四、使用线程池(使用Spring的线程次会有队列、最大线程数、最小线程数和超时时间的概念)
1.线程池工具类:
importjava.util.concurrent.*; /** *任务执行者 * *@authorWatsonXu *@since1.0.0<p>2013-6-8上午10:33:09</p> */ publicclassThreadPoolTaskExecutor{ privateThreadPoolTaskExecutor(){ } privatestaticExecutorServiceexecutor=Executors.newCachedThreadPool(newThreadFactory(){ intcount; /*执行器会在需要自行任务而线程池中没有线程的时候来调用该程序。对于callable类型的调用通过封装以后转化为runnable*/ publicThreadnewThread(Runnabler){ count++; ThreadinvokeThread=newThread(r); invokeThread.setName("CourserThread-"+count); invokeThread.setDaemon(false);////???????????? returninvokeThread; } }); publicstaticvoidinvoke(Runnabletask,TimeUnitunit,longtimeout)throwsTimeoutException,RuntimeException{ invoke(task,null,unit,timeout); } publicstatic<T>Tinvoke(Runnabletask,Tresult,TimeUnitunit,longtimeout)throwsTimeoutException, RuntimeException{ Future<T>future=executor.submit(task,result); Tt=null; try{ t=future.get(timeout,unit); }catch(TimeoutExceptione){ thrownewTimeoutException("Threadinvoketimeout..."); }catch(Exceptione){ thrownewRuntimeException(e); } returnt; } publicstatic<T>Tinvoke(Callable<T>task,TimeUnitunit,longtimeout)throwsTimeoutException,RuntimeException{ //这里将任务提交给执行器,任务已经启动,这里是异步的。 Future<T>future=executor.submit(task); //System.out.println("Taskareadyinthread"); Tt=null; try{ /* *这里的操作是确认任务是否已经完成,有了这个操作以后 *1)对invoke()的调用线程变成了等待任务完成状态 *2)主线程可以接收子线程的处理结果 */ t=future.get(timeout,unit); }catch(TimeoutExceptione){ thrownewTimeoutException("Threadinvoketimeout..."); }catch(Exceptione){ thrownewRuntimeException(e); } returnt; } }
2.具有伸缩性的Tcp服务端:
importjava.io.IOException; importjava.net.ServerSocket; importjava.net.Socket; importjava.util.concurrent.TimeUnit; importjava.util.logging.Logger; importdemo.callable.ThreadPoolTaskExecutor; publicclassTCPEchoServerExecutor{ publicstaticvoidmain(String[]args)throwsIOException{ //Createaserversockettoacceptclientconnectionrequests ServerSocketservSock=newServerSocket(5500); Loggerlogger=Logger.getLogger("practical"); //Runforever,acceptingandspawningthreadstoserviceeachconnection while(true){ SocketclntSock=servSock.accept();//Blockwaitingforconnection //executorService.submit(newEchoProtocol(clntSock,logger)); try{ ThreadPoolTaskExecutor.invoke(newEchoProtocol(clntSock,logger),TimeUnit.SECONDS,3); }catch(Exceptione){ } //service.execute(newTimelimitEchoProtocol(clntSock,logger)); } /*NOTREACHED*/ } }
以上就是本文的全部内容,查看更多Java的语法,大家可以关注:《ThinkinginJava中文手册》、《JDK1.7参考手册官方英文版》、《JDK1.6APIjava中文参考手册》、《JDK1.5APIjava中文参考手册》,也希望大家多多支持毛票票。