学习Reactor反应器模式(一)
以餐厅为例,每一个人就餐就是一个事件,顾客会先看下菜单,然后点餐,处理这些就餐事件需要服务人员。就像一个网络服务会有很多的请求,服务器会收到每个请求,然后指派工作线程去处理一样。
在多线程处理方式下:
一个人来就餐,一个服务员去服务,然后客人会看菜单,点菜。 服务员将菜单给后厨。
二个人来就餐,二个服务员去服务……
五个人来就餐,五个服务员去服务……
这类似多线程的处理方式,一个事件到来,就会有一个线程为其服务。很显然这种方式在人少的情况下会有很好的用户体验,每个客人都感觉自己享有了最好的服务。如果这家餐厅一直这样同一时间最多来5个客人,这家餐厅是可以很好的服务下去的。
由于这家店的服务好,吃饭的人多了起来。同一时间会来10个客人,老板很开心,但是只有5个服务员,这样就不能一对一服务了,有些客人就不能马上享有服务员为其服务了。老板为了挣钱,不得不又请了5个服务员。现在又好了,每位顾客都享受最好最快的待遇了。
越来越多的人对这家餐厅满意,客源又多了,同时来吃饭的人到了20人,老板高兴但又高兴不起来了,再请服务员吧,占地方不说,还要开工钱,再请人就挣不到到钱了。
怎么办呢?老板想了想,10个服务员对付20个客人也是能对付过来的,服务员勤快点就好了,伺候完一个客人马上伺候另外一个,还是来得及的。综合考虑了一下,老板决定就使用10个服务人员的线程池!
但是这样又有一个比较严重的缺点:如果正在接受服务员服务的客人点菜很慢,其他的客人可能就要等好长时间了。有些脾气火爆的客人可能就等不了走人了。
这样,我么那就引入了Reactor模式,那么,Reactor模式是如何处理这个问题呢?
老板后来发现,客人点菜比较慢,大部服务员都在等着客人点菜,其实干的活不是太多。老板之所以能当老板当然有点不一样的地方,终于发现了一个新的方法,那就是:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员”,马上就有个服务员过去服务。在用了这个新方法后,老板进行了一次裁员,只留了一个服务员!这就是用单个线程来做多线程的事。实际的餐馆都是用的Reactor模式在服务。
首先我们这一节将学习反应器模式的概念,为什么要学习他,因为高性能网络编程都离不开反应器模式,而且很多著名的服务器软件或者中间件都是基于反应器模式实现的。所有从开发的角度来说,如果想完成高性能的服务器开发,反应器模式是必须学习和掌握的。
什么是Reactor反应器模式
反应器模式由Reactor反应器线程、Handlers处理器两大角色组成。
Reactor反应器:负责响应IO事件,当检查到IO事件就分发任务到Handlers处理器。
Handlers处理器:与IO事件绑定,负责IO事件的处理,非阻塞的执行业务处理逻辑。
多线程OIO的弊端
在OIO编程中,最初的网络服务器程序是通过简单的while循环来不断地监听端口,判断是否有新的连接,如果有就调用函数处理。
while(true){
socket=accept();//阻塞,接受连接
handle(socket);//读取数据、业务处理、写入结果
}
从代码中我们可以清楚的看出如果前一个网络的handle未处理完,那么后面的请求无法被接收,于是后面的全部会阻塞。对于服务器来说这是致命的问题。
解决办法
为了解决上述问题,有人提出了一个经典的模式就是Connection Per Thread(一个线程处理一个连接)模式,代码如下:
//...省略: 导入的Java类
class ConnectionPerThread implements Runnable {
public void run() {
try {
//服务器监听socket
ServerSocketserverSocket =
new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
while (!Thread.interrupted()) {
Socket socket = serverSocket.accept();
//接收一个连接后,为socket连接,新建一个专属的处理器对象
Handler handler = new Handler(socket);
//创建新线程,专门负责一个连接的处理
new Thread(handler).start();
}
} catch (IOException ex) { /* 处理异常 */ }
}
//处理器对象
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
while (true) {
try {
byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
/* 读取数据 */
socket.getInputStream().read(input);
/* 处理业务逻辑,获取处理结果*/
byte[] output =null;
/* 写入结果 */
socket.getOutputStream().write(output);
} catch (IOException ex) { /*处理异常*/ }
}
}
}
}
从代码上分析,优点是解决了前面连接被阻塞的问题。缺点也很明显,对于大量的连接就需要大量的线程资源。线程是系统比较昂贵的资源,频繁的使用线程在高并发场景下是致命的。
综上分析,如何解决这些缺陷呢?就是使用Reactor反应器模式,用反应器模式来控制线程数量,做到一个线程处理大量连接。
单线程Reactor反应器模式
之前说到过Reactor反应器模式类似于事件驱动。在事件驱动模式中,当有事件驱动时,事件源会将事件分发到handler处理器进行事件处理。反应器中模式中的反应器角色类似于事件驱动模式的事件分发角色。
方法一:void attach(Object o)
可以将JAVA POJO对象作为附件添加到SelectionKey实例中,相当于Setter方法
方法二:Object attachment()
是取出attach添加到SelectionKey选择键实例的附件,相当于Getter方法
总之,在反应器模式中,需要进行attach和attachment结合使用:在选择键注册完成之后,调用attach方法,将Handler处理器绑定到选择键;当事件发生时,调用attachment方法,可以从选择键取出Handler处理器,将事件分发到Handler处理器中,完成业务处理。
单线程Reactor反应器参考代码
//...
class Reactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
EchoServerReactor() throws IOException {
//打开选择器、serverSocket连接监听通道
selector=Selector.open();
serverSocket=ServerSocketChannel.open()
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//将新连接处理器作为附件,绑定到sk选择键
sk.attach(new AcceptorHandler());
}
public void run() {
//选择器轮询
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//反应器负责dispatch收到的事件
SelectionKeysk=it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) { ex.printStackTrace(); }
}
//反应器的分发方法
void dispatch(SelectionKey k) {
Runnable handler = (Runnable) (k.attachment());
//调用之前绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
// 新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try
{
SocketChannel channel = serverSocket.accept();
if (channel != null)
new EchoHandler(selector, channel);
} catch (IOException e)
{
e.printStackTrace();
}
}
}
//….
}
上面代码中,设计了新的Handler处理器也就是AcceptorHandler处理器,他是一个内部类。在注册serverSocket服务监听连接的接受事件之后,创建一个AcceptorHandler新连接处理器的实例,作为附件,被设置(attach)到了SelectionKey中。
当新的连接发生后,取出之前acctach到selectionKey的Handler业务处理器,进行IO处理。
AcceptorHandler的功能有两个,一个是接收新的连接,另一个是为新连接创建一个新的输入输出的Handler处理器。
EchoHandler,顾名思义,就是负责socket的数据输入、业务处理、结果输出。示例代码如下:
class EchoHandler implements Runnable
{
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler(Selector selector, SocketChannel c) throws IOException
{
channel = c;
c.configureBlocking(false);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将Handler作为选择键的附件
sk.attach(this);
//第二步,注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run()
{
try
{
if (state == SENDING)
{
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING)
{
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0)
{
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex)
{
ex.printStackTrace();
}
}
}
在EchoHandler的构造器中,有两点比较重要:
(1)将新的SocketChannel传输通道,注册到了反应器Reactor类的同一个选择器中。这样保证了Reactor类和Handler类在同一个线程中执行。
(2)Channel传输通道注册完成后,将EchoHandler自身作为附件,attach到了选择键中。这样,在Reactor类分发事件(选择键)时,能执行到EchoHandler的run方法。
在不熟悉网络通信的情况下理解起来还是挺费劲的,首先你得清楚每个方法,每个模块的含义,再通过实际的运行代码跑一跑才会加深印象。