Java编程rabbitMQ实现消息的收发
java实现rAMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
本文是基于spring-rabbit中间件来实现消息的发送接受功能
seehttp://www.rabbitmq.com/tutorials/tutorial-one-java.html
seehttp://www.springsource.org/spring-amqp
Java编程通过操作rabbitMQ消息的收发实现代码如下:
com.rabbitmq amqp-client 2.8.2 org.springframework.amqp spring-amqp 1.1.1.RELEASE org.springframework.amqp spring-rabbit 1.1.1.RELEASE com.caucho hessian 4.0.7
首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
publicclassEventMessageimplementsSerializable{
privateStringqueueName;
privateStringexchangeName;
privatebyte[]eventData;
publicEventMessage(StringqueueName,StringexchangeName,byte[]eventData){
this.queueName=queueName;
this.exchangeName=exchangeName;
this.eventData=eventData;
}
publicEventMessage(){
}
publicStringgetQueueName(){
returnqueueName;
}
publicStringgetExchangeName(){
returnexchangeName;
}
publicbyte[]getEventData(){
returneventData;
}
@Override
publicStringtoString(){
return"EopEventMessage[queueName="+queueName+",exchangeName="
+exchangeName+",eventData="+Arrays.toString(eventData)
+"]";
}
}
为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂
publicinterfaceCodecFactory{
byte[]serialize(Objectobj)throwsIOException;
ObjectdeSerialize(byte[]in)throwsIOException;
}
下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
publicclassHessionCodecFactoryimplementsCodecFactory{
privatefinalLoggerlogger=Logger.getLogger(HessionCodecFactory.class);
@Override
publicbyte[]serialize(Objectobj)throwsIOException{
ByteArrayOutputStreambaos=null;
HessianOutputoutput=null;
try{
baos=newByteArrayOutputStream(1024);
output=newHessianOutput(baos);
output.startCall();
output.writeObject(obj);
output.completeCall();
}catch(finalIOExceptionex){
throwex;
}finally{
if(output!=null){
try{
baos.close();
}catch(finalIOExceptionex){
this.logger.error("Failedtoclosestream.",ex);
}
}
}
returnbaos!=null?baos.toByteArray():null;
}
@Override
publicObjectdeSerialize(byte[]in)throwsIOException{
Objectobj=null;
ByteArrayInputStreambais=null;
HessianInputinput=null;
try{
bais=newByteArrayInputStream(in);
input=newHessianInput(bais);
input.startReply();
obj=input.readObject();
input.completeReply();
}catch(finalIOExceptionex){
throwex;
}catch(finalThrowablee){
this.logger.error("Failedtodecodeobject.",e);
}finally{
if(input!=null){
try{
bais.close();
}catch(finalIOExceptionex){
this.logger.error("Failedtoclosestream.",ex);
}
}
}
returnobj;
}
}
接下来就先实现发送功能,新增一个接口专门用来实现发送功能
publicinterfaceEventTemplate{
voidsend(StringqueueName,StringexchangeName,ObjecteventContent)throwsSendRefuseException;
voidsend(StringqueueName,StringexchangeName,ObjecteventContent,CodecFactorycodecFactory)throwsSendRefuseException;
}
SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage
publicclassDefaultEventTemplateimplementsEventTemplate{
privatestaticfinalLoggerlogger=Logger.getLogger(DefaultEventTemplate.class);
privateAmqpTemplateeventAmqpTemplate;
privateCodecFactorydefaultCodecFactory;
// privateDefaultEventControllereec;
// publicDefaultEventTemplate(AmqpTemplateeopAmqpTemplate,
// CodecFactorydefaultCodecFactory,DefaultEventControllereec){
// this.eventAmqpTemplate=eopAmqpTemplate;
// this.defaultCodecFactory=defaultCodecFactory;
// this.eec=eec;
// }
publicDefaultEventTemplate(AmqpTemplateeopAmqpTemplate,CodecFactorydefaultCodecFactory){
this.eventAmqpTemplate=eopAmqpTemplate;
this.defaultCodecFactory=defaultCodecFactory;
}
@Override
publicvoidsend(StringqueueName,StringexchangeName,ObjecteventContent)
throwsSendRefuseException{
this.send(queueName,exchangeName,eventContent,defaultCodecFactory);
}
@Override
publicvoidsend(StringqueueName,StringexchangeName,ObjecteventContent,
CodecFactorycodecFactory)throwsSendRefuseException{
if(StringUtils.isEmpty(queueName)||StringUtils.isEmpty(exchangeName)){
thrownewSendRefuseException("queueNameexchangeNamecannotbeempty.");
}
// if(!eec.beBinded(exchangeName,queueName))
// eec.declareBinding(exchangeName,queueName);
byte[]eventContentBytes=null;
if(codecFactory==null){
if(eventContent==null){
logger.warn("FindeventContentisnull,areyousure...");
}else{
thrownewSendRefuseException(
"codecFactorymustnotbenull,unlesseventContentisnull");
}
}else{
try{
eventContentBytes=codecFactory.serialize(eventContent);
}catch(IOExceptione){
thrownewSendRefuseException(e);
}
}
//构造成Message
EventMessagemsg=newEventMessage(queueName,exchangeName,
eventContentBytes);
try{
eventAmqpTemplate.convertAndSend(exchangeName,queueName,msg);
}catch(AmqpExceptione){
logger.error("sendeventfail.EventMessage:["+eventContent+"]",e);
thrownewSendRefuseException("sendeventfail",e);
}
}
}
注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类
publicinterfaceEventProcesser{
publicvoidprocess(Objecte);
}
为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
/** *MessageListenerAdapter的Pojo *消息处理适配器,主要功能:
*1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来
*2、执行消息的消费分发,调用相应的处理器来消费属于它的消息
* */ publicclassMessageAdapterHandler{ privatestaticfinalLoggerlogger=Logger.getLogger(MessageAdapterHandler.class); privateConcurrentMapepwMap; publicMessageAdapterHandler(){ this.epwMap=newConcurrentHashMap (); } publicvoidhandleMessage(EventMessageeem){ logger.debug("ReceiveanEventMessage:["+eem+"]"); //先要判断接收到的message是否是空的,在某些异常情况下,会产生空值 if(eem==null){ logger.warn("ReceiveannullEventMessage,itmayproductsomeerrors,andprocessingmessageiscanceled."); return; } if(StringUtils.isEmpty(eem.getQueueName())||StringUtils.isEmpty(eem.getExchangeName())){ logger.warn("TheEventMessage'squeueNameandexchangeNameisempty,thisisnotallowed,andprocessingmessageiscanceled."); return; } //解码,并交给对应的EventHandle执行 EventProcessorWrapeepw=epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName()); if(eepw==null){ logger.warn("ReceiveanEopEventMessage,butnoprocessorcandoit."); return; } try{ eepw.process(eem.getEventData()); }catch(IOExceptione){ logger.error("EventcontentcannotbeDeserialized,checktheprovidedCodecFactory.",e); return; } } protectedvoidadd(StringqueueName,StringexchangeName,EventProcesserprocessor,CodecFactorycodecFactory){ if(StringUtils.isEmpty(queueName)||StringUtils.isEmpty(exchangeName)||processor==null||codecFactory==null){ thrownewRuntimeException("queueNameandexchangeNamecannotbeempty,andprocessororcodecFactorycannotbenull."); } EventProcessorWrapepw=newEventProcessorWrap(codecFactory,processor); EventProcessorWrapoldProcessorWrap=epwMap.putIfAbsent(queueName+"|"+exchangeName,epw); if(oldProcessorWrap!=null){ logger.warn("Theprocessorofthisqueueandexchangeexists,andthenewonecan'tbeadd"); } } protectedSet getAllBinding(){ Set keySet=epwMap.keySet(); returnkeySet; } protectedstaticclassEventProcessorWrap{ privateCodecFactorycodecFactory; privateEventProcessereep; protectedEventProcessorWrap(CodecFactorycodecFactory, EventProcessereep){ this.codecFactory=codecFactory; this.eep=eep; } publicvoidprocess(byte[]eventData)throwsIOException{ Objectobj=codecFactory.deSerialize(eventData); eep.process(obj); } } }
这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息
publicclassMessageErrorHandlerimplementsErrorHandler{
privatestaticfinalLoggerlogger=Logger.getLogger(MessageErrorHandler.class);
@Override
publicvoidhandleError(Throwablet){
logger.error("RabbitMQhappenaerror:"+t.getMessage(),t);
}
}
接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息
publicclassEventControlConfig{
privatefinalstaticintDEFAULT_PORT=5672;
privatefinalstaticStringDEFAULT_USERNAME="guest";
privatefinalstaticStringDEFAULT_PASSWORD="guest";
privatefinalstaticintDEFAULT_PROCESS_THREAD_NUM=Runtime.getRuntime().availableProcessors()*2;
privatestaticfinalintPREFETCH_SIZE=1;
privateStringserverHost;
privateintport=DEFAULT_PORT;
privateStringusername=DEFAULT_USERNAME;
privateStringpassword=DEFAULT_PASSWORD;
privateStringvirtualHost;
/**
*和rabbitmq建立连接的超时时间
*/
privateintconnectionTimeout=0;
/**
*事件消息处理线程数,默认是CPU核数*2
*/
privateinteventMsgProcessNum;
/**
*每次消费消息的预取值
*/
privateintprefetchSize;
publicEventControlConfig(StringserverHost){
this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,newHessionCodecFactory());
}
publicEventControlConfig(StringserverHost,intport,Stringusername,
Stringpassword,StringvirtualHost,intconnectionTimeout,
inteventMsgProcessNum,intprefetchSize,CodecFactorydefaultCodecFactory){
this.serverHost=serverHost;
this.port=port>0?port:DEFAULT_PORT;
this.username=username;
this.password=password;
this.virtualHost=virtualHost;
this.connectionTimeout=connectionTimeout>0?connectionTimeout:0;
this.eventMsgProcessNum=eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
this.prefetchSize=prefetchSize>0?prefetchSize:PREFETCH_SIZE;
}
publicStringgetServerHost(){
returnserverHost;
}
publicintgetPort(){
returnport;
}
publicStringgetUsername(){
returnusername;
}
publicStringgetPassword(){
returnpassword;
}
publicStringgetVirtualHost(){
returnvirtualHost;
}
publicintgetConnectionTimeout(){
returnconnectionTimeout;
}
publicintgetEventMsgProcessNum(){
returneventMsgProcessNum;
}
publicintgetPrefetchSize(){
returnprefetchSize;
}
}
具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信
publicinterfaceEventController{
/**
*控制器启动方法
*/
voidstart();
/**
*获取发送模版
*/
EventTemplategetEopEventTemplate();
/**
*绑定消费程序到对应的exchange和queue
*/
EventControlleradd(StringqueueName,StringexchangeName,EventProcessereventProcesser);
/*inmap,thekeyisqueuename,butvalueisexchangename*/
EventControlleradd(Mapbindings,EventProcessereventProcesser);
}
它的实现类如下:
/** *和rabbitmq通信的控制器,主要负责: *1、和rabbitmq建立连接
*2、声明exChange和queue以及它们的绑定关系
*3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上
*4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存
*@authoryangyong * */ publicclassDefaultEventControllerimplementsEventController{ privateCachingConnectionFactoryrabbitConnectionFactory; privateEventControlConfigconfig; privateRabbitAdminrabbitAdmin; privateCodecFactorydefaultCodecFactory=newHessionCodecFactory(); privateSimpleMessageListenerContainermsgListenerContainer;//rabbitMQmsglistenercontainer privateMessageAdapterHandlermsgAdapterHandler=newMessageAdapterHandler(); privateMessageConverterserializerMessageConverter=newSerializerMessageConverter();//直接指定 //queuecache,keyisexchangeName privateMapexchanges=newHashMap (); //queuecache,keyisqueueName privateMap queues=newHashMap (); //bindrelationofqueuetoexchangecache,valueisexchangeName|queueName privateSet binded=newHashSet (); privateEventTemplateeventTemplate;//给App使用的Event发送客户端 privateAtomicBooleanisStarted=newAtomicBoolean(false); privatestaticDefaultEventControllerdefaultEventController; publicsynchronizedstaticDefaultEventControllergetInstance(EventControlConfigconfig){ if(defaultEventController==null){ defaultEventController=newDefaultEventController(config); } returndefaultEventController; } privateDefaultEventController(EventControlConfigconfig){ if(config==null){ thrownewIllegalArgumentException("Configcannotbenull."); } this.config=config; initRabbitConnectionFactory(); //初始化AmqpAdmin rabbitAdmin=newRabbitAdmin(rabbitConnectionFactory); //初始化RabbitTemplate RabbitTemplaterabbitTemplate=newRabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(serializerMessageConverter); eventTemplate=newDefaultEventTemplate(rabbitTemplate,defaultCodecFactory,this); } /** *初始化rabbitmq连接 */ privatevoidinitRabbitConnectionFactory(){ rabbitConnectionFactory=newCachingConnectionFactory(); rabbitConnectionFactory.setHost(config.getServerHost()); rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum()); rabbitConnectionFactory.setPort(config.getPort()); rabbitConnectionFactory.setUsername(config.getUsername()); rabbitConnectionFactory.setPassword(config.getPassword()); if(!StringUtils.isEmpty(config.getVirtualHost())){ rabbitConnectionFactory.setVirtualHost(config.getVirtualHost()); } } /** *注销程序 */ publicsynchronizedvoiddestroy()throwsException{ if(!isStarted.get()){ return; } msgListenerContainer.stop(); eventTemplate=null; rabbitAdmin=null; rabbitConnectionFactory.destroy(); } @Override publicvoidstart(){ if(isStarted.get()){ return; } Set mapping=msgAdapterHandler.getAllBinding(); for(Stringrelation:mapping){ String[]relaArr=relation.split("\\|"); declareBinding(relaArr[1],relaArr[0]); } initMsgListenerAdapter(); isStarted.set(true); } /** *初始化消息监听器容器 */ privatevoidinitMsgListenerAdapter(){ MessageListenerlistener=newMessageListenerAdapter(msgAdapterHandler,serializerMessageConverter); msgListenerContainer=newSimpleMessageListenerContainer(); msgListenerContainer.setConnectionFactory(rabbitConnectionFactory); msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); msgListenerContainer.setMessageListener(listener); msgListenerContainer.setErrorHandler(newMessageErrorHandler()); msgListenerContainer.setPrefetchCount(config.getPrefetchSize());//设置每个消费者消息的预取值 msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum()); msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数 msgListenerContainer.setQueues(queues.values().toArray(newQueue[queues.size()])); msgListenerContainer.start(); } @Override publicEventTemplategetEopEventTemplate(){ returneventTemplate; } @Override publicEventControlleradd(StringqueueName,StringexchangeName,EventProcessereventProcesser){ returnadd(queueName,exchangeName,eventProcesser,defaultCodecFactory); } publicEventControlleradd(StringqueueName,StringexchangeName,EventProcessereventProcesser,CodecFactorycodecFactory){ msgAdapterHandler.add(queueName,exchangeName,eventProcesser,defaultCodecFactory); if(isStarted.get()){ initMsgListenerAdapter(); } returnthis; } @Override publicEventControlleradd(Map bindings, EventProcessereventProcesser){ returnadd(bindings,eventProcesser,defaultCodecFactory); } publicEventControlleradd(Map bindings, EventProcessereventProcesser,CodecFactorycodecFactory){ for(Map.Entry item:bindings.entrySet()) msgAdapterHandler.add(item.getKey(),item.getValue(),eventProcesser,codecFactory); returnthis; } /** *exchange和queue是否已经绑定 */ protectedbooleanbeBinded(StringexchangeName,StringqueueName){ returnbinded.contains(exchangeName+"|"+queueName); } /** *声明exchange和queue已经它们的绑定关系 */ protectedsynchronizedvoiddeclareBinding(StringexchangeName,StringqueueName){ StringbindRelation=exchangeName+"|"+queueName; if(binded.contains(bindRelation))return; booleanneedBinding=false; DirectExchangedirectExchange=exchanges.get(exchangeName); if(directExchange==null){ directExchange=newDirectExchange(exchangeName,true,false,null); exchanges.put(exchangeName,directExchange); rabbitAdmin.declareExchange(directExchange);//声明exchange needBinding=true; } Queuequeue=queues.get(queueName); if(queue==null){ queue=newQueue(queueName,true,false,false); queues.put(queueName,queue); rabbitAdmin.declareQueue(queue); //声明queue needBinding=true; } if(needBinding){ Bindingbinding=BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange rabbitAdmin.declareBinding(binding);//声明绑定关系 binded.add(bindRelation); } } }
搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO
@SuppressWarnings("serial")
publicclassPeopleimplementsSerializable{
privateintid;
privateStringname;
privatebooleanmale;
privatePeoplespouse;
privateListfriends;
publicintgetId(){
returnid;
}
publicvoidsetId(intid){
this.id=id;
}
publicStringgetName(){
returnname;
}
publicvoidsetName(Stringname){
this.name=name;
}
publicbooleanisMale(){
returnmale;
}
publicvoidsetMale(booleanmale){
this.male=male;
}
publicPeoplegetSpouse(){
returnspouse;
}
publicvoidsetSpouse(Peoplespouse){
this.spouse=spouse;
}
publicListgetFriends(){
returnfriends;
}
publicvoidsetFriends(Listfriends){
this.friends=friends;
}
@Override
publicStringtoString(){
//TODOAuto-generatedmethodstub
return"People[id="+id+",name="+name+",male="+male+"]";
}
}
建立单元测试
publicclassRabbitMqTest{
privateStringdefaultHost="127.0.0.1";
privateStringdefaultExchange="EXCHANGE_DIRECT_TEST";
privateStringdefaultQueue="QUEUE_TEST";
privateDefaultEventControllercontroller;
privateEventTemplateeventTemplate;
@Before
publicvoidinit()throwsIOException{
EventControlConfigconfig=newEventControlConfig(defaultHost);
controller=DefaultEventController.getInstance(config);
eventTemplate=controller.getEopEventTemplate();
controller.add(defaultQueue,defaultExchange,newApiProcessEventProcessor());
controller.start();
}
@Test
publicvoidsendString()throwsSendRefuseException{
eventTemplate.send(defaultQueue,defaultExchange,"helloworld");
}
@Test
publicvoidsendObject()throwsSendRefuseException{
eventTemplate.send(defaultQueue,defaultExchange,mockObj());
}
@Test
publicvoidsendTemp()throwsSendRefuseException,InterruptedException{
StringtempExchange="EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange
StringtempQueue="QUEUE_TEST_TEMP";//以前未声明的queue
eventTemplate.send(tempQueue,tempExchange,mockObj());
//发送成功后此时不会接受到消息,还需要绑定对应的消费程序
controller.add(tempQueue,tempExchange,newApiProcessEventProcessor());
}
@After
publicvoidend()throwsInterruptedException{
Thread.sleep(2000);
}
privatePeoplemockObj(){
Peoplejack=newPeople();
jack.setId(1);
jack.setName("JACK");
jack.setMale(true);
Listfriends=newArrayList<>();
friends.add(jack);
PeoplehanMeiMei=newPeople();
hanMeiMei.setId(1);
hanMeiMei.setName("韩梅梅");
hanMeiMei.setMale(false);
hanMeiMei.setFriends(friends);
PeopleliLei=newPeople();
liLei.setId(2);
liLei.setName("李雷");
liLei.setMale(true);
liLei.setFriends(friends);
liLei.setSpouse(hanMeiMei);
hanMeiMei.setSpouse(liLei);
returnhanMeiMei;
}
classApiProcessEventProcessorimplementsEventProcesser{
@Override
publicvoidprocess(Objecte){//消费程序这里只是打印信息
Assert.assertNotNull(e);
System.out.println(e);
if(einstanceofPeople){
Peoplepeople=(People)e;
System.out.println(people.getSpouse());
System.out.println(people.getFriends());
}
}
}
}
源码地址请点击这里
以上就是本文关于java实现rabbitmq消息的发送接受的全部内容,希望对大家有所帮助。
感谢大家对本站的支持。