Springboot+Netty+Websocket实现消息推送实例
前言
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以直接创建持久性的连接,并进行双向数据传输。
Netty框架的优势
1.API使用简单,开发门槛低;
2.功能强大,预置了多种编解码功能,支持多种主流协议;
3.定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;
4.性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;
5.成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼。
提示:以下是本篇文章正文内容,下面案例可供参考
一、引入netty依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.48.Final</version>
</dependency>
二、使用步骤
1.引入基础配置类
packagecom.test.netty; publicenumCmd{ START("000","连接成功"), WMESSAGE("001","消息提醒"), ; privateStringcmd; privateStringdesc; Cmd(Stringcmd,Stringdesc){ this.cmd=cmd; this.desc=desc; } publicStringgetCmd(){ returncmd; } publicStringgetDesc(){ returndesc; } }
2.netty服务启动监听器
packagecom.test.netty; importio.netty.bootstrap.ServerBootstrap; importio.netty.channel.ChannelFuture; importio.netty.channel.ChannelOption; importio.netty.channel.EventLoopGroup; importio.netty.channel.nio.NioEventLoopGroup; importio.netty.channel.socket.nio.NioServerSocketChannel; importlombok.extern.slf4j.Slf4j; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.boot.ApplicationRunner; importorg.springframework.context.annotation.Bean; importorg.springframework.stereotype.Component; /** *@authortest **服务启动监听器 **/ @Slf4j @Component publicclassNettyServer{ @Value("${server.netty.port}") privateintport; @Autowired privateServerChannelInitializerserverChannelInitializer; @Bean ApplicationRunnernettyRunner(){ returnargs->{ //new一个主线程组 EventLoopGroupbossGroup=newNioEventLoopGroup(1); //new一个工作线程组 EventLoopGroupworkGroup=newNioEventLoopGroup(); ServerBootstrapbootstrap=newServerBootstrap() .group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(serverChannelInitializer) //设置队列大小 .option(ChannelOption.SO_BACKLOG,1024) //两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE,true); //绑定端口,开始接收进来的连接 try{ ChannelFuturefuture=bootstrap.bind(port).sync(); log.info("服务器启动开始监听端口:{}",port); future.channel().closeFuture().sync(); }catch(InterruptedExceptione){ e.printStackTrace(); }finally{ //关闭主线程组 bossGroup.shutdownGracefully(); //关闭工作线程组 workGroup.shutdownGracefully(); } }; } }
3.netty服务端处理器
package com.test.netty;

import com.test.common.util.JsonUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.net.URLDecoder;
import java.util.*;

/**
 * Netty server-side handler for WebSocket text frames.
 *
 * <p>Registers each channel in {@link ServerChannelCache} after the WebSocket handshake,
 * removes it again on disconnect or error, and offers {@code send} methods for pushing
 * messages to clients.
 *
 * @author test
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private ServerChannelCache cache;

    /** Marker in the handshake URI after which the client's cache key follows. */
    private static final String dataKey = "test=";

    @Data
    public static class ChannelCache {
    }

    /**
     * Called when a client connection becomes active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.info("通道连接已打开,ID->{}......", channel.id().asLongText());
    }

    /**
     * Registers the channel once the WebSocket handshake has completed.
     *
     * <p>The cache key is everything after the last {@code test=} in the URL-decoded
     * request URI. A connection whose URI has no such marker, or an empty value after it,
     * is closed without being registered.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            Channel channel = ctx.channel();
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete =
                    (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String requestUri = URLDecoder.decode(handshakeComplete.requestUri(), "UTF-8");
            log.info("HANDSHAKE_COMPLETE,ID->{},URI->{}", channel.id().asLongText(), requestUri);
            // lastIndexOf returns -1 when the marker is missing; without this check the
            // substring would start inside the path and yield a bogus, non-empty key.
            int keyIndex = requestUri.lastIndexOf(dataKey);
            String socketKey = keyIndex < 0 ? "" : requestUri.substring(keyIndex + dataKey.length());
            if (!socketKey.isEmpty()) {
                cache.add(socketKey, channel);
                this.send(channel, Cmd.START, null);
            } else {
                ctx.close();
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Called when a client connection is closed; drops the channel from the cache.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.info("通道连接已断开,ID->{},用户ID->{}......", channel.id().asLongText(), cache.getCacheId(channel));
        cache.remove(channel);
    }

    /**
     * Called when an exception is raised on the channel; drops the channel from the cache
     * and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        log.error("连接出现异常,ID->{},用户ID->{},异常->{}......", channel.id().asLongText(), cache.getCacheId(channel), cause.getMessage(), cause);
        cache.remove(channel);
        ctx.close();
    }

    /**
     * Called when a client sends a text frame.
     *
     * <p>The frame content is not inspected; every frame is answered with {@code {"cmd":"100"}}.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        try {
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JsonUtil.toString(Collections.singletonMap("cmd", "100"))));
        } catch (Exception e) {
            log.error("消息处理异常:{}", e.getMessage(), e);
        }
    }

    /**
     * Pushes a message to every channel registered under the given cache id.
     *
     * @param cmd command whose code is sent in the {@code cmd} field
     * @param id  cache id the target channels were registered with; nothing is sent
     *            when no channels are registered for it
     * @param obj payload sent in the {@code data} field, may be {@code null}
     */
    public void send(Cmd cmd, String id, Object obj) {
        Map<String, Channel> channels = cache.get(id);
        if (channels == null) {
            return;
        }
        String msg = buildMessage(cmd, obj);
        log.info("服务器下发消息:{}", msg);
        channels.values().forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(msg)));
    }

    /**
     * Pushes a message to a single channel.
     *
     * @param channel channel to write to
     * @param cmd     command whose code is sent in the {@code cmd} field
     * @param obj     payload sent in the {@code data} field, may be {@code null}
     */
    public void send(Channel channel, Cmd cmd, Object obj) {
        String msg = buildMessage(cmd, obj);
        log.info("服务器下发消息:{}", msg);
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    /** Serializes the command code and payload into the JSON text sent to clients. */
    private String buildMessage(Cmd cmd, Object obj) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("cmd", cmd.getCmd());
        data.put("data", obj);
        return JsonUtil.toString(data);
    }
}
4.netty服务端缓存类
package com.test.netty;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Cache of open channels grouped by a caller-supplied cache id.
 *
 * <p>Several channels may be registered under one id. Updates go through atomic
 * {@link ConcurrentHashMap} operations because channels are added and removed from
 * Netty event loop threads.
 */
@Component
public class ServerChannelCache {

    /** cache id -> (channel id -> channel). */
    private static final ConcurrentHashMap<String, ConcurrentHashMap<String, Channel>> CACHE_MAP = new ConcurrentHashMap<>();

    /** Channel attribute under which the cache id is stored on the channel itself. */
    private static final AttributeKey<String> CHANNEL_ATTR_KEY = AttributeKey.valueOf("test");

    /**
     * @param channel channel to look up
     * @return the cache id stored on the channel, or {@code null} if it was never registered
     */
    public String getCacheId(Channel channel) {
        return channel.attr(CHANNEL_ATTR_KEY).get();
    }

    /**
     * Registers a channel under the given cache id and records the id on the channel.
     *
     * @param cacheId id to group the channel under
     * @param channel channel to register
     */
    public void add(String cacheId, Channel channel) {
        channel.attr(CHANNEL_ATTR_KEY).set(cacheId);
        // compute() makes create-or-update atomic per id, so a concurrent add or remove
        // for the same id cannot lose this registration.
        CACHE_MAP.compute(cacheId, (key, existing) -> {
            ConcurrentHashMap<String, Channel> channels = existing != null ? existing : new ConcurrentHashMap<>();
            // The long text form is used as key because the short form is not unique.
            channels.put(channel.id().asLongText(), channel);
            return channels;
        });
    }

    /**
     * Returns the channels registered under the given cache id.
     *
     * @param cacheId id to look up, may be {@code null}
     * @return a snapshot copy of the registered channels keyed by channel id, or
     *         {@code null} when {@code cacheId} is {@code null} or nothing is registered
     */
    public HashMap<String, Channel> get(String cacheId) {
        if (cacheId == null) {
            return null;
        }
        ConcurrentHashMap<String, Channel> channels = CACHE_MAP.get(cacheId);
        // Hand out a copy so callers can iterate while other threads add or remove channels.
        return channels == null ? null : new HashMap<>(channels);
    }

    /**
     * Removes a channel from the cache; does nothing if the channel was never registered.
     *
     * @param channel channel to remove
     */
    public void remove(Channel channel) {
        String cacheId = getCacheId(channel);
        if (cacheId == null) {
            return;
        }
        // Returning null from the remapping function drops the id once its last channel
        // is gone, so ids of disconnected clients do not accumulate.
        CACHE_MAP.computeIfPresent(cacheId, (key, channels) -> {
            channels.remove(channel.id().asLongText());
            return channels.isEmpty() ? null : channels;
        });
    }
}
5.netty服务初始化器
packagecom.test.netty; importio.netty.channel.ChannelInitializer; importio.netty.channel.ChannelPipeline; importio.netty.channel.socket.SocketChannel; importio.netty.handler.codec.http.HttpObjectAggregator; importio.netty.handler.codec.http.HttpServerCodec; importio.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; importio.netty.handler.stream.ChunkedWriteHandler; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.stereotype.Component; /** *@authortest **netty服务初始化器 **/ @Component publicclassServerChannelInitializerextendsChannelInitializer
{ @Autowired privateNettyServerHandlernettyServerHandler; @Override protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{ ChannelPipelinepipeline=socketChannel.pipeline(); pipeline.addLast(newHttpServerCodec()); pipeline.addLast(newChunkedWriteHandler()); pipeline.addLast(newHttpObjectAggregator(8192)); pipeline.addLast(newWebSocketServerProtocolHandler("/test.io",true,5000)); pipeline.addLast(nettyServerHandler); } }
6.html测试
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>test</title>
  <script type="text/javascript">
    function WebSocketTest() {
      if ("WebSocket" in window) {
        alert("您的浏览器支持WebSocket!");
        // Open a WebSocket. Replace "port" with the value of server.netty.port.
        // The "test" query parameter is the key the server registers this connection under.
        var ws = new WebSocket("ws://localhost:port/test.io?test=123456");
        ws.onopen = function () {
          // The socket is connected; send data with send().
          ws.send("发送数据");
          alert("数据发送中...");
        };
        ws.onmessage = function (evt) {
          var received_msg = evt.data;
          alert("数据已接收...");
        };
        ws.onclose = function () {
          // The socket has been closed.
          alert("连接已关闭...");
        };
      } else {
        // The browser does not support WebSocket.
        alert("您的浏览器不支持WebSocket!");
      }
    }
  </script>
</head>
<body>
  <a href="javascript:WebSocketTest()">运行WebSocket</a>
</body>
</html>
7.vue测试
mounted(){ this.initWebsocket(); }, methods:{ initWebsocket(){ letwebsocket=newWebSocket('ws://localhost:port/test.io?test=123456'); websocket.onmessage=(event)=>{ letmsg=JSON.parse(event.data); switch(msg.cmd){ case"000": this.$message({ type:'success', message:"建立实时连接成功!", duration:1000 }) setInterval(()=>{websocket.send("heartbeat")},60*1000); break; case"001": this.$message.warning("收到一条新的信息,请及时查看!") break; } } websocket.onclose=()=>{ setTimeout(()=>{ this.initWebsocket(); },30*1000); } websocket.onerror=()=>{ setTimeout(()=>{ this.initWebsocket(); },30*1000); } }, }, ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210107160420568.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3d1X3Fpbmdfc29uZw==,size_16,color_FFFFFF,t_70#pic_center)
8.服务器下发消息
@Autowired privateNettyServerHandlernettyServerHandler; nettyServerHandler.send(CmdWeb.WMESSAGE,id,message);
到此这篇关于Springboot+Netty+Websocket实现消息推送实例的文章就介绍到这了,更多相关Springboot Websocket消息推送内容请搜索毛票票以前的文章或继续浏览下面的相关文章,希望大家以后多多支持毛票票!