掘金 后端 ( ) • 2024-03-27 09:52

theme: channing-cyan

1. 什么是IO模型?

I/O 模型:就是用什么样的通道或者说是通信模式和架构进行数据的传输和接收,很大程度上决定了程序通信的性能。

Java 共支持 3 种网络编程的/IO 模型:BIO、NIO、AIO

BIO同步阻塞的,NIO同步非阻塞的,AIO是异步非阻塞的。先来理解这三者之间的区别:

  • 同步阻塞:服务器在连接客户端时,如果当前没有客户端请求连接,服务器就需要一直等待,阻塞在这里。同理,服务器在接收客户端数据的时候,如果客户端一直没有发送过来,服务器也需要一直等待。

  • 同步非阻塞:服务器在连接客户端时,如果当前没有客户端请求连接,服务器可以先做其它事情,等一会再来看是否有连接请求。同理,服务器在接收客户端数据的时候,如果客户端一直没有发送过来,服务器可以先处理其它客户端的业务,但仍需要主动检查客户端有没有发送数据。

  • 异步非阻塞:服务器在连接客户端时,如果当前没有客户端请求连接,服务器可以先做其它事情,当客户端发送了IO请求,os先完成(或监听器)就会通知服务器去进行处理。同理,服务器在接收数据时,也无需等待,当有客户端发送了数据,os(或监听器)会通知服务器去接收数据。

简单来讲,阻塞与非阻塞的区别在于得到结果之前能否去做其它的事情同步和异步的区别在于是否需要主动去检查有没有产生结果

1.1 什么是BIO?

BIO(Blocking IO):同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。

image.png

1.2 什么是NIO?

NIO(No-blocking IO):同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理。

image.png

1.3 什么是AIO?

AIO(NIO.2) :异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,一般适用于连接数较多且连接时间较长的应用。

1.4 适用场景

  • BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。

  • NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持。

  • AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

2. BIO深入理解

2.1 BIO基本介绍

Java BIO 就是传统的 Java io 编程,其相关的类和接口在 java.io包下。

BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。

2.2 BIO工作流程

  1. 服务器端启动一个 ServerSocket,注册端口,调用accpet方法监听客户端的Socket连接。

  2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯。

2.3 基于BIO的通信案例

服务端代码

public class BIOServer {
    public static void main(String[] args) throws IOException {
        //1.创建一个ServerSocket,负责监听客户端请求
        ServerSocket serverSocket = new ServerSocket(8888);
        System.out.println("------服务器已启动-------");
        while (true) {
            //2.使用ServerSocket进行监听
            Socket socket = serverSocket.accept(); // 请求未到时,在此阻塞
            //3.创建一个新的线程来处理请求
            new Thread(() -> {
                try {
                    OutputStream out = socket.getOutputStream();
                    InputStream in = socket.getInputStream();
                    byte[] buf = new byte[1024];
                    int read = in.read(buf);
                    if (read == -1) {
                        throw new RuntimeException("连接已断开");
                    }
                    System.out.println("接收客户端的数据:" + new String(buf, 0, read));
                    out.write(buf, 0, read);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }
}

客户端代码

public class BIOClient {
    public static void main(String[] args) throws IOException {
        // 建立Socket连接
        Socket socket = new Socket("127.0.0.1", 8888);
        OutputStream out = socket.getOutputStream();
        InputStream in = socket.getInputStream();
        byte[] send = "hello, world!".getBytes();
        out.write(send);
        byte[] buf = new byte[1024];
        int read = in.read(buf);
        if (read == -1) {
            throw new RuntimeException("连接已断开");
        }
        System.out.println("服务端返回的数据:" + new String(buf, 0, read));
    }
}

2.4 总结

BIO模型存在如下缺陷:

  • 每接收一个客户端的Socket,服务端都创建一个线程,线程的竞争、上下文切换影响性能;

  • 并不是每个socket都在进行IO操作,存在无意义的线程处理;

  • 客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

3. NIO深入理解

3.1 NIO基本介绍

Java NIO(Non-blocking IO)是从Java 1.4版本开始引入的一个新的IO API,提供了一系列改进的输入/输出的新特性,是同步非阻塞的。

NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。

NIO 有三大核心部分:Channel(通道), Buffer(缓冲区), Selector(选择器)

NIO 支持面向缓冲区的、基于通道的IO操作。传统IO的read和write只能阻塞执行,线程在读写IO期间不能干其他事情,比如调用socket.read()时,如果服务器一直没有数据传输过来,线程就一直阻塞,而NIO中可以配置socket为非阻塞模式。

Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 1000 个请求过来,根据实际情况,可以分配20 或者 80个线程来处理。不像之前的阻塞 IO 那样,非得分配 1000 个。

3.2 NIO工作流程

image.png

一个线程对应一个Selector一个Selector对应多个channel(连接)。程序切换到哪个channel是由事件决定的,Selector会根据不同的事件,在各个通道上切换。每个channel都会对应一个Buffer,数据的读取写入是通过Buffer完成的。Buffer就是一个内存块,底层是一个数组。BIO中要么是输入流,要么是输出流,不能双向,但是 NIO 的Buffer是可以读也可以写的

Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、 套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。简而言之,Channel 负责传输,Buffer负责存取数据

3.3 NIO 三大核心

3.3.1 缓冲区Buffer

基本介绍:

Buffer(缓冲区)是一个可以读取数据的内存块,可以理解成一个容器对象(含数组),该对象提供了一组方法,可以轻松的使用内存块。缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。channel提供从文件、网络读取数据的渠道,但是读取和写入数据必须都经由buffer。

image.png

Buffer是一个抽象类,根据数据类型不同,有以下 Buffer 常用子类:ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。上述 Buffer 类他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。

基本属性:

Buffer类定义了四个属性来提供关于其所存储的数据的信息:

  • capacity:容量,即可以容纳的最大数据量。在缓冲区被创建时确定,并且不能更改。
  • limit:表示缓冲区中可以操作数据的大小(limit后数据不能进行读写)。缓冲区的limit不能为负,并且不能大于其容量。写入模式,limit等于buffer的容量。读取模式下,limit等于写入的数据量。
  • position:下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其limit。
  • mark:标记,是一个索引,通过 Buffer 中的mark()方法指定 Buffer 中一个特定的position,之后可以通过调用 reset() 方法恢复到这个position。

标记、位置、限制、容量遵守以下不变式:0 <= mark <= position <= limit <= capacity

image.png

基本方法:

  • int capacity():返回 Buffer 的 capacity 大小;
  • int position():返回缓冲区的当前位置 position;
  • Buffer position(int n):将设置缓冲区的当前位置为 n,并返回修改后的 Buffer 对象;
  • int limit():返回 Buffer 的界限(limit)的位置;
  • Buffer limit(int n):将设置缓冲区界限为 n,并返回一个具有新的 limit 的缓冲区对象;
  • Buffer mark():对缓冲区设置标记;
  • Buffer reset():将位置 position 转到以前设置的 mark 所在的位置;
  • Buffer clear():清空缓冲区并返回对缓冲区的引用;
  • Buffer flip():为将缓冲区的界限设置为当前位置,并将当前位置充值为 0;
  • boolean hasRemaining():判断当前位置和limit之间是否还有元素;
  • boolean isReadOnly():缓冲区是否为只读;
  • boolean hasArray():缓冲区是否有可以访问的底层实现数组;
  • Object array():返回缓冲区的底层实现数组;

缓冲区读写操作通过get()和put()方法来进行:

  • get():读取单个字节;
  • get(byte[] dst):批量读取多个字节到dst中;
  • get(int index):读取指定索引位置的字节(不会移动 position);
  • put(byte b):将给定单个字节写入缓冲区的当前位置;
  • put(byte[] src):将 src 中的字节写入缓冲区的当前位置;
  • put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)

使用案例: Buffer的子类中最常用的是ByteBuffer类,ByteBuffer常用方法使用案例如下:

/**
 * Buffers所有子类的管理方式几乎一致,通过 allocate() 获取缓冲区
 * 获取Buffer的方式有两种:
 *   非直接缓冲区:通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存中
 *   直接缓冲区:通过 allocateDirect() 方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高读写效率,
 *   但是直接缓冲区的创建和销毁会有更大的开销
 */
public class BufferCase {
    public static void main(String[] args) {
        String str = "abcde";

        // 分配缓冲区
        //1. 分配一个指定大小的缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);
        // 此时 position = 0; limit = capacity = 1024
        System.out.println("-----------------allocate()----------------");
        System.out.println(buf.position());
        System.out.println(buf.limit());
        System.out.println(buf.capacity());

        //2. 利用 put() 存入数据到缓冲区中
        buf.put(str.getBytes());
        // 此时 position = 5; limit = capacity = 1024
        System.out.println("-----------------put()----------------");
        System.out.println(buf.position());
        System.out.println(buf.limit());
        System.out.println(buf.capacity());

        //3. 切换读取数据模式
        buf.flip();
        // 此时 position = 0; limit = 5; capacity = 1024
        System.out.println("-----------------flip()----------------");
        System.out.println(buf.position());
        System.out.println(buf.limit());
        System.out.println(buf.capacity());

        //4. 利用 get() 读取缓冲区中的数据
        byte[] dst = new byte[buf.limit()];
        buf.get(dst);
        // 此时 position = 5; limit = 5; capacity = 1024
        System.out.println("-----------------get()----------------");
        System.out.println(new String(dst, 0, dst.length));
        System.out.println(buf.position());
        System.out.println(buf.limit());
        System.out.println(buf.capacity());

        //5. rewind() : 可重复读
        buf.rewind();
        // 此时 position = 0; limit = 5; capacity = 1024
        System.out.println("-----------------rewind()----------------");
        System.out.println(buf.position());
        System.out.println(buf.limit());
        System.out.println(buf.capacity());

        //6. clear() : 清空缓冲区. 但是缓冲区中的数据依然存在,但是处于“被遗忘”状态
        buf.clear();
        // 此时 position = 0; limit = capacity = 1024
        System.out.println("-----------------clear()----------------");
        System.out.println(buf.position());
        System.out.println(buf.limit());
        System.out.println(buf.capacity());
        // 原有的数据仍然存在,可以读出
        System.out.println((char)buf.get());
    }
}

3.3.2 通道Channel

基本介绍:

Channel(通道)由java.nio.channels包定义。Channel 表示 IO 源与目标打开的连接。Channel本身不能直接访问数据,Channel只能与 Buffer 进行交互。

1)Channel类似于传统的流,但有些区别:通道可以同时进行读写,而流只能读或者只能写;通道可以实现异步读写数据;通道可以从缓冲读数据,也可以写数据到缓冲。

2)BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。

3)Channel 在NIO中是一个接口 public interface Channel extends Closeable{},常用的Channel实现类有:

  • FileChannel:用于读取、写入、映射和操作文件的通道。
  • DatagramChannel:通过 UDP 读写网络中的数据。
  • SocketChannel:通过 TCP 读写网络中的数据。
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。 注意:ServerSocketChannel 类似 ServerSocket , SocketChannel 类似 Socket

使用案例:

这里案例介绍FileChannel的基本使用,后面NIO通信案例会涉及网络相关的通道。

/**
 * 使用Buffer和FileChannel,将 ”你好,世界!“ 写入到file.txt中
 */
public class FileChannelCase {
    public static void main(String[] args) throws IOException {
        String str = "你好,世界!";

        // 创建一个文件输出流
        FileOutputStream fileOutputStream = new FileOutputStream("d:\file.txt");

        // 通过 fileOutputStream 获得对应的 FileChannel
        FileChannel channel = fileOutputStream.getChannel();

        // 创建一个缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // 将数据写入缓冲区
        buffer.put(str.getBytes());

        // 转化Buffer为读取模式
        buffer.flip();

        // 将Buffer中的数据写入到channel
        channel.write(buffer);

        // 关闭文件流,通道会随之关闭
        fileOutputStream.close();
    }
}

3.3.3 选择器Selector

基本介绍:

Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)

Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。

只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。避免了多线程之间的上下文切换导致的开销。

基本方法:

Selector类是一个抽象类,有如下相关方法:

  • Selector open():得到一个选择器对象;
  • int select(long timeout):监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入内部集合并返回。参数用来设置超时时间(可以不设置)。
  • Set<SeletionKey> selectedKeys():从内部集合中得到所有的SeletionKey。

NIO网络编程中 Selector、SeletionKey、ServerSocketChanne、SocketChannel之间的关系说明:

image.png

NIO网络通信基本流程:

1)将 ServerSocketChanne 注册到Selector上,Selector会监听客户端连接事件,当有连接事件发生时,会为客户端生成对应的SocketChannel。

2)将SocketChannel使用register(Selector sel, int ops)方法注册到Selector上,一个Selector上可以注册多个Channel。注册后返回一个SeletionKey,一个SeletionKey对应一个SocketChannel。

3)Selector使用select()方法进行监听,会返回有事件发生的通道的个数,进一步可以得到对应的SeletionKey(有事件发生的)。

4)通过SeletionKey的channel()方法反向获得SocketChannel,通过channel通道完成对应的业务处理。

SeletionKey相关:

属性:

  • int ON_ACCEP:有新的网络连接可以accept,值为16;
  • int ON_CONNECT:连接已经建立,值为8;
  • int ON_READ:有读操作事件发生,值为1;
  • int ON_WRITE:有写操作事件发生,值为4。

方法:

  • Selector selector():得到与之相关的Selector对象;
  • SelectableChannel channel():得到与之相关的通道;
  • Object attachment():得到与之关联的共享数据;
  • SeletionKey interestOps(int ops):设置或改变监听事件;
  • boolean isAcceptable():是否可以accept
  • boolean isReadable():是否可以读
  • boolean isWritable():是否可以写

补充ServerSocketChannel和SocketChannel相关:

ServerSocketChannel在服务端监听新的客户端Socket连接。相关方法:

  • ServerSocketChannel open():得到一个ServerSocketChannel通道;
  • ServerSocketChannel bind(SocketAddress local):设置服务器端口号;
  • SelectableChannel configureBlocking(boolean block):设置阻塞或非阻塞模式,false代表非阻塞模式;
  • SocketChannel accept():接收一个socket连接,返回对应的SocketChannel通道;
  • SeletionKey register(Selector sel, int ops):注册到选择器并设置监听事件。

SocketChannel,网络IO通道,具体负责读写操作。NIO把缓冲区的数据写入通道,或者把通道的数据读取到缓冲区。相关方法:

  • SocketChannel open():得到一个SocketChannel通道;
  • SelectableChannel configureBlocking(boolean block):设置阻塞或非阻塞模式,false代表非阻塞模式;
  • boolean connect(SocketAddress remote):连接服务器;
  • boolean finishConnect():如果上面方法失败,通过这个方法连接服务器;
  • int write(ByteBuffer src):往通道里写数据;
  • int read(ByteBuffer dst):从通道里读数据;
  • SeletionKey register(Selector sel, int ops, Object att):注册到选择器并设置监听事件,最后一个参数可以设置共享数据(Buffer);
  • void close():关闭通道。

3.4 基于NIO的通信案例

NIO非阻塞网络通信案例,实现服务端和客户端之间的简单通信(非阻塞)。

public class NIOServer {
    public static void main(String[] args) throws IOException {
        // 创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);

        // ServerSocketChannel 绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8888));

        // ServerSocketChannel 注册到 Selector, 监听事件为 accept
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // Selector监听事件
        while (true) {
            // 阻塞 1s,如果没有事件发生,输出结果并进入下一个循环; 可以不设置select()方法的参数,就是非阻塞
            if (selector.select(1000) == 0) {
                //System.out.println("等待 1s,没有连接请求。");
                continue;
            }

            // 获取发生事件的集合 selectionKeys
            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            // 使用迭代器遍历 selectionKeys
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();

                // 根据 selectionKey 对应通道发生的事件做处理
                if (selectionKey.isAcceptable()) {
                    // 处理连接事件,为新的客户端生成一个SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端连接成功。");
                    // 设置为非阻塞模式
                    socketChannel.configureBlocking(false);
                    // 将socketChannel 注册到selector,监听事件为 read,同时给socketChannel关联一个buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }

                if (selectionKey.isReadable()) {
                    // 处理读事件,通过 selectionKey 反向获取到channel
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    // 获取到改channel关联的Buffer
                    ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                    // 从channel中读取数据到buffer,并打印
                    channel.read(buffer);
                    System.out.println("接收到客户端的数据:" + new String(buffer.array(), 0, buffer.position()));
                }

                // 手动从集合中移除selectionKey,防止重复操作
                iterator.remove();
            }
        }
    }
}
public class NIOClient {
    public static void main(String[] args) throws IOException {
        // 创建一个SocketChannel
        SocketChannel socketChannel = SocketChannel.open();

        // 设置为非阻塞模式
        socketChannel.configureBlocking(false);

        // 连接服务器
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
        if (!socketChannel.connect(address)) {
            // 如果第一次连接失败, 使用finishConnect() 连接
            while (!socketChannel.finishConnect()) {
                System.out.println("连接没有立刻成功,但是客户端不会阻塞,可以执行其它语句!");
            }
        }

        // 发送数据
        Scanner input = new Scanner(System.in);
        while (input.hasNext()) {
            String str = input.nextLine();
            // Wraps a byte array into a buffer.
            ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
            // 将buffer的数据写入到channel
            socketChannel.write(buffer);
        }
    }
}

AIO简单介绍

JDK 7 引入了 Asynchronous I/O,即AIO。在进行IO编程中,常用到两种模式ReactorProactor。Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应的处理。

AIO又称为NIO2.0,是异步非阻塞的IO。AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是现有OS完成后才通知服务器启动线程去处理,一般适用于连接数较多且连接时间较长的应用。AIO尚未广泛应用,不再详细介绍。

写在最后

本文主要总结了Java BIO和NIO通信模型。文章总结便于理解和回顾。新手读者亲自实现案例能够更加深刻的理解。后面会陆续总结Netty系列RPC项目系列,喜欢的同学可以点赞收藏,支持继续更新。