Netty Io

Netty Io

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

前置历史 #

网络的处理分为两类,一类是Blocking IO(BIO, 阻塞式IO)和NO-blocking IO(NIO,非阻塞式IO)。 当然是NIO的CPU利用率要高一些。

关于JAVA 原生NIO的介绍,可以看这一篇文章: https://mp.weixin.qq.com/s/GfV9w2B0mbT7PmeBS45xLw?spm=a2c6h.12873639.0.0.53064a61jNPjIo

非阻塞式IO一般只用于网络IO,其核心的概念有三个:Buffer、channel、selector。

  • Buffer: 用于缓存网络IO的数据。
  • Channel: 是与Buffer一起使用的数据传输通道,本身不存储数据。
  • selector: 选择器可以说是NIO的核心组件,它可以监听通道的状态,来实现异步非阻塞的IO。

但是原生的NIO元素过多,并不方便编写代码,Netty是以NIO为基础的优秀实践。

Netty 使用入门 #

可以参考这篇文章的介绍: https://developer.aliyun.com/article/769587

Netty 在服务端存在两个线程组:BossGroup、WorkerGroup, 前者负责处理链接,后者负责处理业务。有两个大的工厂类:Bootstrap与ServerBootStrap,分别是客户端和服务端的工厂类。

服务端的代码如下: #


public class MyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {

            //创建服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组boosGroup和workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //设置服务端通道实现类型    
                .channel(NioServerSocketChannel.class)
                //设置线程队列得到连接个数    
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置保持活动连接状态    
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //使用匿名内部类的形式初始化通道对象    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //给pipeline管道设置处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//给workerGroup的EventLoop对应的管道设置处理器

            System.out.println("java技术爱好者的服务端已经准备就绪...");
            //绑定端口号,启动服务端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();

        } finally {

            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


// 对应的MyServerHandler

/**
 * 自定义的Handler需要继承Netty规定好的HandlerAdapter
 * 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //发送消息给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,并给你发送一个问号?", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //发生异常,关闭通道
        ctx.close();
    }
}

以上是一个基础的代码,其中最不好理解的部分就是boostrap中配置的handler。

  1. NioEventLoopGroup is a multithreaded event loop that handles I/O operation. Boss 线程将会建立连接,并交给worker线程来匹配给对应的channel。
  2. NioServerSocketChannel 是诸多种channel的一种实现,一种异步的server端的socket通道。还有很多其他的通道,这里不展开介绍。
  3. ChannelInitializer 是channel的初始化类,一个匿名类,为channel配置了pipline的handler。

客户端代码 #


public class MyClient {

    public static void main(String[] args) throws Exception {
        // 事件轮询线程
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                //设置客户端的通道实现类型    
                .channel(NioSocketChannel.class)
                //使用匿名内部类初始化通道
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端通道的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪,随时可以起飞~");
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }
}



public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

以上是简单的server和client的处理。关于Netty更底层的设计逻辑,下一个小结来讨论。

底层逻辑与流程图 #

// TODO