业务中netty的使用场景有哪些?怎么实现netty?

TheDisguiser 2020-08-01 16:07:00 java常见问答 10053

在业务里netty的使用场景有哪些小伙伴们知道吗?它要怎么实现?本篇文章就带小伙伴们详细了解下。

Netty实现

一、服务端实现

package com.test.thread.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
// Constant里面用到的几个参数自己可以直接写死
import com.test.thread.utils.Constant;
/**
 * tcp/ip 服务端用netty实现
 * @author zhb
 *
 */
public class ServerNetty
{
    private int port;
    public ServerNetty(int port)
    {
        this.port = port;
    }
    // netty 服务端启动
    public void action() throws InterruptedException
    {
        // 用来接收进来的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try
        {
            // nio服务的启动类
            ServerBootstrap sbs = new ServerBootstrap();
            // 配置nio服务参数
            sbs.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
                .option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
                .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
                .handler(new LoggingHandler(LogLevel.INFO)) // 打印日志级别
                .childHandler(new ChannelInitializer < SocketChannel > ()
                {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception
                    {
                        // marshalling 序列化对象的解码
                        //                      socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
                        // marshalling 序列化对象的编码
                        //                      socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
                        // 网络超时时间
                        //                      socketChannel.pipeline().addLast(new ReadTimeoutHandler(5));
                        // 处理接收到的请求
                        socketChannel.pipeline()
                            .addLast(new ServerHandler()); // 这里相当于过滤器,可以配置多个
                    }
                });
            System.err.println("server 开启--------------");
            // 绑定端口,开始接受链接
            ChannelFuture cf = sbs.bind(port)
                .sync();
            // 开多个端口
            //          ChannelFuture cf2 = sbs.bind(3333).sync();
            //          cf2.channel().closeFuture().sync();
            // 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务
            cf.channel()
                .closeFuture()
                .sync();
        }
        finally
        {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    // 开启netty服务线程
    public static void main(String[] args) throws InterruptedException
    {
        new ServerNetty(Constant.serverSocketPort)
            .action();
    }
    /**
     * 
     * 解决数据传输中中的拆包和粘包问题方案:
     * 
     * 一 . 用特定字符当做分隔符,例如:$_ 
     *  (1) 将下列代码添加到 initChannel方法内
        //将双方约定好的分隔符转成buf
        ByteBuf bb = Unpooled.copiedBuffer("$_".getBytes(Constant.charset));
        socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, bb));
        //将接收到信息进行解码,可以直接把msg转成字符串
        socketChannel.pipeline().addLast(new StringDecoder());
        
        (2) 在 ServerHandler中的 channelRead方法中应该替换内容为
        // 如果把msg直接转成字符串,必须在服务中心添加 socketChannel.pipeline().addLast(new StringDecoder());
        String reqStr = (String)msg;
        System.err.println("server 接收到请求信息是:"+reqStr);
        String respStr = new StringBuilder("来自服务器的响应").append(reqStr).append("$_").toString();
        // 返回给客户端响应
        ctx.writeAndFlush(Unpooled.copiedBuffer(respStr.getBytes()));
        
        (3) 因为分隔符是双方约定好的,在ClientNetty和channelRead中也应该有响应的操作
                
        
     二. 双方约定好是定长报文   
         // 双方约定好定长报文为6,长度不足时服务端会一直等待直到6个字符,所以客户端不足6个字符时用空格补充;其余操作,参考分隔符的情况 
         socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(6));
         
         
    三. 请求分为请求头和请求体,请求头放的是请求体的长度;一般生产上常用的
      
          (1)通信双方约定好报文头的长度,先截取改长度,
          (2)根据报文头的长度读取报文体的内容
     * 
     * 
     */
}
package com.test.thread.netty;
import java.io.UnsupportedEncodingException;
import com.test.thread.utils.Constant;
import com.test.thread.xlh.Student;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
 * 处理某个客户端的请求
 * @author zhb
 */
public class ServerHandler extends ChannelInboundHandlerAdapter
{
    // 读取数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        // 普通的处理 及过滤器不多
        simpleRead(ctx, msg);
        // 有分隔符处理信息
        //      Delimiterread(ctx, msg);
    }
    /**
     * 最简单的处理
     * @param ctx
     * @param msg
     * @throws UnsupportedEncodingException
     */
    public void simpleRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException
    {
        ByteBuf bb = (ByteBuf) msg;
        // 创建一个和buf同等长度的字节数组
        byte[] reqByte = new byte[bb.readableBytes()];
        // 将buf中的数据读取到数组中
        bb.readBytes(reqByte);
        String reqStr = new String(reqByte, Constant.charset);
        System.err.println("server 接收到客户端的请求: " + reqStr);
        String respStr = new StringBuilder("来自服务器的响应")
            .append(reqStr)
            .append("$_")
            .toString();
        // 返回给客户端响应                                                                                                                                                       和客户端链接中断即短连接,当信息返回给客户端后中断
        ctx.writeAndFlush(Unpooled.copiedBuffer(respStr.getBytes())); //.addListener(ChannelFutureListener.CLOSE);
        // 有了写操作(writeAndFlush)下面就不用释放msg
        //      ReferenceCountUtil.release(msg);
    }
    /**
     * 有分隔符的请求信息分包情况处理,包含了转码
     * @param ctx
     * @param msg
     */
    private void Delimiterread(ChannelHandlerContext ctx, Object msg)
    {
        // 如果把msg直接转成字符串,必须在服务中心添加 socketChannel.pipeline().addLast(new StringDecoder());
        String reqStr = (String) msg;
        System.err.println("server 接收到请求信息是:" + reqStr);
        String respStr = new StringBuilder("来自服务器的响应")
            .append(reqStr)
            .append("$_")
            .toString();
        // 返回给客户端响应                                                                                                                                                       和客户端链接中断即短连接,当信息返回给客户端后中断
        ctx.writeAndFlush(Unpooled.copiedBuffer(respStr.getBytes()))
            .addListener(ChannelFutureListener.CLOSE);
    }
    // 数据读取完毕的处理
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
    {
        System.err.println("服务端读取数据完毕");
    }
    // 出现异常的处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        System.err.println("server 读取数据出现异常");
        ctx.close();
    }
    /**
     * 将请求信息直接转成对象
     * @param ctx
     * @param msg
     */
    private void handlerObject(ChannelHandlerContext ctx, Object msg)
    {
        // 需要序列化 直接把msg转成对象信息,一般不会用,可以用json字符串在不同语言中传递信息
        Student student = (Student) msg;
        System.err.println("server 获取信息:" + student.getId() + student.getName());
        student.setName("李四");
        ctx.write(student);
    }
}
package com.test.thread.netty;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
 * 使用netty 把信息序列化成对象时使用
 * @author zhb
 */
public class MarshallingCodefactory
{
    /**
     * 创建 marshalling 解码器
     * @return
     */
    public static MarshallingDecoder buildDecoder()
    {
        // 先通过marshalling的工具类提供的方法实例化marshalling对象,参数serial是创建Java序列化工厂对象 serial
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        // 创建MarshallingConfiguration对象,设置版本为5
        final MarshallingConfiguration config = new MarshallingConfiguration();
        config.setVersion(5);
        // 根据 marshalling 的factory 和 config创建 provider;
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, config);
        // 构建netty的MarshallingDecoder对象,两个参数分别是provide和单个信息序列后的最大长度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1204 * 1);
        return decoder;
    }
    /**
     * 创建 marshalling 编码器
     * @return
     */
    public static MarshallingEncoder buildEncoder()
    {
        // 先通过marshalling的工具类提供的方法实例化marshalling对象,参数serial是创建Java序列化工厂对象
        final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        // 创建MarshallingConfiguration对象,设置版本为5
        final MarshallingConfiguration config = new MarshallingConfiguration();
        config.setVersion(5);
        // 根据 marshalling 的factory 和 config创建 provider;
        MarshallerProvider provider = new DefaultMarshallerProvider(factory, config);
        // 构建netty 的编码对象, 用于实现序列化接口的pojo对象序列化为二进制数组
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

二、客户端实现

package com.test.thread.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.UnsupportedEncodingException;
import com.test.thread.utils.Constant;
/**
 * 客户端发送请求
 * @author zhb
 *
 */
public class ClientNetty
{
    // 要请求的服务器的ip地址
    private String ip;
    // 服务器的端口
    private int port;
    public ClientNetty(String ip, int port)
    {
        this.ip = ip;
        this.port = port;
    }
    // 请求端主题
    private void action() throws InterruptedException, UnsupportedEncodingException
    {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        Bootstrap bs = new Bootstrap();
        bs.group(bossGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer < SocketChannel > ()
            {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception
                {
                    // marshalling 序列化对象的解码
                    //                  socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
                    // marshalling 序列化对象的编码
                    //                  socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
                    // 处理来自服务端的响应信息
                    socketChannel.pipeline()
                        .addLast(new ClientHandler());
                }
            });
        // 客户端开启
        ChannelFuture cf = bs.connect(ip, port)
            .sync();
        String reqStr = "我是客户端请求1$_";
        // 发送客户端的请求
        cf.channel()
            .writeAndFlush(Unpooled.copiedBuffer(reqStr.getBytes(Constant.charset)));
        //      Thread.sleep(300);
        //      cf.channel().writeAndFlush(Unpooled.copiedBuffer("我是客户端请求2$_---".getBytes(Constant.charset)));
        //      Thread.sleep(300);
        //      cf.channel().writeAndFlush(Unpooled.copiedBuffer("我是客户端请求3$_".getBytes(Constant.charset)));
        //      Student student = new Student();
        //      student.setId(3);
        //      student.setName("张三");
        //      cf.channel().writeAndFlush(student);
        // 等待直到连接中断
        cf.channel()
            .closeFuture()
            .sync();
    }
    public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException
    {
        new ClientNetty("127.0.0.1", Constant.serverSocketPort)
            .action();
    }
}
package com.test.thread.netty;
import com.test.thread.utils.Constant;
import com.test.thread.xlh.Student;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
 * 读取服务器返回的响应信息
 * @author zhb
 *
 */
public class ClientHandler extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        try
        {
            ByteBuf bb = (ByteBuf) msg;
            byte[] respByte = new byte[bb.readableBytes()];
            bb.readBytes(respByte);
            String respStr = new String(respByte, Constant.charset);
            System.err.println("client--收到响应:" + respStr);
            // 直接转成对象
            //          handlerObject(ctx, msg);
        }
        finally
        {
            // 必须释放msg数据
            ReferenceCountUtil.release(msg);
        }
    }
    private void handlerObject(ChannelHandlerContext ctx, Object msg)
    {
        Student student = (Student) msg;
        System.err.println("server 获取信息:" + student.getId() + student.getName());
    }
    // 数据读取完毕的处理
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
    {
        System.err.println("客户端读取数据完毕");
    }
    // 出现异常的处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        System.err.println("client 读取数据出现异常");
        ctx.close();
    }
}

使用场景

--构建高性能、低时延的各种 Java 中间件,如 MQ、分布式服务框架、ESB 消息总线等,Netty 主要作为基础通信框架提供高性能、低时延的通信服务;

--公有或者私有协议栈的基础通信框架,如可以基于 Netty 构建异步、高性能的 WebSocket 协议栈;

--各领域应用,例如大数据、游戏等,Netty 作为高性能的通信框架用于内部各模块的数据分发、传输和汇总等,实现模块之间高性能通信。

以上就是本篇文章的所有内容,小伙伴们知道怎么实现netty了吗?还想了解更多java架构师详细内容的话就来关注我们吧。

推荐阅读:

netty框架面试题及答案解析

netty可以干什么?有什么特点?

netty有必要学吗?有什么优点?