在业务里netty的使用场景有哪些小伙伴们知道吗?它要怎么实现?本篇文章就带小伙伴们详细了解下。
Netty实现
一、服务端实现
package com.test.thread.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; // Constant里面用到的几个参数自己可以直接写死 import com.test.thread.utils.Constant; /** * tcp/ip 服务端用netty实现 * @author zhb * */ public class ServerNetty { private int port; public ServerNetty(int port) { this.port = port; } // netty 服务端启动 public void action() throws InterruptedException { // 用来接收进来的连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // nio服务的启动类 ServerBootstrap sbs = new ServerBootstrap(); // 配置nio服务参数 sbs.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接 .option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接 .handler(new LoggingHandler(LogLevel.INFO)) // 打印日志级别 .childHandler(new ChannelInitializer < SocketChannel > () { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // marshalling 序列化对象的解码 // socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder()); // marshalling 序列化对象的编码 // socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder()); // 网络超时时间 // socketChannel.pipeline().addLast(new ReadTimeoutHandler(5)); // 处理接收到的请求 socketChannel.pipeline() .addLast(new ServerHandler()); // 这里相当于过滤器,可以配置多个 } }); System.err.println("server 开启--------------"); // 绑定端口,开始接受链接 ChannelFuture cf = sbs.bind(port) .sync(); // 开多个端口 // ChannelFuture cf2 = sbs.bind(3333).sync(); // cf2.channel().closeFuture().sync(); // 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务 cf.channel() .closeFuture() .sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } // 开启netty服务线程 public static void main(String[] args) throws InterruptedException { new ServerNetty(Constant.serverSocketPort) .action(); } /** * * 解决数据传输中中的拆包和粘包问题方案: * * 一 . 用特定字符当做分隔符,例如:$_ * (1) 将下列代码添加到 initChannel方法内 //将双方约定好的分隔符转成buf ByteBuf bb = Unpooled.copiedBuffer("$_".getBytes(Constant.charset)); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, bb)); //将接收到信息进行解码,可以直接把msg转成字符串 socketChannel.pipeline().addLast(new StringDecoder()); (2) 在 ServerHandler中的 channelRead方法中应该替换内容为 // 如果把msg直接转成字符串,必须在服务中心添加 socketChannel.pipeline().addLast(new StringDecoder()); String reqStr = (String)msg; System.err.println("server 接收到请求信息是:"+reqStr); String respStr = new StringBuilder("来自服务器的响应").append(reqStr).append("$_").toString(); // 返回给客户端响应 ctx.writeAndFlush(Unpooled.copiedBuffer(respStr.getBytes())); (3) 因为分隔符是双方约定好的,在ClientNetty和channelRead中也应该有响应的操作 二. 双方约定好是定长报文 // 双方约定好定长报文为6,长度不足时服务端会一直等待直到6个字符,所以客户端不足6个字符时用空格补充;其余操作,参考分隔符的情况 socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(6)); 三. 请求分为请求头和请求体,请求头放的是请求体的长度;一般生产上常用的 (1)通信双方约定好报文头的长度,先截取改长度, (2)根据报文头的长度读取报文体的内容 * * */ }
package com.test.thread.netty; import java.io.UnsupportedEncodingException; import com.test.thread.utils.Constant; import com.test.thread.xlh.Student; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * 处理某个客户端的请求 * @author zhb */ public class ServerHandler extends ChannelInboundHandlerAdapter { // 读取数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 普通的处理 及过滤器不多 simpleRead(ctx, msg); // 有分隔符处理信息 // Delimiterread(ctx, msg); } /** * 最简单的处理 * @param ctx * @param msg * @throws UnsupportedEncodingException */ public void simpleRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException { ByteBuf bb = (ByteBuf) msg; // 创建一个和buf同等长度的字节数组 byte[] reqByte = new byte[bb.readableBytes()]; // 将buf中的数据读取到数组中 bb.readBytes(reqByte); String reqStr = new String(reqByte, Constant.charset); System.err.println("server 接收到客户端的请求: " + reqStr); String respStr = new StringBuilder("来自服务器的响应") .append(reqStr) .append("$_") .toString(); // 返回给客户端响应 和客户端链接中断即短连接,当信息返回给客户端后中断 ctx.writeAndFlush(Unpooled.copiedBuffer(respStr.getBytes())); //.addListener(ChannelFutureListener.CLOSE); // 有了写操作(writeAndFlush)下面就不用释放msg // ReferenceCountUtil.release(msg); } /** * 有分隔符的请求信息分包情况处理,包含了转码 * @param ctx * @param msg */ private void Delimiterread(ChannelHandlerContext ctx, Object msg) { // 如果把msg直接转成字符串,必须在服务中心添加 socketChannel.pipeline().addLast(new StringDecoder()); String reqStr = (String) msg; System.err.println("server 接收到请求信息是:" + reqStr); String respStr = new StringBuilder("来自服务器的响应") .append(reqStr) .append("$_") .toString(); // 返回给客户端响应 和客户端链接中断即短连接,当信息返回给客户端后中断 ctx.writeAndFlush(Unpooled.copiedBuffer(respStr.getBytes())) .addListener(ChannelFutureListener.CLOSE); } // 数据读取完毕的处理 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.err.println("服务端读取数据完毕"); } // 出现异常的处理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("server 读取数据出现异常"); ctx.close(); } /** * 将请求信息直接转成对象 * @param ctx * @param msg */ private void handlerObject(ChannelHandlerContext ctx, Object msg) { // 需要序列化 直接把msg转成对象信息,一般不会用,可以用json字符串在不同语言中传递信息 Student student = (Student) msg; System.err.println("server 获取信息:" + student.getId() + student.getName()); student.setName("李四"); ctx.write(student); } }
package com.test.thread.netty; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * 使用netty 把信息序列化成对象时使用 * @author zhb */ public class MarshallingCodefactory { /** * 创建 marshalling 解码器 * @return */ public static MarshallingDecoder buildDecoder() { // 先通过marshalling的工具类提供的方法实例化marshalling对象,参数serial是创建Java序列化工厂对象 serial MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial"); // 创建MarshallingConfiguration对象,设置版本为5 final MarshallingConfiguration config = new MarshallingConfiguration(); config.setVersion(5); // 根据 marshalling 的factory 和 config创建 provider; UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, config); // 构建netty的MarshallingDecoder对象,两个参数分别是provide和单个信息序列后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1204 * 1); return decoder; } /** * 创建 marshalling 编码器 * @return */ public static MarshallingEncoder buildEncoder() { // 先通过marshalling的工具类提供的方法实例化marshalling对象,参数serial是创建Java序列化工厂对象 final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial"); // 创建MarshallingConfiguration对象,设置版本为5 final MarshallingConfiguration config = new MarshallingConfiguration(); config.setVersion(5); // 根据 marshalling 的factory 和 config创建 provider; MarshallerProvider provider = new DefaultMarshallerProvider(factory, config); // 构建netty 的编码对象, 用于实现序列化接口的pojo对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
二、客户端实现
package com.test.thread.netty; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.UnsupportedEncodingException; import com.test.thread.utils.Constant; /** * 客户端发送请求 * @author zhb * */ public class ClientNetty { // 要请求的服务器的ip地址 private String ip; // 服务器的端口 private int port; public ClientNetty(String ip, int port) { this.ip = ip; this.port = port; } // 请求端主题 private void action() throws InterruptedException, UnsupportedEncodingException { EventLoopGroup bossGroup = new NioEventLoopGroup(); Bootstrap bs = new Bootstrap(); bs.group(bossGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer < SocketChannel > () { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // marshalling 序列化对象的解码 // socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder()); // marshalling 序列化对象的编码 // socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder()); // 处理来自服务端的响应信息 socketChannel.pipeline() .addLast(new ClientHandler()); } }); // 客户端开启 ChannelFuture cf = bs.connect(ip, port) .sync(); String reqStr = "我是客户端请求1$_"; // 发送客户端的请求 cf.channel() .writeAndFlush(Unpooled.copiedBuffer(reqStr.getBytes(Constant.charset))); // Thread.sleep(300); // cf.channel().writeAndFlush(Unpooled.copiedBuffer("我是客户端请求2$_---".getBytes(Constant.charset))); // Thread.sleep(300); // cf.channel().writeAndFlush(Unpooled.copiedBuffer("我是客户端请求3$_".getBytes(Constant.charset))); // Student student = new Student(); // student.setId(3); // student.setName("张三"); // cf.channel().writeAndFlush(student); // 等待直到连接中断 cf.channel() .closeFuture() .sync(); } public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException { new ClientNetty("127.0.0.1", Constant.serverSocketPort) .action(); } }
package com.test.thread.netty; import com.test.thread.utils.Constant; import com.test.thread.xlh.Student; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * 读取服务器返回的响应信息 * @author zhb * */ public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf bb = (ByteBuf) msg; byte[] respByte = new byte[bb.readableBytes()]; bb.readBytes(respByte); String respStr = new String(respByte, Constant.charset); System.err.println("client--收到响应:" + respStr); // 直接转成对象 // handlerObject(ctx, msg); } finally { // 必须释放msg数据 ReferenceCountUtil.release(msg); } } private void handlerObject(ChannelHandlerContext ctx, Object msg) { Student student = (Student) msg; System.err.println("server 获取信息:" + student.getId() + student.getName()); } // 数据读取完毕的处理 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.err.println("客户端读取数据完毕"); } // 出现异常的处理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("client 读取数据出现异常"); ctx.close(); } }
使用场景
--构建高性能、低时延的各种 Java 中间件,如 MQ、分布式服务框架、ESB 消息总线等,Netty 主要作为基础通信框架提供高性能、低时延的通信服务;
--公有或者私有协议栈的基础通信框架,如可以基于 Netty 构建异步、高性能的 WebSocket 协议栈;
--各领域应用,例如大数据、游戏等,Netty 作为高性能的通信框架用于内部各模块的数据分发、传输和汇总等,实现模块之间高性能通信。
以上就是本篇文章的所有内容,小伙伴们知道怎么实现netty了吗?还想了解更多java架构师详细内容的话就来关注我们吧。
推荐阅读: