Reactor反应器模式(一)

Reactor反应器模式(一)

文章发布于 2020-08-27 11:19:05,最后更新于 2020-09-08 16:12:23

学习Reactor反应器模式(一)

​ 以餐厅为例,每一个人就餐就是一个事件,顾客会先看下菜单,然后点餐,处理这些就餐事件需要服务人员。就像一个网络服务会有很多的请求,服务器会收到每个请求,然后指派工作线程去处理一样。

​ 在多线程处理方式下:

​ 一个人来就餐,一个服务员去服务,然后客人会看菜单,点菜。 服务员将菜单给后厨。

​ 二个人来就餐,二个服务员去服务……

​ 五个人来就餐,五个服务员去服务……

​ 这类似多线程的处理方式,一个事件到来,就会有一个线程为其服务。很显然这种方式在人少的情况下会有很好的用户体验,每个客人都感觉自己享有了最好的服务。如果这家餐厅一直这样同一时间最多来5个客人,这家餐厅是可以很好的服务下去的。

​ 由于这家店的服务好,吃饭的人多了起来。同一时间会来10个客人,老板很开心,但是只有5个服务员,这样就不能一对一服务了,有些客人就不能马上享有服务员为其服务了。老板为了挣钱,不得不又请了5个服务员。现在又好了,每位顾客都享受最好最快的待遇了。

​ 越来越多的人对这家餐厅满意,客源又多了,同时来吃饭的人到了20人,老板高兴但又高兴不起来了,再请服务员吧,占地方不说,还要开工钱,再请人就挣不到到钱了。

​ 怎么办呢?老板想了想,10个服务员对付20个客人也是能对付过来的,服务员勤快点就好了,伺候完一个客人马上伺候另外一个,还是来得及的。综合考虑了一下,老板决定就使用10个服务人员的线程池!

​ 但是这样又有一个比较严重的缺点:如果正在接受服务员服务的客人点菜很慢,其他的客人可能就要等好长时间了。有些脾气火爆的客人可能就等不了走人了。

​ 这样,我么那就引入了Reactor模式,那么,Reactor模式是如何处理这个问题呢?

​ 老板后来发现,客人点菜比较慢,大部服务员都在等着客人点菜,其实干的活不是太多。老板之所以能当老板当然有点不一样的地方,终于发现了一个新的方法,那就是:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员”,马上就有个服务员过去服务。在用了这个新方法后,老板进行了一次裁员,只留了一个服务员!这就是用单个线程来做多线程的事。实际的餐馆都是用的Reactor模式在服务。

首先我们这一节将学习反应器模式的概念,为什么要学习他,因为高性能网络编程都离不开反应器模式,而且很多著名的服务器软件或者中间件都是基于反应器模式实现的。所有从开发的角度来说,如果想完成高性能的服务器开发,反应器模式是必须学习和掌握的。

什么是Reactor反应器模式

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成。

Reactor反应器:负责响应IO事件,当检查到IO事件就分发任务到Handlers处理器。

Handlers处理器:与IO事件绑定,负责IO事件的处理,非阻塞的执行业务处理逻辑。

多线程OIO的弊端

​ 在OIO编程中,最初的网络服务器程序是通过简单的while循环来不断地监听端口,判断是否有新的连接,如果有就调用函数处理。

while(true){
  socket=accept();//阻塞,接受连接
  handle(socket);//读取数据、业务处理、写入结果
}

从代码中我们可以清楚的看出如果前一个网络的handle未处理完,那么后面的请求无法被接收,于是后面的全部会阻塞。对于服务器来说这是致命的问题。

解决办法

​ 为了解决上述问题,有人提出了一个经典的模式就是Connection Per Thread(一个线程处理一个连接)模式,代码如下:

//...省略: 导入的Java类
class ConnectionPerThread implements Runnable {
    public void run() {
        try {
            //服务器监听socket
            ServerSocketserverSocket =
                    new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted()) {
                Socket socket = serverSocket.accept();
                //接收一个连接后,为socket连接,新建一个专属的处理器对象
                Handler handler = new Handler(socket);
                //创建新线程,专门负责一个连接的处理
                new Thread(handler).start();
            }
        } catch (IOException ex) { /* 处理异常 */ }
    }
    //处理器对象
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) {
            socket = s;
        }
        public void run() {
            while (true) {
                try {
                    byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
                    /* 读取数据 */
                    socket.getInputStream().read(input);
                    /* 处理业务逻辑,获取处理结果*/
                    byte[] output =null;
                    /* 写入结果 */
                    socket.getOutputStream().write(output);
                } catch (IOException ex) { /*处理异常*/ }
            }
        }
    }
}

​ 从代码上分析,优点是解决了前面连接被阻塞的问题。缺点也很明显,对于大量的连接就需要大量的线程资源。线程是系统比较昂贵的资源,频繁的使用线程在高并发场景下是致命的。

​ 综上分析,如何解决这些缺陷呢?就是使用Reactor反应器模式,用反应器模式来控制线程数量,做到一个线程处理大量连接。

单线程Reactor反应器模式

​ 之前说到过Reactor反应器模式类似于事件驱动。在事件驱动模式中,当有事件驱动时,事件源会将事件分发到handler处理器进行事件处理。反应器中模式中的反应器角色类似于事件驱动模式的事件分发角色。

方法一:void attach(Object o)

可以将JAVA POJO对象作为附件添加到SelectionKey实例中,相当于Setter方法

方法二:Object attachment()

是取出attach添加到SelectionKey选择键实例的附件,相当于Getter方法

​ 总之,在反应器模式中,需要进行attach和attachment结合使用:在选择键注册完成之后,调用attach方法,将Handler处理器绑定到选择键;当事件发生时,调用attachment方法,可以从选择键取出Handler处理器,将事件分发到Handler处理器中,完成业务处理。

单线程Reactor反应器参考代码

//...
class Reactor implements Runnable {
    Selector selector;
    ServerSocketChannel serverSocket;
    EchoServerReactor() throws IOException {
        //打开选择器、serverSocket连接监听通道
      	selector=Selector.open();
      	serverSocket=ServerSocketChannel.open()
        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.socket().bind(address);
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
   //将新连接处理器作为附件,绑定到sk选择键
        sk.attach(new AcceptorHandler());
    }
    public void run() {
        //选择器轮询
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                //反应器负责dispatch收到的事件
                    SelectionKeysk=it.next();
                    dispatch(sk);
                }
                selected.clear();
            }
        } catch (IOException ex) { ex.printStackTrace(); }
    }
    //反应器的分发方法
    void dispatch(SelectionKey k) {
        Runnable handler = (Runnable) (k.attachment());
        //调用之前绑定到选择键的handler处理器对象
        if (handler != null) {
            handler.run();
        }
    }
    // 新连接处理器
    class AcceptorHandler implements Runnable {
        public void run() {
         try
            {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new EchoHandler(selector, channel);
            } catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }
    //….
}

​ 上面代码中,设计了新的Handler处理器也就是AcceptorHandler处理器,他是一个内部类。在注册serverSocket服务监听连接的接受事件之后,创建一个AcceptorHandler新连接处理器的实例,作为附件,被设置(attach)到了SelectionKey中。

​ 当新的连接发生后,取出之前acctach到selectionKey的Handler业务处理器,进行IO处理。

​ AcceptorHandler的功能有两个,一个是接收新的连接,另一个是为新连接创建一个新的输入输出的Handler处理器。

​ EchoHandler,顾名思义,就是负责socket的数据输入、业务处理、结果输出。示例代码如下:

class EchoHandler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    EchoHandler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);

        //将Handler作为选择键的附件
        sk.attach(this);

        //第二步,注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    public void run()
    {
        try
        {
            if (state == SENDING)
            {
                //写入通道
                channel.write(byteBuffer);
                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING)
            {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0)
                {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex)
        {
            ex.printStackTrace();
        }
    }
}

​ 在EchoHandler的构造器中,有两点比较重要:

​ (1)将新的SocketChannel传输通道,注册到了反应器Reactor类的同一个选择器中。这样保证了Reactor类和Handler类在同一个线程中执行。

​ (2)Channel传输通道注册完成后,将EchoHandler自身作为附件,attach到了选择键中。这样,在Reactor类分发事件(选择键)时,能执行到EchoHandler的run方法。

​ 在不熟悉网络通信的情况下理解起来还是挺费劲的,首先你得清楚每个方法,每个模块的含义,再通过实际的运行代码跑一跑才会加深印象。

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://mxyblogs.club/archives/reactor1

Buy me a cup of coffee ☕.