Netty原理和基础(一)

Netty原理和基础(一)

文章发布于 2020-09-08 16:05:21,最后更新于 2020-09-10 20:05:44

Netty原理和基础(一)

前言

花了三天时间学习和总结netty,内容不难但是知识点挺多的,在学习完代码后,自己也试着写了一个简单的netty,总体来说收获挺多的,对Reactor反应器模式也有了深刻的印象。

一、初识Netty

先贴一个netty的实践案例,让我们了解一下他的功能和代码结构。这个案例的功能十分简单读取客户端的输入数据,直接丢弃,不给客户端任何回复。

1.1 服务器:


public class NettyDiscardServer
{
    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public NettyDiscardServer(int port)
    {
        this.serverPort = port;
    }

    public void runServer()
    {
        //创建reactor 线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try
        {
            //1 设置reactor 线程组
            b.group(bossLoopGroup, workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>()
            {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(new NettyDiscardHandler());
                }
            });
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            Logger.info(" 服务器启动成功,监听端口: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException
    {
        int port = NettyDemoConfig.SOCKET_SERVER_PORT;
        new NettyDiscardServer(port).runServer();
    }
}

1.2 处理器

上面是服务器,如果要实现功能就需要处理器来处理因此就出现了业务处理器NettyDiscardHandler

public class NettyDiscardHandler extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {

        ByteBuf in = (ByteBuf) msg;
        try
        {
            Logger.info("收到消息,丢弃如下:");
            while (in.isReadable())
            {
                System.out.print((char) in.readByte());
            }
            System.out.println();
        } finally
        {
            ReferenceCountUtil.release(msg);
        }
    }
}

处理器和服务器写完之后就需要一个客户端,这里的客户端,只要能发消息到服务器即可,不需要其他特殊的功能。因此,可以直接使用前面Reactor文章中的EchoClient程序来作为客户端运行即可,因为端口是一致的。到这里你就可以先把程序跑起来,看一下执行流程。

1.3 总结

当然这些代码刚开始我看的时候也一脸懵逼,好多对象、组件都不知道是干什么的,什么用。不过如果你对Reactor反应器模式很熟悉的话,这些代码会被你刨析的很清楚,接下来跟着我的思路一起刨析这些代码吧。

二、回顾反应器模式

反应器模式前面已经讲过了这里就做一个简单的复习。整个流程分为四个步骤。

  1. 第一步: 通道注册

    IO源于通道,IO和通道是强相关的,一个IO一定是属于一个通道的。如果要查询通道的事件,就需要把通道注册到选择器中,这样选择器就可以轮询通道从而获得IO事件。

    对应到代码里就是这些。

     b.channel(NioServerSocketChannel.class);
     //3 设置监听端口
     b.localAddress(serverPort);
     //4 设置通道的参数
     b.option(ChannelOption.SO_KEEPALIVE, true);
     b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
     b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    
  2. 第二步: 查询选择

    反应器模式中,一个选择器会负责一个线程,不断地轮询获得IO事件。

    对应到代码里就是这些;

    //创建reactor 线程组
    EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
    //1 设置reactor 线程组
    b.group(bossLoopGroup, workerLoopGroup);
    

    首先创建两个线程组一个监听通道,一个负责转发IO到对应处理器。

  3. 第三步: 事件分发

    查询到IO事件就将事件分发给绑定的Handler处理器。

    对应到代码里:

    //5 装配子通道流水线
    b.childHandler(new ChannelInitializer<SocketChannel>()
    {
        //有连接到达时会创建一个channel
        protected void initChannel(SocketChannel ch) throws Exception
        {
            // pipeline管理子通道channel中的Handler
            // 向子channel流水线添加一个handler处理器
            ch.pipeline().addLast(new NettyDiscardHandler());
        }
    });
    
  4. 第四步: 真正的IO操作和业务处理

    完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。

三、解密Netty

上面我们将netty和设计模式做了个对比,对每个代码块的作用都有了直观的理解,下面开始详细介绍Netty。

3.1 Netty中的Channel

值得一提的是,反应器模式和通道密切相关,反应器的查询和分发的IO事件都来自于通道!因此这东西是重点。

Netty中不直接使用Java NIO的Channel通道组件,对Channel通道组件进行了自己的封装。在Netty中,有一系列的Channel通道组件,为了支持多种通信协议,换句话说,对于每一种通信连接协议,Netty都实现了自己的通道。

Netty中常见的通道类型如下:

  • NioSocketChannel:异步非阻塞TCP Socket传输通道。
  • NioServerSocketChannel:异步非阻塞TCP Socket服务器端监听通道。
  • NioDatagramChannel:异步非阻塞的UDP传输通道。
  • NioSctpChannel:异步非阻塞Sctp传输通道。
  • NioSctpServerChannel:异步非阻塞Sctp服务器端监听通道。
  • OioSocketChannel:同步阻塞式TCP Socket传输通道。
  • OioServerSocketChannel:同步阻塞式TCP Socket服务器端监听通道。
  • OioDatagramChannel:同步阻塞式UDP传输通道。
  • OioSctpChannel:同步阻塞式Sctp传输通道。
  • OioSctpServerChannel:同步阻塞式Sctp服务器端监听通道。

3.2 Netty中的Reactor反应器

反应器模式中,一个反应器负责一个事件处理线程,不断轮询,通过selector选择器获取注册过的IO事件,如果获取到就转发给Handler处理器处理。

Netty中的反应器有多个实现类,与Channel通道类有关系。对应于NioSocketChannel通道,Netty的反应器类为:NioEventLoop。

NioEventLoop类绑定了两个重要的Java成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。

通过类图我们可以看出,NioEventLoop和前面说的反应器一样,都拥有一个线程来负责轮询。

理论上来说,一个EventLoopNetty反应器和NettyChannel通道是一对多的关系:一个反应器可以注册成千上万的通道。

3.3 Netty中的Handler处理器

我们知道之前监听通道时候可以处理的IO事件类型有四种分别是:

  • 可读:SelectionKey.OP_READ
  • 可写:SelectionKey.OP_WRITE
  • 连接:SelectionKey.OP_CONNECT
  • 接收:SelectionKey.OP_ACCEPT

Netty的Handler处理器分为两大类:第一类是ChannelInboundHandler通道入站处理器;第二类是ChannelOutboundHandler通道出站处理器。二者都继承了ChannelHandler处理器接口

3.3.1 Netty中的入站处理

从通道底层触发,由Netty通过层层传递,调用ChannelInboundHandler通道入站处理器进行的某个处理。

以底层的Java NIO中的OP_READ输入事件为例:在通道中发生了OP_READ事件后,会被EventLoop查询到,然后分发给ChannelInboundHandler通道入站处理器,调用它的入站处理的方法read。

在ChannelInboundHandler通道入站处理器内部的read方法可以从通道中读取数据。Netty中的入站处理,触发的方向为:从通道到ChannelInboundHandler通道入站处理器.

3.3.2 Netty中的入站处理

Netty中的出站处理指的是从ChanneOutboundHandler通道出站处理器到通道的某次IO操作

例如,在应用程序完成业务处理后,可以通过ChanneOutboundHandler通道出站处理器将处理的结果写入底层通道。它的最常用的一个方法就是write()方法,把数据写入到通道。

这两个业务处理接口都有各自的默认实现:ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter,叫作通道入站处理适配器。ChanneOutboundHandler的默认实现为ChanneloutBoundHandlerAdapter,叫作通道出站处理适配器。这两个默认的通道处理适配器,分别实现了入站操作和出站操作的基本功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。

这也就解释了为什么前面的代码中处理器继承了接口

3.3 Netty的流水线(Pipeline)

在之前的代码中还有一块没有介绍就是流水线。

这个流水线是Netty独有的组件,也叫做ChannelPipleline(通道流水线),他像一条管道,将绑定到一个通道的多个Handler处理器串联起来,形成一条流水线。

ChannelPipeline(通道流水线)的默认实现,实际上被设计成一个双向链表。所有的Handler处理器实例被包装成了双向链表的节点,被加入到了ChannelPipeline(通道流水线)中。

以入站处理为例。每一个来自通道的IO事件,都会进入一次ChannelPipeline通道流水线。在进入第一个Handler处理器后,这个IO事件将按照既定的从前往后次序,在流水线上不断地向后流动,流向下一个Handler处理器。在向后流动的过程中,会出现3种情况:

(1)如果后面还有其他Handler入站处理器,那么IO事件可以交给下一个Handler处理器,向后流动。

(2)如果后面没有其他的入站处理器,这就意味着这个IO事件在此次流水线中的处理结束了。

(3)如果在流水线中间需要终止流动,可以选择不将IO事件交给下一个Handler处理器,流水线的执行也被终止了。

总之,流水线是通道的“大管家”,为通道管理好了它的一大堆Handler“小弟”。

3.4 组件之间的关系

来梳理一下Netty的反应器模式中各个组件之间的关系:

  1. 反应器(或者SubReactor子反应器)和通道之间是一对多的关系:一个反应器可以查询很多个通道的IO事件。
  2. 通道和Handler处理器实例之间,是多对多的关系:一个通道的IO事件被多个的Handler实例处理;一个Handler处理器实例也能绑定到很多的通道,处理多个通道的IO事件。
  3. 通道和Handler处理器实例之间的绑定关系通过流水线实现

四、Netty的启动类

上面说了那麽多组件,如果开发中要我们自己实现那不得崩溃死,为了方便Netty提供了便利的工厂类,通过它可以方便的完成Netty的客户端或者服务器的Netty组件的组装以及Netty的初始化。

之前的代码中就是用了启动类:

这两个启动器仅仅是使用的地方不同,它们大致的配置和使用方法都是相同的。下面以ServerBootstrap服务器启动类作为重点的介绍对象。

在了解ServerBootstrap之前我们需要熟悉两个概念:

4.1 父子通道

在Netty中,每一个NioSocketChannel通道所封装的是Java NIO通道,再往下就对应到了操作系统底层的socket描述符。理论上来说,操作系统底层的socket描述符分为两类:

  • 连接监听类型。连接监听类型的socket描述符,放在服务器端,它负责接收客户端的套接字连接;
  • 传输数据类型。数据传输类的socket描述符负责传输数据。同一条TCP的Socket传输链路,在服务器和客户端,都分别会有一个与之相对应的数据传输类型的socket描述符。

在Netty中,异步非阻塞的服务器端监听通道NioServerSocketChannel,封装在Linux底层的描述符,是“连接监听类型”socket描述符;

而NioSocketChannel异步非阻塞TCP Socket传输通道,封装在底层Linux的描述符,是“数据传输类型”的socket描述符。

在Netty中,将有接收关系的NioServerSocketChannel和NioSocketChannel,叫作父子通道。

其中,NioServerSocketChannel负责服务器连接监听和接收,也叫父通道(Parent Channel)。对应于每一个接收到的NioSocketChannel传输类通道,也叫子通道(Child Channel)。

4.2 EventLoopGroup线程组

细心的人注意到了前面创建反应器的时候都使用了EventLoopGroup这玩意是干什么的呢?

  EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

在Netty中,一个EventLoop相当于一个子反应器(SubReactor)。大家已经知道,一个NioEventLoop子反应器拥有了一个线程,同时拥有一个Java NIO选择器。Netty如何组织外层的反应器呢?

答案是使用EventLoopGroup线程组。多个EventLoop线程组成一个EventLoopGroup线程组。

反过来说,Netty的EventLoopGroup线程组就是一个多线程版本的反应器。而其中的单个EventLoop线程对应于一个子反应器(SubReactor)。

Netty的程序开发不会直接使用单个EventLoop线程,而是使用EventLoopGroup线程组。

EventLoopGroup的参数

EventLoopGroup的构造函数有一个参数,用于指定内部的线程数。在构造器初始化时,会按照传入的线程数量,在内部构造多个Thread线程和多个EventLoop子反应器(一个线程对应一个EventLoop子反应器),进行多线程的IO事件查询和分发。

如果使用EventLoopGroup不传入参数呢?就像代码第二行

没有传入线程数或者传入的线程数为0,那么EventLoopGroup内部的线程数到底是多少呢?默认的EventLoopGroup内部线程数为最大可用的CPU处理器数量的2倍。假设电脑使用的是4核的CPU,那么在内部会启动8个EventLoop线程,相当8个子反应器(SubReactor)实例。

从之前的讲解中知道bossLoopGroup负责新连接的监听,workerLoopGroup负责IO事件的处理。他们分别对应了一个线程组。但是他们又是如何分工的呢?

负责新连接的监听和接受的EventLoopGroup线程组,查询父通道的IO事件,有点像负责招工的包工头,因此,可以形象地称为“包工头”(Boss)线程组。另一个EventLoopGroup线程组负责查询所有子通道的IO事件,并且执行Handler处理器中的业务处理——例如数据的输入和输出(有点儿像搬砖),这个线程组可以形象地称为“工人”(Worker)线程组。

4.3 启动流程

了解了这些知识我们就可以学习他的启动流程了。

  1. 首先创建启动器。

     ServerBootstrap b = new ServerBootstrap();
    
  2. 创建反应器线程组,并将反应器线程组赋值给启动器。

     //创建reactor 线程组
            EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
     //1 设置reactor 线程组
                b.group(bossLoopGroup, workerLoopGroup);
    

    两个NioEventLoopGroup线程组,一个负责处理连接监听IO事件;一个负责数据IO事件和Handler业务处理。

    值得一提的是吗,不一定非得配置两个线程组,可以仅配置一个EventLoopGroup反应器线程组。具体的配置方法是调用b.group( workerGroup)。在这种模式下,连接监听IO事件和数据传输IO事件可能被挤在了同一个线程中处理。这样会带来一个风险:新连接的接受被更加耗时的数据传输或者业务处理所阻塞。在服务器端,建议设置成两个线程组的工作模式。

  3. 设置通道的IO类型。

     //2 设置nio类型的channel
                b.channel(NioServerSocketChannel.class);
    

    如果确实需要指定Bootstrap的IO模型为BIO(阻塞式IO),那么这里配置上Netty的OioServerSocketChannel.class类即可。由于NIO的优势巨大,通常不会在Netty中使用BIO。

  4. 配置监听端口,并且设置传输通道的相关配置。

     //3 设置监听端口
                b.localAddress(serverPort);
                //4 设置通道的参数
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    

    这里用到了Bootstrap的option() 选项设置方法。对于服务器的Bootstrap而言,这个方法的作用是:给父通道(Parent Channel)接收连接通道设置一些选项。如果要给子通道(Child Channel)设置一些通道选项,则需要用另外一个childOption()设置方法。

  5. 装配流水线,也就是绑定处理器。

     //5 装配子通道流水线
                b.childHandler(new ChannelInitializer<SocketChannel>()
                {
                    //有连接到达时会创建一个channel
                    protected void initChannel(SocketChannel ch) throws Exception
                    {
                        // pipeline管理子通道channel中的Handler
                        // 向子channel流水线添加一个handler处理器
                        ch.pipeline().addLast(new NettyDiscardHandler());
                    }
                });
    

    每一个通道的子通道,都用一条ChannelPipeline流水线。它的内部有一个双向的链表。

    装配流水线的方式是:将业务处理器ChannelHandler实例加入双向链表中。装配子通道的Handler流水线调用childHandler()方法,传递一个ChannelInitializer通道初始化类的实例。

    在父通道成功接收一个连接,并创建成功一个子通道后,就会初始化子通道,这里配置的ChannelInitializer实例就会被调用。

    在ChannelInitializer通道初始化类的实例中,有一个initChannel初始化方法,在子通道创建后会被执行到,向子通道流水线增加业务处理器。

    为什么仅装配子通道的流水线,而不需要装配父通道的流水线呢?

    原因是:父通道也就是NioServerSocketChannel连接接受通道,它的内部业务处理是固定的:接受新连接后,创建子通道,然后初始化子通道,所以不需要特别的配置。如果需要完成特殊的业务处理,可以使用ServerBootstrap的handler(ChannelHandlerhandler)方法,为父通道设置ChannelInitializer初始化器。

  6. 开始绑定服务器新连接的监听端口

    // 6 开始绑定server
                // 通过调用sync同步方法阻塞直到绑定成功
                ChannelFuture channelFuture = b.bind().sync();
                Logger.info(" 服务器启动成功,监听端口: " +
                        channelFuture.channel().localAddress());
    //////////////////////////////////////////////////////////////////////////
      ChannelFuture channelFuture = b.bind().addListener(
                        future ->
                        {
                            if (future.isSuccess()) {
                                System.out.println(new Date() + ": 端口[" + serverPort + "]绑定成功!");
                            } else {
                                System.err.println("端口[" + serverPort + "]绑定失败!");
                            }
                        }
                );
    

    在Netty中,所有的IO操作都是异步执行的,这就意味着任何一个IO操作会立刻返回,在返回的时候,异步任务还没有真正执行。什么时候执行完成呢?

    Netty中的IO操作,都会返回异步任务实例(如ChannelFuture实例),通过自我阻塞一直到ChannelFuture异步任务执行完成,或者为ChannelFuture增加事件监听器的两种方式,以获得Netty中的IO操作的真正结果。上面使用了第一种。至此,服务器正式启动。

  7. 自我阻塞,直到通道关闭

      // 7 等待通道关闭的异步任务结束
                // 服务监听通道会一直等待通道关闭的异步任务结束
                ChannelFuture closeFuture = channelFuture.channel().closeFuture();
                closeFuture.sync();
    

    如果要阻塞当前线程直到通道关闭,可以使用通道的closeFuture()方法,以获取通道关闭的异步任务。当通道被关闭时,closeFuture实例的sync()方法会返回。

    f.channel().closeFuture().sync();作用是产生一个wait()事件,保证main线程存活,否则main线程直接结束了。

    划重点,这部分刚接触的肯理解起来比较费劲:

    在这里面future.channel().closeFuture().sync();这个语句的主要目的是,如果缺失上述代码,则main方法所在的线程,即主线程会在执行完bind().sync()方法后,会进入finally 代码块,之前的启动的nettyserver也会随之关闭掉,整个程序都结束了。原文的例子有英文注释:

    让线程进入wait状态,也就是main线程暂时不会执行到finally里面,nettyserver也持续运行,如果监听到关闭事件,可以优雅的关闭通道和nettyserver,虽然这个例子中,永远不会监听到关闭事件。也就是说这个例子是仅仅为了展示存在api shutdownGracefully,可以优雅的关闭nettyserver。

  8. 关闭EventLoopGroup

       workerLoopGroup.shutdownGracefully();
                bossLoopGroup.shutdownGracefully();
    
  9. 关闭Reactor反应器线程组,同时会关闭内部的subReactor子反应器线程,也会关闭内部的Selector选择器、内部的轮询线程以及负责查询的所有的子通道。在子通道关闭后,会释放掉底层的资源,如TCP Socket文件描述符等。

五、详解Channel通道

通过上面的学习,通道是Netty网络中最重要的核心,代表着网络连接。通道是通信的主题,负责同对端进行网络通信,可以写入数据到对端,也可以从对端读取数据。所有有必要堆通道做个整理。

5.1 构造函数

 protected AbstractChannel(Channel parent) {
        this.parent = parent;//父通道
        this.unsafe = this.newUnsafe();//底层的NIO通道,完成实际的NIO操作
        this.pipeline = new DefaultChannelPipeline(this);//一个通道一个流水线
    }

从构造函数中我们可以明显看出AbstractChannel内部有一个pipeline属性,表示处理器的流水线。初始化通道时,pipeline属性会初始化成DefaultChannelPipeline实例。

其中parent属性表示通道的父通道。对于连接监听通道来说(如NioServerSocketChannel实例),他的父通道就是null,而对于每一条传输通道(如NioSocketChannel实例)他的父通道就是接收到该连接的服务连接监听通道。

几乎所有的通道实现类都继承了AbstractChannel抽象类,都拥有上面的parent和pipeline两个属性成员。

5.2 成员方法

看完构造函数,继续 了解他的成员方法。

  1. ChannelFuture connect(SocketAddress address)

    用于连接远程服务器。参数是远程服务器的地址,调用后会立即返回,返回值为负责连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用。

  2. ChannelFuture bind(SocketAddress address)

    绑定监听地址,开始监听新的客户端连接。此方法在服务器的新连接监听和接收通道使用。

  3. ChannelFuture close()

    关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync( ) 方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。

  4. Channel read()

    读取通道数据,并且启动入站处理。具体来说,从内部的JavaNIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。

  5. ChannelFuture write(Object o)

    启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的异步处理任务。

  6. Channel flush()

    缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会将根据缓冲区的情况,决定什么时候把数据写到对端。而执行flush()方法立即将缓冲区的数据写到对端。

5.3 嵌入式通道EmbeddedChannel

这是一个模拟入站和出站的通道,主要用于测试。

六、详解Handler业务处理器

Reactor反应器获取到IO事件后,分发到Handler业务处理器,由handler完成IO操作和业务处理。

整个IO处理的流程包括:通道读数据、数据包解码、业务处理、目标数据编码、数据包写入通道。

黑体字的四个流程是Netty底层负责完成,我们要做的就是业务处理。

前面已经介绍过,从应用程序开发人员的角度来看,有入站和出站两种类型操作。

· 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。

· 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的工作。

6.1 入站处理器

到数据进入Netty通道后,netty就会触发入站处理器,这里我们就学习一下ChannelInboundHandler。

  1. channelRegistered

    当通道注册完后,Netty会调用fireChannelRegistered,触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到。

  2. channelActive

    当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件

  3. channelRead

    当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件

  4. channelReadComplete

    当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件

  5. channelInactive

    当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件

  6. exceptionCaught

    通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件

6.2 出站处理器

当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层通道的操作。比方说建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作

  1. bind

    监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。如果使用TCP传输协议,这个方法用于服务器端。

  2. connect

    连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。

  3. write

    写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。

  4. flush

    腾空缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。

  5. read

    从底层读数据:完成Netty通道从Java IO通道的数据读取。

  6. disConnect

    断开服务器连接

  7. close

    主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道

6.3 通道初始化处理器

通道和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作,发生在通道开始工作之前。

但是如果向流水线中装配业务处理器呢?这就得借助通道的初始化类——ChannelInitializer。

回顾一下代码

 //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>()
            {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(new NettyDiscardHandler());
                }
            });

这里面使用了ChannelInitializer。

initChannel()方法是ChannelInitializer定义的一个抽象方法,在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。

其他的方法就不介绍了类似于前面的两种处理器,感兴趣的可以自己去我的git上看。

七、详解Pipeline流水线

前面讲到,一条Netty通道需要很多的Handler业务处理器来处理业务。每条通道内部都有一条流水线(Pipeline)将Handler装配起来。Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式(Chain of Responsibility)来设计的,内部是一个双向链表结构,能够支持动态地添加和删除Handler业务处理器。

关于他的流程感兴趣的可以去我的git上看看,这里就不做详细介绍了。

7.1 ChannelHandlerContext上下文

不管我们定义的是哪种类型的Handler业务处理器,最终它们都是以双向链表的方式保存在流水线中。

在Handler业务处理器被添加到流水线中时,会创建一个通道处理器上下文ChannelHandlerContext,它代表了ChannelHandler通道处理器和ChannelPipeline通道流水线之间的关联。

Channel、Handler、ChannelHandlerContext三者的关系为:

Channel通道拥有一条ChannelPipeline通道流水线,每一个流水线节点为一个ChannelHandlerContext通道处理器上下文对象,每一个上下文中包裹了一个ChannelHandler通道处理器。在ChannelHandler通道处理器的入站/出站处理方法中,Netty都会传递一个Context上下文实例作为实际参数。通过Context实例的实参,在业务处理中,可以获取ChannelPipeline通道流水线的实例或者Channel通道的实例。

7.2 截断流水线的处理

在入站/出站的过程中,如果由于业务条件不满足,需要截断流水线的处理,不让处理进入下一站。

出站处理流程只要开始执行,就不能被截断。强行截断的话,Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。

7.3 Handler业务处理器的热拔插

Netty中的处理器流水线是一个双向链表。在程序执行过程中,可以动态进行业务处理器的热拔插:动态地增加、删除流水线上的业务处理器Handler。主要的Handler热拔插方法声明在ChannelPipeline接口中

八、ByteBuf缓冲区

Netty提供了ByteBuf来替代Java NIO的ByteBuffer缓冲区,以操纵内存缓冲区。

8.1 优势

与Java NIO的ByteBuffer相比,ByteBuf的优势如下:

· Pooling (池化,这点减少了内存复制和GC,提升了效率)·

· 复合缓冲区类型,支持零复制· 不需要调用flip()方法去切换读/写模式

· 扩展性好,例如StringBuffer

· 可以自定义缓冲区类型

· 读取和写入索引分开

· 方法的链式调用

· 可以进行引用计数,方便重复使用

第一个部分是已用字节,表示已经使用完的废弃的无效字节;

第二部分是可读字节,这部分数据是ByteBuf保存的有效数据从ByteBuf中读取的数据都来自这一部分;

第三部分是可写字节,写入到ByteBuf的数据都会写到这一部分中;

第四部分是可扩容字节,表示的是该ByteBuf最多还能扩容的大小。

什么是Pooled(池化)的ByteBuf缓冲区呢?

在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。大家都知道,频繁创建对象、内存分配、释放内存,系统的开销大、性能低,如何提升性能、提高Buffer实例的使用率呢?从Netty4版本开始,新增了对象池化的机制。即创建一个Buffer对象池,将没有被引用的Buffer对象,放入对象缓存池中;当需要时,则重新从对象缓存池中取出,而不需要重新创建。

8.2 重要属性

这三个属性定义在AbstractByteBuf抽象类中,分别是:

· readerIndex(读指针):指示读取的起始位置

· writerIndex(写指针):指示写入的起始位置。

· maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。

8.3 三组方法

  • 第一组:容量系列:capacity()、maxCapacity()

  • 第二组:写入系列:isWritable() 、writableBytes()、maxWritableBytes()、writeBytes(byte[] src) ......

  • 第三组:读取系列:isReadable( )、readableBytes( )、readType()、getTYPE(TYPE value)

    ......

这里就不做详细介绍了感兴趣的可以去查看源码。

8.4 实践案例

实际使用分三步(1)分配一个ByteBuf实例;(2)向ByteBuf写数据;(3)从ByteBuf读数据。

参考github代码

8.5 引用计数

Netty的ByteBuf的内存回收工作是通过引用计数的方式管理的。JVM中使用“计数器”(一种GC算法)来标记对象是否“不可达”进而收回。Netty也使用了这种手段来对ByteBuf的引用进行计数。Netty采用“计数器”来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地“发现”那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。

九、EchoServer回显服务器的实践案例

功能:从服务器端读取客户端输入的数据,然后将数据直接回显到Console控制台。

public class NettyEchoServer
{

    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public NettyEchoServer(int port)
    {
        this.serverPort = port;
    }

    public void runServer()
    {
        //创建reactor 线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try
        {
            //1 设置reactor 线程组
            b.group(bossLoopGroup, workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>()
            {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
                }
            });
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            Logger.info(" 服务器启动成功,监听端口: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException
    {
        int port = NettyDemoConfig.SOCKET_SERVER_PORT;
        new NettyEchoServer(port).runServer();
    }
}

9.1 共享NettyEchoServerHandler处理器

EchoServerHandler回显服务器处理器,继承自ChannelInboundHandlerAdapter,然后覆盖了channelRead方法,这个方法在可读IO事件到来时,被流水线回调。

第一步,从channelRead方法的msg参数。

第二步,调用ctx.channel().writeAndFlush() 把数据写回客户端。

@ChannelHandler.Sharable
public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter
{
    public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        ByteBuf in = (ByteBuf) msg;
        Logger.info("msg type: " + (in.hasArray() ? "堆内存" : "直接内存"));

        int len = in.readableBytes();
        byte[] arr = new byte[len];
        in.getBytes(0, arr);
        Logger.info("server received: " + new String(arr, "UTF-8"));

        //写回数据,异步任务
        Logger.info("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt());
        ChannelFuture f = ctx.writeAndFlush(msg);
        f.addListener((ChannelFuture futureListener) ->
        {
            Logger.info("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
        });
    }
}

这里的NettyEchoServerHandler在前面加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享,如果没有加注解,试图将同一个Handler实例添加到多个ChannelPipeline通道流水线时,Netty将会抛出异常。

9.2 NettyEchoClient客户端代码

客户端Bootstrap的装配和使用,代码如下:

public class NettyEchoClient
{

    private int serverPort;
    private String serverIp;
    Bootstrap b = new Bootstrap();

    public NettyEchoClient(String ip, int port)
    {
        this.serverPort = port;
        this.serverIp = ip;
    }

    public void runClient()
    {
        //创建reactor 线程组
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try
        {
            //1 设置reactor 线程组
            b.group(workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioSocketChannel.class);
            //3 设置监听端口
            b.remoteAddress(serverIp, serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.handler(new ChannelInitializer<SocketChannel>()
            {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
                }
            });
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            {
                if (futureListener.isSuccess())
                {
                    Logger.info("EchoClient客户端连接成功!");

                } else
                {
                    Logger.info("EchoClient客户端连接失败!");
                }

            });

            // 阻塞,直到连接完成
            f.sync();
            Channel channel = f.channel();

            Scanner scanner = new Scanner(System.in);
            Print.tcfo("请输入发送内容:");

            while (scanner.hasNext())
            {
                //获取输入的内容
                String next = scanner.next();
                byte[] bytes = (Dateutil.getNow() + " >>" + next).getBytes("UTF-8");
                //发送ByteBuf
                ByteBuf buffer = channel.alloc().buffer();
                buffer.writeBytes(bytes);
                channel.writeAndFlush(buffer);
                Print.tcfo("请输入发送内容:");

            }
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            // 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException
    {
        int port = NettyDemoConfig.SOCKET_SERVER_PORT;
        String ip = NettyDemoConfig.SOCKET_SERVER_IP;
        new NettyEchoClient(ip, port).runClient();
    }
}

在上面的代码中,客户端在连接到服务器端成功后不断循环,获取控制台的输入,通过服务器端的通道发送到服务器。

9.3 NettyEchoClientHandler处理器

客户端的流水线不是空的,还需要装配一个回显处理器,功能很简单,就是接收服务器写过来的数据包,显示在Console控制台上。代码如下:

public class NettyDumpSendClient
{

    private int serverPort;
    private String serverIp;
    Bootstrap b = new Bootstrap();

    public NettyDumpSendClient(String ip, int port)
    {
        this.serverPort = port;
        this.serverIp = ip;
    }

    public void runClient()
    {
        //创建reactor 线程组
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try
        {
            //1 设置reactor 线程组
            b.group(workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioSocketChannel.class);
            //3 设置监听端口
            b.remoteAddress(serverIp, serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.handler(new ChannelInitializer<SocketChannel>()
            {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
                }
            });
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            {
                if (futureListener.isSuccess())
                {
                    Logger.info("EchoClient客户端连接成功!");

                } else
                {
                    Logger.info("EchoClient客户端连接失败!");
                }

            });

            // 阻塞,直到连接完成
            f.sync();
            Channel channel = f.channel();

            //6发送大量的文字
            byte[] bytes = "疯狂创客圈:高性能学习社群!".getBytes(Charset.forName("utf-8"));
            for (int i = 0; i < 1000; i++)
            {
                //发送ByteBuf
                ByteBuf buffer = channel.alloc().buffer();
                buffer.writeBytes(bytes);
                channel.writeAndFlush(buffer);
            }


            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channel.closeFuture();
            closeFuture.sync();

        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            // 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException
    {
        int port = NettyDemoConfig.SOCKET_SERVER_PORT;
        String ip = NettyDemoConfig.SOCKET_SERVER_IP;
        new NettyDumpSendClient(ip, port).runClient();
    }

通过代码可以看到,从服务器端发送过来的ByteBuf,被手动方式强制释放掉了。当然,也可以使用前面介绍的自动释放方式来释放ByteBuf。

总结

通过这几天的学习,收获还是不少了解了Netty基本原理:Reactor反应器模式在Netty中的应用,Netty中Reactor反应器、Handler业务处理器、Channel通道以及它们三者之间的相互关系。除此之外还学习了Pipeline流水线,他是Netty为了有效地管理通道和Handler业务处理器之间的关系。

这一节的内容还是挺多的,我自己读起来有时候也感觉晕晕的,但是其实读完把思路整理一下无外乎就几个主要的部分,反应器,通道,选择器,流水线等,建议跟着代码一起学习,一边debug一边理解他的执行流程和每个部分的作用,会更加直观简单。

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://mxyblogs.club/archives/netty原理和基础

Buy me a cup of coffee ☕.