Java编程rabbitMQ实现消息的收发
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
本文基于 spring-rabbit 中间件来实现消息的发送与接收功能
see http://www.rabbitmq.com/tutorials/tutorial-one-java.html
see http://www.springsource.org/spring-amqp
Java编程通过操作rabbitMQ消息的收发实现代码如下:
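<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.7</version>
</dependency>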
首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
/**
 * Envelope exchanged between the application and RabbitMQ.
 *
 * <p>Carries the routing coordinates (queue name and exchange name) together
 * with the already-serialized event payload. Instances have no setters; the
 * payload array is stored and returned by reference, without copying.
 */
public class EventMessage implements Serializable {

    private String queueName;

    private String exchangeName;

    /** Serialized event payload; null when the message was built without content. */
    private byte[] eventData;

    /**
     * Creates a fully populated message.
     *
     * @param queueName    name of the target queue
     * @param exchangeName name of the target exchange
     * @param eventData    serialized payload, stored by reference
     */
    public EventMessage(String queueName, String exchangeName, byte[] eventData) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.eventData = eventData;
    }

    /** Creates an empty message; every field is left null. */
    public EventMessage() {
    }

    /** @return the queue name this message is addressed to */
    public String getQueueName() {
        return queueName;
    }

    /** @return the exchange name this message is addressed to */
    public String getExchangeName() {
        return exchangeName;
    }

    /** @return the serialized payload (the internal array, not a copy) */
    public byte[] getEventData() {
        return eventData;
    }

    @Override
    public String toString() {
        return "EopEventMessage[queueName=" + queueName + ",exchangeName="
                + exchangeName + ",eventData=" + Arrays.toString(eventData)
                + "]";
    }
}
为了可以发送和接收这个消息持有对象,我们还需要一个用来序列化和反序列化的工厂
/**
 * Strategy for converting event payload objects to bytes and back.
 */
public interface CodecFactory {

    /**
     * Encodes an object into its byte representation.
     *
     * @param obj the object to encode
     * @return the encoded bytes
     * @throws IOException if encoding fails
     */
    byte[] serialize(Object obj) throws IOException;

    /**
     * Decodes bytes previously produced by {@link #serialize(Object)}.
     *
     * @param in the encoded bytes
     * @return the decoded object
     * @throws IOException if decoding fails
     */
    Object deSerialize(byte[] in) throws IOException;
}
下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
publicclassHessionCodecFactoryimplementsCodecFactory{ privatefinalLoggerlogger=Logger.getLogger(HessionCodecFactory.class); @Override publicbyte[]serialize(Objectobj)throwsIOException{ ByteArrayOutputStreambaos=null; HessianOutputoutput=null; try{ baos=newByteArrayOutputStream(1024); output=newHessianOutput(baos); output.startCall(); output.writeObject(obj); output.completeCall(); }catch(finalIOExceptionex){ throwex; }finally{ if(output!=null){ try{ baos.close(); }catch(finalIOExceptionex){ this.logger.error("Failedtoclosestream.",ex); } } } returnbaos!=null?baos.toByteArray():null; } @Override publicObjectdeSerialize(byte[]in)throwsIOException{ Objectobj=null; ByteArrayInputStreambais=null; HessianInputinput=null; try{ bais=newByteArrayInputStream(in); input=newHessianInput(bais); input.startReply(); obj=input.readObject(); input.completeReply(); }catch(finalIOExceptionex){ throwex; }catch(finalThrowablee){ this.logger.error("Failedtodecodeobject.",e); }finally{ if(input!=null){ try{ bais.close(); }catch(finalIOExceptionex){ this.logger.error("Failedtoclosestream.",ex); } } } returnobj; } }
接下来就先实现发送功能,新增一个接口专门用来实现发送功能
publicinterfaceEventTemplate{ voidsend(StringqueueName,StringexchangeName,ObjecteventContent)throwsSendRefuseException; voidsend(StringqueueName,StringexchangeName,ObjecteventContent,CodecFactorycodecFactory)throwsSendRefuseException; }
SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage
publicclassDefaultEventTemplateimplementsEventTemplate{ privatestaticfinalLoggerlogger=Logger.getLogger(DefaultEventTemplate.class); privateAmqpTemplateeventAmqpTemplate; privateCodecFactorydefaultCodecFactory; // privateDefaultEventControllereec; // publicDefaultEventTemplate(AmqpTemplateeopAmqpTemplate, // CodecFactorydefaultCodecFactory,DefaultEventControllereec){ // this.eventAmqpTemplate=eopAmqpTemplate; // this.defaultCodecFactory=defaultCodecFactory; // this.eec=eec; // } publicDefaultEventTemplate(AmqpTemplateeopAmqpTemplate,CodecFactorydefaultCodecFactory){ this.eventAmqpTemplate=eopAmqpTemplate; this.defaultCodecFactory=defaultCodecFactory; } @Override publicvoidsend(StringqueueName,StringexchangeName,ObjecteventContent) throwsSendRefuseException{ this.send(queueName,exchangeName,eventContent,defaultCodecFactory); } @Override publicvoidsend(StringqueueName,StringexchangeName,ObjecteventContent, CodecFactorycodecFactory)throwsSendRefuseException{ if(StringUtils.isEmpty(queueName)||StringUtils.isEmpty(exchangeName)){ thrownewSendRefuseException("queueNameexchangeNamecannotbeempty."); } // if(!eec.beBinded(exchangeName,queueName)) // eec.declareBinding(exchangeName,queueName); byte[]eventContentBytes=null; if(codecFactory==null){ if(eventContent==null){ logger.warn("FindeventContentisnull,areyousure..."); }else{ thrownewSendRefuseException( "codecFactorymustnotbenull,unlesseventContentisnull"); } }else{ try{ eventContentBytes=codecFactory.serialize(eventContent); }catch(IOExceptione){ thrownewSendRefuseException(e); } } //构造成Message EventMessagemsg=newEventMessage(queueName,exchangeName, eventContentBytes); try{ eventAmqpTemplate.convertAndSend(exchangeName,queueName,msg); }catch(AmqpExceptione){ logger.error("sendeventfail.EventMessage:["+eventContent+"]",e); thrownewSendRefuseException("sendeventfail",e); } } }
注释的地方稍后会用到,主要是防止发送数据时对应的 exchange 和 queue 没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个接口
/**
 * Consumer callback; every event-consuming component implements this interface.
 */
public interface EventProcesser {

    /**
     * Handles one decoded event.
     *
     * @param e the decoded event content
     */
    void process(Object e);
}
为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
/** *MessageListenerAdapter的Pojo *消息处理适配器,主要功能:
*1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来
*2、执行消息的消费分发,调用相应的处理器来消费属于它的消息
* */ publicclassMessageAdapterHandler{ privatestaticfinalLoggerlogger=Logger.getLogger(MessageAdapterHandler.class); privateConcurrentMapepwMap; publicMessageAdapterHandler(){ this.epwMap=newConcurrentHashMap (); } publicvoidhandleMessage(EventMessageeem){ logger.debug("ReceiveanEventMessage:["+eem+"]"); //先要判断接收到的message是否是空的,在某些异常情况下,会产生空值 if(eem==null){ logger.warn("ReceiveannullEventMessage,itmayproductsomeerrors,andprocessingmessageiscanceled."); return; } if(StringUtils.isEmpty(eem.getQueueName())||StringUtils.isEmpty(eem.getExchangeName())){ logger.warn("TheEventMessage'squeueNameandexchangeNameisempty,thisisnotallowed,andprocessingmessageiscanceled."); return; } //解码,并交给对应的EventHandle执行 EventProcessorWrapeepw=epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName()); if(eepw==null){ logger.warn("ReceiveanEopEventMessage,butnoprocessorcandoit."); return; } try{ eepw.process(eem.getEventData()); }catch(IOExceptione){ logger.error("EventcontentcannotbeDeserialized,checktheprovidedCodecFactory.",e); return; } } protectedvoidadd(StringqueueName,StringexchangeName,EventProcesserprocessor,CodecFactorycodecFactory){ if(StringUtils.isEmpty(queueName)||StringUtils.isEmpty(exchangeName)||processor==null||codecFactory==null){ thrownewRuntimeException("queueNameandexchangeNamecannotbeempty,andprocessororcodecFactorycannotbenull."); } EventProcessorWrapepw=newEventProcessorWrap(codecFactory,processor); EventProcessorWrapoldProcessorWrap=epwMap.putIfAbsent(queueName+"|"+exchangeName,epw); if(oldProcessorWrap!=null){ logger.warn("Theprocessorofthisqueueandexchangeexists,andthenewonecan'tbeadd"); } } protectedSet getAllBinding(){ Set keySet=epwMap.keySet(); returnkeySet; } protectedstaticclassEventProcessorWrap{ privateCodecFactorycodecFactory; privateEventProcessereep; protectedEventProcessorWrap(CodecFactorycodecFactory, EventProcessereep){ this.codecFactory=codecFactory; this.eep=eep; } publicvoidprocess(byte[]eventData)throwsIOException{ 
Objectobj=codecFactory.deSerialize(eventData); eep.process(obj); } } }
这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息
publicclassMessageErrorHandlerimplementsErrorHandler{ privatestaticfinalLoggerlogger=Logger.getLogger(MessageErrorHandler.class); @Override publicvoidhandleError(Throwablet){ logger.error("RabbitMQhappenaerror:"+t.getMessage(),t); } }
接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息
publicclassEventControlConfig{ privatefinalstaticintDEFAULT_PORT=5672; privatefinalstaticStringDEFAULT_USERNAME="guest"; privatefinalstaticStringDEFAULT_PASSWORD="guest"; privatefinalstaticintDEFAULT_PROCESS_THREAD_NUM=Runtime.getRuntime().availableProcessors()*2; privatestaticfinalintPREFETCH_SIZE=1; privateStringserverHost; privateintport=DEFAULT_PORT; privateStringusername=DEFAULT_USERNAME; privateStringpassword=DEFAULT_PASSWORD; privateStringvirtualHost; /** *和rabbitmq建立连接的超时时间 */ privateintconnectionTimeout=0; /** *事件消息处理线程数,默认是CPU核数*2 */ privateinteventMsgProcessNum; /** *每次消费消息的预取值 */ privateintprefetchSize; publicEventControlConfig(StringserverHost){ this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,newHessionCodecFactory()); } publicEventControlConfig(StringserverHost,intport,Stringusername, Stringpassword,StringvirtualHost,intconnectionTimeout, inteventMsgProcessNum,intprefetchSize,CodecFactorydefaultCodecFactory){ this.serverHost=serverHost; this.port=port>0?port:DEFAULT_PORT; this.username=username; this.password=password; this.virtualHost=virtualHost; this.connectionTimeout=connectionTimeout>0?connectionTimeout:0; this.eventMsgProcessNum=eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM; this.prefetchSize=prefetchSize>0?prefetchSize:PREFETCH_SIZE; } publicStringgetServerHost(){ returnserverHost; } publicintgetPort(){ returnport; } publicStringgetUsername(){ returnusername; } publicStringgetPassword(){ returnpassword; } publicStringgetVirtualHost(){ returnvirtualHost; } publicintgetConnectionTimeout(){ returnconnectionTimeout; } publicintgetEventMsgProcessNum(){ returneventMsgProcessNum; } publicintgetPrefetchSize(){ returnprefetchSize; } }
具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信
publicinterfaceEventController{ /** *控制器启动方法 */ voidstart(); /** *获取发送模版 */ EventTemplategetEopEventTemplate(); /** *绑定消费程序到对应的exchange和queue */ EventControlleradd(StringqueueName,StringexchangeName,EventProcessereventProcesser); /*inmap,thekeyisqueuename,butvalueisexchangename*/ EventControlleradd(Mapbindings,EventProcessereventProcesser); }
它的实现类如下:
/** *和rabbitmq通信的控制器,主要负责: *1、和rabbitmq建立连接
*2、声明exChange和queue以及它们的绑定关系
*3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上
*4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存
*@authoryangyong * */ publicclassDefaultEventControllerimplementsEventController{ privateCachingConnectionFactoryrabbitConnectionFactory; privateEventControlConfigconfig; privateRabbitAdminrabbitAdmin; privateCodecFactorydefaultCodecFactory=newHessionCodecFactory(); privateSimpleMessageListenerContainermsgListenerContainer;//rabbitMQmsglistenercontainer privateMessageAdapterHandlermsgAdapterHandler=newMessageAdapterHandler(); privateMessageConverterserializerMessageConverter=newSerializerMessageConverter();//直接指定 //queuecache,keyisexchangeName privateMapexchanges=newHashMap (); //queuecache,keyisqueueName privateMap queues=newHashMap (); //bindrelationofqueuetoexchangecache,valueisexchangeName|queueName privateSet binded=newHashSet (); privateEventTemplateeventTemplate;//给App使用的Event发送客户端 privateAtomicBooleanisStarted=newAtomicBoolean(false); privatestaticDefaultEventControllerdefaultEventController; publicsynchronizedstaticDefaultEventControllergetInstance(EventControlConfigconfig){ if(defaultEventController==null){ defaultEventController=newDefaultEventController(config); } returndefaultEventController; } privateDefaultEventController(EventControlConfigconfig){ if(config==null){ thrownewIllegalArgumentException("Configcannotbenull."); } this.config=config; initRabbitConnectionFactory(); //初始化AmqpAdmin rabbitAdmin=newRabbitAdmin(rabbitConnectionFactory); //初始化RabbitTemplate RabbitTemplaterabbitTemplate=newRabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(serializerMessageConverter); eventTemplate=newDefaultEventTemplate(rabbitTemplate,defaultCodecFactory,this); } /** *初始化rabbitmq连接 */ privatevoidinitRabbitConnectionFactory(){ rabbitConnectionFactory=newCachingConnectionFactory(); rabbitConnectionFactory.setHost(config.getServerHost()); rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum()); rabbitConnectionFactory.setPort(config.getPort()); rabbitConnectionFactory.setUsername(config.getUsername()); 
rabbitConnectionFactory.setPassword(config.getPassword()); if(!StringUtils.isEmpty(config.getVirtualHost())){ rabbitConnectionFactory.setVirtualHost(config.getVirtualHost()); } } /** *注销程序 */ publicsynchronizedvoiddestroy()throwsException{ if(!isStarted.get()){ return; } msgListenerContainer.stop(); eventTemplate=null; rabbitAdmin=null; rabbitConnectionFactory.destroy(); } @Override publicvoidstart(){ if(isStarted.get()){ return; } Set mapping=msgAdapterHandler.getAllBinding(); for(Stringrelation:mapping){ String[]relaArr=relation.split("\\|"); declareBinding(relaArr[1],relaArr[0]); } initMsgListenerAdapter(); isStarted.set(true); } /** *初始化消息监听器容器 */ privatevoidinitMsgListenerAdapter(){ MessageListenerlistener=newMessageListenerAdapter(msgAdapterHandler,serializerMessageConverter); msgListenerContainer=newSimpleMessageListenerContainer(); msgListenerContainer.setConnectionFactory(rabbitConnectionFactory); msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); msgListenerContainer.setMessageListener(listener); msgListenerContainer.setErrorHandler(newMessageErrorHandler()); msgListenerContainer.setPrefetchCount(config.getPrefetchSize());//设置每个消费者消息的预取值 msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum()); msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数 msgListenerContainer.setQueues(queues.values().toArray(newQueue[queues.size()])); msgListenerContainer.start(); } @Override publicEventTemplategetEopEventTemplate(){ returneventTemplate; } @Override publicEventControlleradd(StringqueueName,StringexchangeName,EventProcessereventProcesser){ returnadd(queueName,exchangeName,eventProcesser,defaultCodecFactory); } publicEventControlleradd(StringqueueName,StringexchangeName,EventProcessereventProcesser,CodecFactorycodecFactory){ msgAdapterHandler.add(queueName,exchangeName,eventProcesser,defaultCodecFactory); if(isStarted.get()){ initMsgListenerAdapter(); } returnthis; } @Override publicEventControlleradd(Map bindings, 
EventProcessereventProcesser){ returnadd(bindings,eventProcesser,defaultCodecFactory); } publicEventControlleradd(Map bindings, EventProcessereventProcesser,CodecFactorycodecFactory){ for(Map.Entry item:bindings.entrySet()) msgAdapterHandler.add(item.getKey(),item.getValue(),eventProcesser,codecFactory); returnthis; } /** *exchange和queue是否已经绑定 */ protectedbooleanbeBinded(StringexchangeName,StringqueueName){ returnbinded.contains(exchangeName+"|"+queueName); } /** *声明exchange和queue已经它们的绑定关系 */ protectedsynchronizedvoiddeclareBinding(StringexchangeName,StringqueueName){ StringbindRelation=exchangeName+"|"+queueName; if(binded.contains(bindRelation))return; booleanneedBinding=false; DirectExchangedirectExchange=exchanges.get(exchangeName); if(directExchange==null){ directExchange=newDirectExchange(exchangeName,true,false,null); exchanges.put(exchangeName,directExchange); rabbitAdmin.declareExchange(directExchange);//声明exchange needBinding=true; } Queuequeue=queues.get(queueName); if(queue==null){ queue=newQueue(queueName,true,false,false); queues.put(queueName,queue); rabbitAdmin.declareQueue(queue); //声明queue needBinding=true; } if(needBinding){ Bindingbinding=BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange rabbitAdmin.declareBinding(binding);//声明绑定关系 binded.add(bindRelation); } } }
搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO
/**
 * Simple serializable bean used to test sending objects through the queue.
 */
@SuppressWarnings("serial")
public class People implements Serializable {

    private int id;

    private String name;

    private boolean male;

    private People spouse;

    private List<People> friends;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isMale() {
        return male;
    }

    public void setMale(boolean male) {
        this.male = male;
    }

    public People getSpouse() {
        return spouse;
    }

    public void setSpouse(People spouse) {
        this.spouse = spouse;
    }

    public List<People> getFriends() {
        return friends;
    }

    public void setFriends(List<People> friends) {
        this.friends = friends;
    }

    /**
     * Describes only id, name and gender. Spouse and friends are left out, so
     * object graphs in which people reference each other print without recursion.
     */
    @Override
    public String toString() {
        return "People[id=" + id + ",name=" + name + ",male=" + male + "]";
    }
}
建立单元测试
publicclassRabbitMqTest{ privateStringdefaultHost="127.0.0.1"; privateStringdefaultExchange="EXCHANGE_DIRECT_TEST"; privateStringdefaultQueue="QUEUE_TEST"; privateDefaultEventControllercontroller; privateEventTemplateeventTemplate; @Before publicvoidinit()throwsIOException{ EventControlConfigconfig=newEventControlConfig(defaultHost); controller=DefaultEventController.getInstance(config); eventTemplate=controller.getEopEventTemplate(); controller.add(defaultQueue,defaultExchange,newApiProcessEventProcessor()); controller.start(); } @Test publicvoidsendString()throwsSendRefuseException{ eventTemplate.send(defaultQueue,defaultExchange,"helloworld"); } @Test publicvoidsendObject()throwsSendRefuseException{ eventTemplate.send(defaultQueue,defaultExchange,mockObj()); } @Test publicvoidsendTemp()throwsSendRefuseException,InterruptedException{ StringtempExchange="EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange StringtempQueue="QUEUE_TEST_TEMP";//以前未声明的queue eventTemplate.send(tempQueue,tempExchange,mockObj()); //发送成功后此时不会接受到消息,还需要绑定对应的消费程序 controller.add(tempQueue,tempExchange,newApiProcessEventProcessor()); } @After publicvoidend()throwsInterruptedException{ Thread.sleep(2000); } privatePeoplemockObj(){ Peoplejack=newPeople(); jack.setId(1); jack.setName("JACK"); jack.setMale(true); Listfriends=newArrayList<>(); friends.add(jack); PeoplehanMeiMei=newPeople(); hanMeiMei.setId(1); hanMeiMei.setName("韩梅梅"); hanMeiMei.setMale(false); hanMeiMei.setFriends(friends); PeopleliLei=newPeople(); liLei.setId(2); liLei.setName("李雷"); liLei.setMale(true); liLei.setFriends(friends); liLei.setSpouse(hanMeiMei); hanMeiMei.setSpouse(liLei); returnhanMeiMei; } classApiProcessEventProcessorimplementsEventProcesser{ @Override publicvoidprocess(Objecte){//消费程序这里只是打印信息 Assert.assertNotNull(e); System.out.println(e); if(einstanceofPeople){ Peoplepeople=(People)e; System.out.println(people.getSpouse()); System.out.println(people.getFriends()); } } } }
源码地址请点击这里
以上就是本文关于 Java 实现 RabbitMQ 消息发送与接收的全部内容,希望对大家有所帮助。
感谢大家对本站的支持。