本文共 7146 字,大约阅读时间需要 23 分钟。
之前已经说过了BIO模型的原理和实现,并根据其不足(阻塞,多线程资源消耗等),介绍了内核的升级实现了accpet和read不阻塞的方法,以及介绍了channel和buffer的模型和实现。
上篇结束的时候提到了NIO(os层面)不足之处 承接上文,如果有很多的链接进来,单纯的NIO的使用,我们程序需要对所有链接进行地毯式的遍历,监听所有链接事件,大致java实现模型如下:既然知道了上述模型的弊端,就会有解决的办法:如果程序不需要每次都地毯式的遍历所有链接,而是我们将所有链接(文件描述符)都告诉内核,内核只返回给我们其中有状态的IO,我们程序中只对这些链接进行遍历不就行了。
那接下来就说说这样实现思想的多路复用器的使用。
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
监视并等待多个文件描述符的属性变化(可读、可写或错误异常)。select()函数监视的文件描述符分 3 类,分别是writefds、readfds、和 exceptfds。调用后 select() 函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时( timeout 指定等待时间),函数才返回。当 select()函数返回后,可以通过遍历 fdset,来找到就绪的描述符。
大致模型如下:将需要监控的文件描述符作为参数传递给select,select会只返回有事件的资源信息,这样就避免了地毯式遍历,浪费资源。
那么select又是怎么知道哪些是有事件的资源呢?或者说内核又是怎么实现select的呢?select需要驱动程序的支持,驱动程序实现fops内的poll函数。select通过每个设备文件对应的poll函数提供的信息判断当前是否有资源可用(如可读或写),如果有的话则返回可用资源的文件描述符个数,没有的话则睡眠,等待有资源变为可用时再被唤醒继续执行。
支持阻塞操作的设备驱动通常会实现一组自身的等待队列如读/写等待队列用于支持上层(用户层)所需的BLOCK或NONBLOCK操作。当应用程序通过设备驱动访问该设备时(默认为BLOCK操作),若该设备当前没有数据可读或写,则将该用户进程插入到该设备驱动对应的读/写等待队列让其睡眠一段时间,等到有数据可读/写时再将该进程唤醒。
select就是巧妙的利用等待队列机制让用户进程适当在没有资源可读/写时睡眠,有资源可读/写时唤醒。
下面我们看看select睡眠的详细过程。
select会循环遍历它所监测的fd_set内的所有文件描述符对应的驱动程序的poll函数。驱动程序提供的poll函数首先会将调用select的用户进程插入到该设备驱动对应资源的等待队列(如读/写等待队列),然后返回一个bitmask告诉select当前资源哪些可用。当select循环遍历完所有fd_set内指定的文件描述符对应的poll函数后,如果没有一个资源可用(即没有一个文件可供操作),则select让该进程睡眠,一直等到有资源可用为止,进程被唤醒(或者timeout)继续往下执行。
通过上述的理解,select多路复用器有什么弊端呢?
我们发现针对上面问题,有什么改变的思想呢? 如果不用每次都将文件描述符拷贝给内核,不用每次都遍历所有文件描述符,而是让内核自己记录好所有的有事件的文件描述符,我们最终去取就行了。
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。
epoll 的3个系统调用:
实现模型如下:
通过以上了解,我们对比下selet,epoll不在需要从用户端拷贝fd_set到内核空间,而且也不用对任何fd进行遍历操作,时间复杂度从O(n)直接降到了O(1),现实中,基本上都不使用select和poll,面对多连接的并发场景,基本上都选择使用epoll。以上讲些的多路复用器的实现原理实现都是C和部分系统调用相关的代码,那么JAVA是怎么使用的多路复用器呢?
java在1.4之后推出了NIO(new nio) 其中一个核心组件就是selector,它就是对多路复用器的封装实现。(注:selector中的多路复用器可能是select、poll、epoll其中任意一个)
Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。实现模型;
java代码演示,实现聊天功能:package main.java.com.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;/** * Created by cmm on 2021/6/12. */public class NioServer { private ServerSocketChannel serverSocketChannel; private Selector selector ; NioServer(){ try { // 1. 创建服务器端通道 serverSocketChannel = ServerSocketChannel.open(); // 2. 设置成非阻塞状态 serverSocketChannel.configureBlocking(false); // 3. 创建一个seletor选择器 /** 创建多路复用器:优先epoll epoll : 调用epoll_create 创建epoll对象,并开辟一个内存空间 select | poll : 创建一个数组来存放文件描述符 */ selector = Selector.open(); // 4. 建立服务器端 serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",9999)); // 5. 将该服务器通道注册到选择器,并开始监控accpet事件 /** select | poll : 将文件描述符fd 放入数组中。 epoll : 调用epoll_ctl,将文件描述符fd放入内存空间。 */ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.print("服务器建立成..."); lisent(selector); }catch (Exception e){ System.out.print("服务器建立异常"); } } // 实施监听 private void lisent(Selector selector) { while (true){ try { /** select | poll : 查看数组,返回所有fd epoll : 查看红黑树中的fd */ selector.keys() /** select | poll : 传入df,遍历查找有事件状态的IO epoll :调用epoll_wait */ if(selector.select(1000)>0){ // 1. 将活动的事件接收selector.selectedKeys()---获取所有活动的事件 Iterator iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()){ // 2. 迭代遍历一个个有事件的selectionkey SelectionKey selectionKey = (SelectionKey) iterator.next(); if(selectionKey.isAcceptable()){ // 3.接收链接 SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); if(socketChannel.finishConnect()){ System.out.println("欢迎【客户端】"+socketChannel.getRemoteAddress()+"登录"); // 4.并将客户端也加入selector管理,同时开启read事件监听 socketChannel.register(selector,SelectionKey.OP_READ); } } // 5 处理可读事件 if(selectionKey.isReadable()){ ReadContent(selectionKey); } iterator.remove(); } } } catch (IOException e) { e.printStackTrace(); } } } // 读取客户端信息 private void ReadContent(SelectionKey selectionKey) throws IOException { // 1. 接收客户端channel、 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 2. 创建缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 3. 读取信息 try { int read = socketChannel.read(byteBuffer); if(read>0){ System.out.println("【客户端】"+socketChannel.getRemoteAddress()+"说:"+new String(byteBuffer.array())); // 信息转发给其他客户端 for(SelectionKey selectionKeyother : selector.keys()){ Channel channel = selectionKeyother.channel(); // 屏蔽自己 if(channel instanceof SocketChannel && socketChannel!=channel){ SocketChannel socketChannelohter1 = (SocketChannel) channel; socketChannelohter1.write(ByteBuffer.wrap(byteBuffer.array())); } } } }catch (Exception e){ selectionKey.cancel(); System.out.println("【客户端】"+socketChannel.getRemoteAddress()+"已下线"); } } public static void main(String[] args) { new NioServer(); }}
服务端将ServerSocketChannel注册到Selector,Selector轮训,通过selector.select()阻塞判断是否有监听事件到达,如果有事件到达,获取到事件SelectionKey的集合,通过迭代SelectionKey集合,判断事件类型,如果是连接事件,则把当前Channel注册到Selector,如果当前Channel有数据可读,则执行相应的操作,执行完成后移除当前SelectionKey,继续迭代处理下一个SelectionKey。
既然如上面所说, Oracle JDK在Linux已经默认使用epoll方式, 为什么netty还要提供一个基于epoll的实现呢?
这是stackoverflow上的一个问题。 Netty的核心开发者 Norman Maurer这么说的:1. Netty的 epoll transport使用 epoll edge-triggered 而 java的 nio 使用 level-triggered。2.另外netty epoll transport 暴露了更多的nio没有的配置参数, 如 TCP_CORK, SO_REUSEADDR等等
所以后续开始说说Netty的模型和实现。
转载地址:http://wnhof.baihongyu.com/