Netty-NIO


一、BIO 模型

1、简介

I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能。

  1. Java BIO 就是传统的 Java I/O 编程,其相关的类和接口在 java.io
  2. BIO(BlockingI/O)同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。【后有应用实例】
  3. BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,程序简单易理解。

2、工作机制

image-20211108205541814
  1. 服务器端启动一个 ServerSocket
  2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯。
  3. 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝。
  4. 如果有响应,客户端线程会等待请求结束后,在继续执行。

3、应用实例

实例说明:

  1. 使用 BIO 模型编写一个服务器端,监听 6666 端口,当有客户端连接时,就启动一个线程与之通讯。
  2. 要求使用线程池机制改善,可以连接多个客户端。
  3. 服务器端可以接收客户端发送的数据(telnet 方式即可)。
  4. 代码演示:
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BIOServer {

    public static void main(String[] args) throws Exception {
        //线程池机制
        //思路
        //1. 创建一个线程池
        //2. 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //创建ServerSocket
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器启动了");
        while (true) {
            System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
            //监听,等待客户端连接
            System.out.println("等待连接....");
            final Socket socket = serverSocket.accept();
            System.out.println("连接到一个客户端");
            //就创建一个线程,与之通讯(单独写一个方法)
            newCachedThreadPool.execute(new Runnable() {
                public void run() {//我们重写
                    //可以和客户端通讯
                    handler(socket);
                }
            });
        }
    }

    //编写一个handler方法,和客户端通讯
    public static void handler(Socket socket) {
        try {
            System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
            byte[] bytes = new byte[1024];
            //通过socket获取输入流
            InputStream inputStream = socket.getInputStream();
            //循环的读取客户端发送的数据
            while (true) {
                System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
                System.out.println("read....");
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("关闭和client的连接");
            try {
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

4、BIO问题分析

  1. 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write
  2. 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
  3. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。

二、NIO 模型

1、基本介绍

  1. Java NIO 全称 Java non-blocking IO ,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 NewIO),是同步非阻塞的。
  2. NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。【基本案例】
  3. NIO 有三大核心部分: Channel(通道)、Buffer(缓冲区)、Selector(选择器)
  4. NIO面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
  5. Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。【后面有案例说明】
  6. 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。
  7. HTTP 2.0 使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP 1.1 大了好几个数量级。
  8. 案例说明 NIOBuffer
import java.nio.IntBuffer;

public class BasicBuffer {

    public static void main(String[] args) {

        //举例说明 Buffer 的使用(简单说明)
        //创建一个 Buffer,大小为 5,即可以存放 5 个 int
        IntBuffer intBuffer = IntBuffer.allocate(5);

        //向buffer存放数据
        //intBuffer.put(10);
        //intBuffer.put(11);
        //intBuffer.put(12);
        //intBuffer.put(13);
        //intBuffer.put(14);
        for (int i = 0; i < intBuffer.capacity(); i++) {
            intBuffer.put(i * 2);
        }
        //如何从 buffer 读取数据
        //将 buffer 转换,读写切换(!!!)
        intBuffer.flip();
        while (intBuffer.hasRemaining()) {
            System.out.println(intBuffer.get());
        }
    }
}

2、三大核心组件关系

image-20211108213327784
  1. 每个 Channel 都会对应一个 Buffer
  2. Selector 对应一个线程,一个线程对应多个 Channel(连接)。
  3. 该图反应了有三个 Channel 注册到该 Selector //程序
  4. 程序切换到哪个 Channel 是由事件决定的,Event 就是一个重要的概念。
  5. Selector 会根据不同的事件,在各个通道上切换。
  6. Buffer 就是一个内存块,底层是有一个数组。
  7. 数据的读取写入是通过 Buffer,这个和 BIOBIO 中要么是输入流,或者是输出流,不能双向,但是 NIOBuffer 是可以读也可以写,需要 flip 方法切换 Channel 是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系统通道就是双向的。

Channel与Buffer

Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)**。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区**。然后操作缓冲区,对数据进行处理

简而言之,通道负责传输,缓冲区负责存储

常见的Channel有以下四种,其中FileChannel主要用于文件传输,其余三种用于网络通信

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

Buffer有以下几种(用于将不同类型的数据存储到缓冲区),其中使用较多的是ByteBuffer,

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

3、NIO 和 BIO 比较

  1. BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多。
  2. BIO 是阻塞的,NIO 则是非阻塞的。
  3. BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。

三、NIO - 缓冲区

1、缓冲区(Buffer)

(1)基本介绍

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer

使用缓冲区的好处:服务器的内存并不是无限大的,当读取大文件时,很可能造成内存占用过高,甚至不足。

(2)基本的使用方法

1、写入数据:向 buffer 写入数据,例如调用 channel.read(buffer)

2、切换至读模式:调用 flip() (flip会使得buffer中的limit变为position,position变为0)

3、读取数据:从 buffer 读取数据,例如调用 buffer.get()

4、切换至写模式:调用 clear() 或者compact()切换至写模式

  • 调用clear()方法时position=0,limit变为capacity
  • 调用compact()方法时,会将缓冲区中的未读数据压缩到缓冲区前面

5、重复以上步骤

案例:使用ByteBuffer读取文件中的内容

public class P6_ByteBuffer {

    public static void main(String[] args) {
        // FileChannel
        // 1、输入输出流,2、RandomAccessFile
        try(FileChannel channel = new FileInputStream("D:\\data.txt").getChannel()){
            // 准备缓冲区(创建10字节大小的缓冲区)
            ByteBuffer buffer = ByteBuffer.allocate(10);
            int len = 0;
            StringBuilder builder = new StringBuilder();
            while(true){
                // 从 channel 中读取数据,向 buffer 中写入
                len = channel.read(buffer);
                System.out.println("读取到的字节数:" + len);
                // 判断是否含有内容
                if (len == -1){
                    break;
                }
                // 切换至读模式
                buffer.flip();

                // 读取 buffer 的内容并打印
                while(buffer.hasRemaining()){
                    byte b = buffer.get();
                    builder.append((char)b);
                }

                buffer.clear(); // 切换为写模式
            }

            System.out.println(builder);
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

(3)Buffer 核心属性

字节缓冲区的父类Buffer中有几个核心属性,如下

// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
  • capacity:缓冲区的容量。通过构造函数赋予,一旦设置,无法更改
  • limit:缓冲区的界限。位于limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量
  • position下一个读写位置的索引(类似PC)。缓冲区的位置不能为负,并且不能大于limit
  • mark:记录当前position的值。position被改变后,可以通过调用reset() 方法恢复到mark的位置。

以上四个属性必须满足以下要求

mark <= position <= limit <= capacity

(4)切换读写模式

切换读写模式的本质是改变 positionlimit 两个指针的位置

一开始

写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态

flip 动作发生后,position 切换为读取位置,limit 切换为读取限制

读取 4 个字节后,状态

clear 动作发生后,状态

compact 方法,是把未读完的部分向前压缩,然后切换至写模式

(4)Buffer 核心方法

put() 方法
  • put()方法可以将一个数据放入到缓冲区中。
  • 进行该操作后,postition的值会+1,指向下一个可以放入的位置。capacity = limit ,为缓冲区容量的值。
image-20211109210748373
flip() 方法
  • flip()方法会切换对缓冲区的操作模式,由写->读 / 读->写
  • 进行该操作后
    • 如果是写模式->读模式,position = 0 , limit 指向最后一个元素的下一个位置,capacity不变
    • 如果是读->写,则恢复为put()方法中的值
image-20211109211057882
get() 方法
  • get() 方法会读取缓冲区中的一个值
  • 进行该操作后,position会+1,如果超过了limit则会抛出异常
  • 注意:get(i)方法不会改变position的值

image-20211109211419282

clean() 方法
  • clean()方法会将缓冲区中的各个属性恢复为最初的状态,position = 0, capacity = limit
  • 此时缓冲区的数据依然存在,处于“被遗忘”状态,下次进行写操作时会覆盖这些数据
image-20211109211614542
compact()方法

此方法为ByteBuffer的方法,而不是Buffer的方法

  • compact会把未读完的数据向前压缩,然后切换到写模式
  • 数据前移后,原位置的值并未清零,写时会覆盖之前的值

img

clear() VS compact()

clear只是对position、limit、mark进行重置,而compact在对position进行设置,以及limit、mark进行重置的同时,还涉及到数据在内存中拷贝(会调用arraycopy)。所以compact比clear更耗性能。但compact能保存你未读取的数据,将新数据追加到为读取的数据之后;而clear则不行,若你调用了clear,则未读取的数据就无法再读取到了

分配空间

可以使用 allocate 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法

Bytebuffer buf = ByteBuffer.allocate(16);
向 buffer 写入数据

有两种办法

  • 调用 channel 的 read 方法
  • 调用 buffer 自己的 put 方法
int readBytes = channel.read(buf);

buf.put((byte)127);
从 buffer 读取数据

同样有两种办法

  • 调用 channel 的 write 方法
  • 调用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);

byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针
rewind() 方法
  • 该方法只能在读模式下使用
  • rewind()方法后,会恢复position、limit和capacity的值,变为进行get()前的值
mark 和 reset

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置,是对rewind() 方法的增强

  • mark()方法会将postion的值保存到mark属性中
  • reset()方法会将position的值改为mark中保存的值

注意

rewind 和 flip 都会清除 mark 位置

字符串转 ByteBuffer
// -------------------字符串转化为 ByteBuffer--------------------
// 1、put() 方法,转化后 byteBuffer 仍处于写模式
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put("hello".getBytes());
ByteBufferUtil.debugAll(buffer);    // 用于观察调试Buffer指针位置

// 2、Charset,转化后 byteBuffer 自动切换到读模式
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
ByteBufferUtil.debugAll(buffer1);

// 3、wrap,转化后 byteBuffer 自动切换到读模式
ByteBuffer buffer2 = ByteBuffer.wrap("hello".getBytes());
ByteBufferUtil.debugAll(buffer2);


// ------------------ByteBuffer 转字符串-----------------------
String s1 = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println(s1);    // 输出为空,因为buffer处于写模式

buffer.flip();    // 转化为读模式
String s2 = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println(s2);    // 输出hello

String s3 = StandardCharsets.UTF_8.decode(buffer2).toString();
System.out.println(s3);    // 输出hello

从下面的输出中可以观察到第一种在转化为 byteBuffer 之后是处于写模式,而第二三种是自动转化到读模式

+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [16]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........|
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+

2、Buffer 非线程安全

3、分段读取

将文件内容读取到多个 ByteBuffer 中

// new RandomAccessFile()用于获取channel,参数r为只读,文件内容:123456789
try(FileChannel channel = new RandomAccessFile("D:\\data.txt","r").getChannel()){
    ByteBuffer buffer1 = ByteBuffer.allocate(3);
    ByteBuffer buffer2 = ByteBuffer.allocate(3);
    ByteBuffer buffer3 = ByteBuffer.allocate(3);
    // 读取,参数为ByteBuffer数组,则为分段读取
    channel.read(new ByteBuffer[]{buffer1,buffer2,buffer3});
    // 转化为读模式
    buffer1.flip();
    buffer2.flip();
    buffer3.flip();
    System.out.println( StandardCharsets.UTF_8.decode(buffer1).toString() );//123
    System.out.println( StandardCharsets.UTF_8.decode(buffer2).toString() );//456
    System.out.println( StandardCharsets.UTF_8.decode(buffer3).toString() );//789
}catch (IOException e){
    e.printStackTrace();
}

4、集中写入

将多个 ByteBuffer 的内容写入到文件中

ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("你好");   //一个汉字3个字节

// rw:可读可写权限
try(FileChannel channel = new RandomAccessFile("2.txt","rw").getChannel()){
    // 写数据,生成文件内容:helloworld你好
    channel.write(new ByteBuffer[]{buffer1,buffer2,buffer3});
}catch (IOException e){
    e.printStackTrace();
}

5、粘包、半包

现象

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

  • Hello world\n
  • i like learn java\n
  • Hello netty\n

变成了下面的两个 byteBuffer (粘包,半包)

  • Hello world\ni like learn java\nhe
  • llo netty\n

出现原因

粘包

发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去

半包

接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象

解决办法

  • 通过get(index)方法遍历ByteBuffer,遇到分隔符时进行处理。

    注意

    :get(index)不会改变position的值

    • 记录该段数据长度,以便于申请对应大小的缓冲区
    • 将缓冲区的数据通过get()方法写入到target中
  • 调用compact方法切换模式,因为缓冲区中可能还有未读的数据

/*
* 模拟粘包半包
* \n :为一个字符
* 一个字母一个字符,一个汉字3个字符
* */
public class P14_ByteBufferExam {

    public static void main(String[] args) {
        final ByteBuffer buffer = ByteBuffer.allocate(32);

        // 解析粘包
        buffer.put("hello world\ni like learn java\nhe".getBytes());
        split(buffer);
        ByteBufferUtil.debugAll(buffer);

        // 解析半包
        buffer.put("llo netty\n".getBytes());
        split(buffer);
        ByteBufferUtil.debugAll(buffer);
    }

    /**
     * 解决粘包、半包
     * 这种方式为学习使用,效率过低
     */
    private static void split(ByteBuffer buffer) {
        buffer.flip();  // 转化为读模式

        for (int i = 0;i < buffer.limit();i++){

            if (buffer.get(i) == '\n'){

                // 计算 buffer 中行的长度
                int len = (i + 1) - buffer.position();

                for (int j = 0 ; j < len ; j++){
                    System.out.print((char) buffer.get());
                }
                System.out.println();
            }
        }
        // 切换为写模式,但是缓冲区可能未读完,这里需要使用compact
        buffer.compact();
    }
}

四、NIO - 文件编程

1、FileChannel

(1)工作模式

FileChannel只能在阻塞模式下工作,所以无法搭配Selector

(2)获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
// 1、通过 FileInputStream 获取只读的 FileChannel
FileInputStream is = new FileInputStream("2.txt");
FileChannel channel = is.getChannel();

// 2、通过 FileOutputStream 获取只写的 FileChannel
FileOutputStream os = new FileOutputStream("3.txt");
FileChannel channel1 = os.getChannel();

// 3、通过 RandomAccessFile 获取 可读可写rw、只读f 的FileChannel
RandomAccessFile raf = new RandomAccessFile("4.txt", "rw");
FileChannel channel2 = raf.getChannel();

(3)读取

通过 FileInputStream 获取channel,通过read方法将数据写入到ByteBuffer中

read方法的返回值表示读到了多少字节,若读到了文件末尾则返回-1

// 从 channel 读取数据
int readBytes = channel.read(buffer);

判断是否读取完毕

// 可根据返回值判断是否读取完毕
while(channel.read(buffer) > 0) {
    // 进行对应操作
    ...
}

(4)写入

因为channel也是有大小的,所以 write 方法并不能保证一次将 buffer 中的内容全部写入 channel。必须需要按照以下规则进行写入

// 通过hasRemaining()方法查看缓冲区中是否还有数据未写入到通道中
while(buffer.hasRemaining()) {
    channel.write(buffer);
}

(5)关闭

通道需要close,一般情况通过try-with-resource进行关闭,最好使用以下方法获取strea以及channel,避免某些原因使得资源未被关闭

JDK1.7之后有了try-with-resource处理机制。首先被自动关闭的资源需要实现Closeable或者AutoCloseable接口,因为只有实现了这两个接口才可以自动调用close()方法去自动关闭资源。写法为try(){}catch(){},将要关闭的外部资源在try()中创建,catch()捕获处理异常。其实try-with-resource机制是一种语法糖,其底层实现原理仍然是try{}catch(){}finally{}写法,不过在catch(){}代码块中有一个addSuppressed()方法,即异常抑制方法。如果业务处理和关闭连接都出现了异常,业务处理的异常会抑制关闭连接的异常,只抛出处理中的异常,仍然可以通过getSuppressed()方法获得关闭连接的异常。

// 在 try 后面创建的可关闭对象,会自动关闭
try (FileInputStream fis = new FileInputStream("stu.txt");
     FileOutputStream fos = new FileOutputStream("student.txt");
     FileChannel inputChannel = fis.getChannel();
     FileChannel outputChannel = fos.getChannel()) {

    // 执行对应操作

}catch (IOException e){
    e.printStackTrace();
}

(6)位置

channel也拥有一个保存读取数据位置的属性,即position

long pos = channel.position();

可以通过position(int pos)设置channel中position的值

long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

(7)强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,而是等到缓存满了以后将所有数据一次性的写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

inputChannel.force(true);

2、两个 channel 传输数据

transferTo底层使用了零拷贝技术,用于两个 channel 之间传输数据

try (FileInputStream fis = new FileInputStream("stu.txt");
     FileOutputStream fos = new FileOutputStream("student.txt");
     FileChannel inputChannel = fis.getChannel();
     FileChannel outputChannel = fos.getChannel()) {
    // 参数:inputChannel的起始位置,传输数据的大小,目的channel
    // 返回值为传输的数据的字节数
    // transferTo一次只能传输2G的数据
    inputChannel.transferTo(0, inputChannel.size(), outputChannel);
} catch (IOException e) {
    e.printStackTrace();
}

当传输的文件大于2G时,需要使用以下方法进行多次传输

try(
    FileChannel from = new FileInputStream("from.txt").getChannel();
    FileChannel to = new FileOutputStream("to.txt").getChannel()
){
    // 效率高,底层会利用操作系统的零拷贝进行优化,最大传输 2g 数据
    long size = from.size();
    // left 变量代表还剩余多少字节
    for (long left = size; left > 0; ) {
        System.out.println("position:" + (size - left) + " left:" + left);
        left -= from.transferTo((size - left), left, to);
    }
}catch (Exception e){
    e.printStackTrace();
}

3、Path与Paths

  • Path 用来表示文件路径
  • Paths 是工具类,用来获取 Path 实例
  • JDK1.7 引入的
Path source = Paths.get("1.txt"); // 相对路径 不带盘符 使用 user.dir 环境变量来定位1.txt

Path source = Paths.get("d:\\1.txt"); // 绝对路径 代表了  d:\1.txt 反斜杠需要转义

Path source = Paths.get("d:/1.txt"); // 绝对路径 同样代表了  d:\1.txt

Path projects = Paths.get("d:\\data", "projects"); // 代表了  d:\data\projects
  • . 代表了当前路径
  • .. 代表了上一级路径
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路径 会去除 . 以及 ..

4、Files

(1)查找文件是否存在

Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));

(2)创建目录

创建一级目录

Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
  • 如果目录已存在,会抛异常 FileAlreadyExistsException
  • 不能一次创建多级目录,否则会抛异常 NoSuchFileException

创建多级目录用

Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

(3)拷贝、移动文件

拷贝文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");

Files.copy(source, target);
  • 如果文件已存在,会抛异常 FileAlreadyExistsException

如果希望用 source 覆盖target,需要用 StandardCopyOption 来控制

Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);

移动文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");

Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性

(4)删除

删除文件

Path target = Paths.get("helloword/target.txt");

Files.delete(target);
  • 如果文件不存在,会抛异常 NoSuchFileException

删除目录

Path target = Paths.get("helloword/d1");

Files.delete(target);
  • 如果目录还有内容,会抛异常 DirectoryNotEmptyException

(5)遍历

可以使用Files工具类中的walkFileTree(Path, FileVisitor)方法,其中需要传入两个参数

  • Path:文件起始路径

  • FileVisitor:文件访问器,

    使用访问者模式

    • 接口的实现类

      SimpleFileVisitor

      有四个方法

      • preVisitDirectory:访问目录前的操作
      • visitFile:访问文件的操作
      • visitFileFailed:访问文件失败时的操作
      • postVisitDirectory:访问目录后的操作
public class TestWalkFileTree {
    public static void main(String[] args) throws IOException {
        Path path = Paths.get("F:\\JDK 8");
        // 文件目录数目
        AtomicInteger dirCount = new AtomicInteger();
        // 文件数目
        AtomicInteger fileCount = new AtomicInteger();

        Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
            // 访问目录前的操作
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                System.out.println("===>"+dir);
                // 增加文件目录数
                dirCount.incrementAndGet();
                return super.preVisitDirectory(dir, attrs);
            }

            // 访问文件的操作
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                System.out.println(file);
                // 增加文件数
                fileCount.incrementAndGet();
                return super.visitFile(file, attrs);
            }
        });
        // 打印数目
        System.out.println("文件目录数:"+dirCount.get());
        System.out.println("文件数:"+fileCount.get());
    }
}Copy

运行结果如下

...
===>F:\JDK 8\lib\security\policy\unlimited
F:\JDK 8\lib\security\policy\unlimited\local_policy.jar
F:\JDK 8\lib\security\policy\unlimited\US_export_policy.jar
F:\JDK 8\lib\security\trusted.libraries
F:\JDK 8\lib\sound.properties
F:\JDK 8\lib\tzdb.dat
F:\JDK 8\lib\tzmappings
F:\JDK 8\LICENSE
F:\JDK 8\README.txt
F:\JDK 8\release
F:\JDK 8\THIRDPARTYLICENSEREADME-JAVAFX.txt
F:\JDK 8\THIRDPARTYLICENSEREADME.txt
F:\JDK 8\Welcome.html
文件目录数:23
文件数:279

(6)判断文件后缀

boolean b = file.toString().endWith(".jar");

(7)删除多级目录

Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) 
        throws IOException {
        Files.delete(file);
        return super.visitFile(file, attrs);
    }

    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) 
        throws IOException {
        Files.delete(dir);
        return super.postVisitDirectory(dir, exc);
    }
});

删除是危险操作,确保要递归删除的文件夹没有重要内容

(8)拷贝多级目录

long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";

Files.walk(Paths.get(source)).forEach(path -> {
    try {
        String targetName = path.toString().replace(source, target);
        // 是目录
        if (Files.isDirectory(path)) {
            Files.createDirectory(Paths.get(targetName));
        }
        // 是普通文件
        else if (Files.isRegularFile(path)) {
            Files.copy(path, Paths.get(targetName));
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
});
long end = System.currentTimeMillis();
System.out.println(end - start);

五、NIO - 网络编程

1、阻塞模式

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在通道中没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在以下方面
    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

(1)服务端

public class Server {
    public static void main(String[] args) {
        // 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 获得服务器通道
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            // 为服务器通道绑定端口
            server.bind(new InetSocketAddress(8080));
            // 用户存放连接的集合
            ArrayList<SocketChannel> channels = new ArrayList<>();
            // 循环接收连接
            while (true) {
                System.out.println("before connecting...");
                // 没有连接时,会阻塞线程
                SocketChannel socketChannel = server.accept();
                System.out.println("after connecting...");
                channels.add(socketChannel);
                // 循环遍历集合中的连接
                for(SocketChannel channel : channels) {
                    System.out.println("before reading");
                    // 处理通道中的数据
                    // 当通道中没有数据可读时,会阻塞线程
                    channel.read(buffer);
                    buffer.flip();
                    ByteBufferUtil.debugRead(buffer);
                    buffer.clear();
                    System.out.println("after reading");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

(2)客户端

public class Client {
    public static void main(String[] args) {
        try (SocketChannel socketChannel = SocketChannel.open()) {
            // 建立连接
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            System.out.println("waiting...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

(3)运行结果

  • 客户端-服务器建立连接前:服务器端因accept阻塞

  • 客户端-服务器建立连接后,客户端发送消息前:服务器端因通道为空被阻塞

  • 客户端发送数据后,服务器处理通道中的数据。再次进入循环时,再次被accept阻塞

  • 之前的客户端再次发送消息,服务器端因为被accept阻塞,无法处理之前客户端发送到通道中的信息

2、非阻塞模式

  • 可以通过 ServerSocketChannelconfigureBlocking(false) 方法将获得连接设置为非阻塞的。此时若没有连接,accept会返回null
  • 可以通过 SocketChannelconfigureBlocking(false) 方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read会返回-1

(1)服务端

public class Server {
    public static void main(String[] args) {
        // 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 获得服务器通道
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            // 为服务器通道绑定端口
            server.bind(new InetSocketAddress(8080));
            // 用户存放连接的集合
            ArrayList<SocketChannel> channels = new ArrayList<>();
            // 循环接收连接
            while (true) {
                // 设置为非阻塞模式,没有连接时返回null,不会阻塞线程
                server.configureBlocking(false);
                SocketChannel socketChannel = server.accept();
                // 通道不为空时才将连接放入到集合中
                if (socketChannel != null) {
                    System.out.println("after connecting...");
                    channels.add(socketChannel);
                }
                // 循环遍历集合中的连接
                for(SocketChannel channel : channels) {
                    // 处理通道中的数据
                    // 设置为非阻塞模式,若通道中没有数据,会返回0,不会阻塞线程
                    channel.configureBlocking(false);
                    int read = channel.read(buffer);
                    if(read > 0) {
                        buffer.flip();
                        ByteBufferUtil.debugRead(buffer);
                        buffer.clear();
                        System.out.println("after reading");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

(2)客户端

同阻塞模式

(3)结论

这样写虽然解决了阻塞模式下并发量的问题,但仍存在一个问题,因为设置为了非阻塞,会一直执行 while(true) 中的代码,CPU一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求

六、selector(选择器)

1、基本介绍

  1. JavaNIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)。
  2. Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
  3. 只有在连接/通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。
  4. 避免了多线程之间的上下文切换导致的开销。

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功(CPU 空转 - 死循环)
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换导致的开销。
image-20211118204059106

说明如下:

  1. NettyIO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
  2. 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
  3. 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
  4. 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
  5. 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

2、组件

【1】SelectionKey

SelectionKey,表示 Selector和网络通道的注册关系,共四种:

  • int OP_ACCEPT:有新的网络连接可以 accept,值为 16
  • int OP_CONNECT:代表连接已经建立,值为 8
  • int OP_READ:代表读操作,值为 1
  • int OP_WRITE:代表写操作,值为 4

源码中:

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

SelectionKey 相关方法

image-20211121213744817

【2】ServerSocketChannel

  1. ServerSocketChannel 在服务器端监听新的客户端 Socket 连接
  2. 相关方法如下

image-20211121213828847

【3】SocketChannel

  1. SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。
  2. 相关方法如下

image-20211121213903476

3、Selector API

// 1. 创建 selector, 管理多个 channel
Selector selector = Selector.open();

// 2. 建立 selector 和 channel 的联系(注册)
// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);

// 3. key 只关注 accept 事件,指定key只关注的事件,
// OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ
sscKey.interestOps(SelectionKey.OP_ACCEPT);

// 4. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
selector.select();

// 5. 处理事件, selectedKeys 内部包含了所有发生的事件
// selector.selectedKeys(): 获取所有可用的事件集
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 

【1】创建

Selector selector = Selector.open();

【2】绑定 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

【3】监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

方法1,阻塞直到绑定事件发生

int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

int count = selector.selectNow();

【4】select 何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

4、服务端 - 可读

服务端代码

public static void main(String[] args) {
    // 获得服务器通道
    try(ServerSocketChannel server = ServerSocketChannel.open()){
        // 绑定端口
        server.bind(new InetSocketAddress(8080));
        // 创建选择器
        Selector selector = Selector.open();
        // 设置为非阻塞模式
        server.configureBlocking(false);
        // 将通道注册到选择器中,并设置事件
        SelectionKey serverKey = server.register(selector, SelectionKey.OP_ACCEPT);

        while(true){
            // 事件监听,若没有事件准备就绪,线程会被阻塞,反之不会被阻塞,从而避免CPU空转
            // 返回值为事件个数
            int ready = selector.select();
            System.out.println("准备就绪的事件个数:" + ready);
            // 获取所有事件,SelectionKey:表示SelectableChannel在 Selector 中的注册的标记
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 使用迭代器遍历对象,因为会涉及元素的删除,所以要用迭代器
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                // 判断 key 的类型
                if (key.isAcceptable()){
                    // 连接事件
                    // 获得 key 对应的 channel
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    System.out.println("before accepting ... ");
                    // 获取连接
                    SocketChannel socketChannel = channel.accept();
                    System.out.println("after accepting ...");
                    // 设置为非阻塞模式,同时将连接的通道也注册到选择器中,同时设置附件 为buffer
                    socketChannel.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    socketChannel.register(selector,SelectionKey.OP_READ,buffer);
                    // 处理完毕后移除
                    iterator.remove();
                }else if (key.isReadable()){
                    try{
                        // 可读事件
                        SocketChannel channel = (SocketChannel)key.channel();
                        System.out.println("before read ... ");
                        // 通过 key 获得附件(buffer)
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        // 读取客户端输入
                        int read = channel.read(buffer);
                        buffer.flip();
                        System.out.println(read);
                        ByteBufferUtil.debugAll(buffer);
                        System.out.println(StandardCharsets.UTF_8.decode(buffer).toString());
                        if (read == -1){
                            key.cancel();
                            channel.close();
                        }else{
                            // 通过分隔符来分隔buffer中的数据
                            split(buffer);
                            // 如果缓冲区太小,就进行扩容
                            if (buffer.position() == buffer.limit()){
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                // 将旧的buffer的内容放到新的buffer中
                                buffer.flip();
                                newBuffer.put(buffer);
                                // 将新的buffer放到key中作为附件
                                key.attach(newBuffer);
                            }
                        }
                        System.out.println("after reading...");
                        // 处理完毕 移除
                        iterator.remove();
                    }catch (IOException e){
                        e.printStackTrace();
                        // 客户端异常断开的情况下,移除key
                        key.cancel();
                    }

                }
            }
        }
    }catch (IOException e){
        e.printStackTrace();
    }
}

private static void split(ByteBuffer buffer) {
    buffer.flip();
    for(int i = 0; i < buffer.limit(); i++) {
        // 遍历寻找分隔符
        // get(i)不会移动position
        if (buffer.get(i) == '\n') {
            // 缓冲区长度
            int length = i+1-buffer.position();
            ByteBuffer target = ByteBuffer.allocate(length);
            // 将前面的内容写入target缓冲区
            for(int j = 0; j < length; j++) {
                // 将buffer中的数据写入target中
                target.put(buffer.get());
            }
            // 打印结果
            ByteBufferUtil.debugAll(target);
        }
    }
    // 切换为写模式,但是缓冲区可能未读完,这里需要使用compact
    buffer.compact();
}

对于上面的代码将根据事件拆分进行分析

客户端

public static void main(String[] args) throws InterruptedException {
    try (SocketChannel socketChannel = SocketChannel.open()) {
        // 建立连接
        socketChannel.connect(new InetSocketAddress("localhost", 8080));
        System.out.println("waiting...");

        // 写入数据
        socketChannel.write(Charset.defaultCharset().encode("1234567890"));

    } catch (IOException e) {
        e.printStackTrace();
    }
}

【1】Accpet事件

步骤解析

  • 获得选择器Selector
Selector selector = Selector.open();
  • 将通道设置为非阻塞模式,并注册到选择器中,并设置感兴趣的事件
    • channel 必须工作在非阻塞模式
    • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
    • 绑定的事件类型,可以有
      • connect - 客户端连接成功时触发
      • accept - 服务器端成功接受连接时触发
      • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
      • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
// 通道必须设置为非阻塞模式
server.configureBlocking(false);
// 将通道注册到选择器中,并设置感兴趣的实践
server.register(selector, SelectionKey.OP_ACCEPT);
  • 通过Selector监听事件,并获得就绪的通道个数,若没有通道就绪,线程会被阻塞

    • 阻塞直到绑定事件发生

      int count = selector.select();
    • 阻塞直到绑定事件发生,或是超时(时间单位为 ms)

      int count = selector.select(long timeout);
    • 不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

      int count = selector.selectNow();
  • 获取就绪事件并得到对应的通道,然后进行处理

// 获取所有事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();

// 使用迭代器遍历事件
Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {
    SelectionKey key = iterator.next();

    // 判断key的类型,此处为Accept类型
    if(key.isAcceptable()) {
        // 获得key对应的channel
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();

        // 获取连接并处理,而且是必须处理,否则需要取消
        SocketChannel socketChannel = channel.accept();

        // 处理完毕后移除
        iterator.remove();
    }
}

事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

【2】Read事件

  • 在Accept事件中,若有客户端与服务器端建立了连接,需要将其对应的SocketChannel设置为非阻塞,并注册到选择其中
  • 添加Read事件,触发后进行读取操作
// 通道必须设置为非阻塞模式
server.configureBlocking(false);
// 将通道注册到选择器中,并设置感兴趣的事件
server.register(selector, SelectionKey.OP_ACCEPT);

【3】删除事件

当处理完一个事件后,一定要调用迭代器的remove方法移除对应事件,否则会出现错误。原因如下

以我们上面的 Read事件 的代码为例

  • 当调用了 server.register(selector, SelectionKey.OP_ACCEPT) 后,Selector中维护了一个集合,用于存放SelectionKey以及其对应的通道
// WindowsSelectorImpl 中的 SelectionKeyImpl数组
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8];
public class SelectionKeyImpl extends AbstractSelectionKey {
    // Key对应的通道
    final SelChImpl channel;
    ...
}

当选择器中的通道对应的事件发生后,selecionKey 会被放到另一个集合中,但是selecionKey不会自动移除,所以需要我们在处理完一个事件后,通过迭代器手动移除其中的selecionKey。否则会导致已被处理过的事件再次被处理,就会引发错误.

image-20211118215909188
💡 为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常
💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

【4】断开处理

当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件,对异常断开和正常断开需要加以不同的方式进行处理

  • 正常断开
    • 正常断开时,服务器端的channel.read(buffer)方法的返回值为-1,所以当结束到返回值为-1时,需要调用key的cancel方法取消此事件,并在取消后移除该事件
int read = channel.read(buffer);
// 断开连接时,客户端会向服务器发送一个写事件,此时read的返回值为-1
if(read == -1) {
    // 取消该事件的处理
    key.cancel();
    channel.close();
} else {
    ...
}
// 取消或者处理,都需要移除key
iterator.remove();
  • 异常断开
    • 异常断开时,会抛出IOException异常, 在try-catch的catch块中捕获异常并调用key的cancel方法即可

【5】消息边界

不处理消息边界存在的问题

将缓冲区的大小设置为4个字节,发送2个汉字(你好),通过decode解码并打印时,会出现乱码

ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));

输出

你�
��

这是因为UTF-8字符集下,1个汉字占用3个字节,此时缓冲区大小为4个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好 字被拆分为了前半部分和后半部分发送,解码时就会出现问题。

传输的文本可能有以下三种情况

  • 文本大于缓冲区大小
    • 此时需要将缓冲区进行扩容
  • 发生半包现象
  • 发生粘包现象

img

解决思路大致有以下三种

  • 固定消息长度,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽

  • 另一种思路是按分隔符拆分,缺点是效率低,需要一个一个字符地去匹配分隔符

  • TLV 格式,即 Type 类型、Length 长度、Value 数据(用的最多,Netty部分再介绍)

    (也就是在消息开头用一些空间存放后面数据的长度),如HTTP请求头中的Content-Type与Content-Length。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式
    img

下文的消息边界处理方式为第二种:按分隔符拆分

附件与扩容

Channel的register方法还有第三个参数附件,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的SelectionKey绑定,可以从SelectionKey获取到对应通道的附件

public final SelectionKey register(Selector sel, int ops, Object att)

可通过SelectionKeyattachment()方法获得附件

ByteBuffer buffer = (ByteBuffer) key.attachment();Copy

可通过SelectionKeyattach() 设置新的附件,替换旧附件

scKey.attach(ByteBuffer byteBuffer);

我们需要在Accept事件发生后,将通道注册到Selector中时,对每个通道添加一个ByteBuffer附件,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题

// 设置为非阻塞模式,同时将连接的通道也注册到选择其中,同时设置附件
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 添加通道对应的Buffer附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);

当Channel中的数据大于缓冲区时,需要对缓冲区进行扩容操作。此代码中的扩容的判定方法:Channel调用compact方法后,的position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中

// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {
    ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
    // 将旧buffer中的内容放入新的buffer中
    ewBuffer.put(buffer);
    // 将新buffer作为附件放到key中
    key.attach(newBuffer);
}

【6】ByteBuffer大小

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
  • 分配思路可以参考
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

5、服务端 - 可写事件

服务器通过Buffer向通道中写入数据时,可能因为通道容量小于Buffer中的数据大小,导致无法一次性将Buffer中的数据全部写入到Channel中,这时便需要分多次写入,具体步骤如下

  • 执行一次写操作,向将buffer中的内容写入到SocketChannel中,然后判断Buffer中是否还有数据

  • 若Buffer中还有数据,则需要将SockerChannel注册到Seletor中,并关注写事件,同时将未写完的Buffer作为附件一起放入到SelectionKey中

     int write = socket.write(buffer);
    // 通道中可能无法放入缓冲区中的所有数据
    if (buffer.hasRemaining()) {
        // 注册到Selector中,关注可写事件,并将buffer添加到key的附件中
        socket.configureBlocking(false);
        socket.register(selector, SelectionKey.OP_WRITE, buffer);
    }Copy
  • 添加写事件的相关操作key.isWritable(),对Buffer再次进行写操作

    • 每次写后需要判断Buffer中是否还有数据(是否写完)。若写完,需要移除SelecionKey中的Buffer附件,避免其占用过多内存,同时还需移除对写事件的关注
    SocketChannel socket = (SocketChannel) key.channel();
    // 获得buffer
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    // 执行写操作
    int write = socket.write(buffer);
    System.out.println(write);
    // 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣
    if (!buffer.hasRemaining()) {
        key.attach(null);
        key.interestOps(0);
    }Copy

服务端整体代码如下

public class WriteServer {
    public static void main(String[] args) {
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(8080));
            server.configureBlocking(false);
            Selector selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // 处理后就移除事件
                    iterator.remove();
                    if (key.isAcceptable()) {
                        // 获得客户端的通道
                        SocketChannel socket = server.accept();
                        // 写入数据
                        StringBuilder builder = new StringBuilder();
                        for(int i = 0; i < 500000000; i++) {
                            builder.append("a");
                        }
                        ByteBuffer buffer = StandardCharsets.UTF_8.encode(builder.toString());
                        // 先执行一次Buffer->Channel的写入,如果未写完,就添加一个可写事件
                        int write = socket.write(buffer);
                        System.out.println(write);
                        // 通道中可能无法放入缓冲区中的所有数据
                        if (buffer.hasRemaining()) {
                            // 注册到Selector中,关注可写事件,并将buffer添加到key的附件中
                            socket.configureBlocking(false);
                            socket.register(selector, SelectionKey.OP_WRITE, buffer);
                        }
                    } else if (key.isWritable()) {
                        SocketChannel socket = (SocketChannel) key.channel();
                        // 获得buffer
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        // 执行写操作
                        int write = socket.write(buffer);
                        System.out.println(write);
                        // 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣
                        if (!buffer.hasRemaining()) {
                            key.attach(null);
                            key.interestOps(0);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端代码

public class WriteClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));

        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
        // 3. 接收数据
        int count = 0;
        while (true) {
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

6、组件关系分析

NIO 非阻塞网络编程相关的(SelectorSelectionKeyServerScoketChannelSocketChannel)关系梳理图

image-20211118213240111

  1. 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select 方法,返回有事件发生的通道的个数。
  3. socketChannel 注册到 Selector 上,register(Selector sel, int ops),一个 Selector 上可以注册多个 SocketChannel
  4. 注册后返回一个 SelectionKey,会和该 Selector 关联(集合)。
  5. 进一步得到各个 SelectionKey(有事件发生)。
  6. 在通过 SelectionKey 反向获取 SocketChannel,方法 channel()
  7. 可以通过得到的 channel,完成业务处理。

七、多线程下的 NIO

1、基本模型

image-20211120223924402

充分利用多核CPU,分两组选择器

  • 单线程配一个选择器(Boss),专门处理 accept 事件(建立连接)
  • 创建 cpu 核心数的线程(Worker),每个线程配一个选择器,轮流处理 read 事件

2、实现思路

  • 创建一个负责处理Accept事件的Boss线程,与多个负责处理Read事件的Worker线程

  • Boss线程执行的操作

    • 接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,让Worker去处理Read事件,其中需要根据标识robin去判断将任务分配给哪个Worker

      // 创建固定数量的Worker
      Worker[] workers = new Worker[4];
      // 用于负载均衡的原子整数
      AtomicInteger robin = new AtomicInteger(0);
      // 负载均衡,轮询分配Worker
      workers[robin.getAndIncrement()% workers.length].register(socket);
    • register(SocketChannel socket)方法会通过同步队列完成Boss线程与Worker线程之间的通信,让SocketChannel的注册任务被Worker线程执行。添加任务后需要调用selector.wakeup()来唤醒被阻塞的Selector

      public void register(final SocketChannel socket) throws IOException {
          // 只启动一次
          if (!started) {
             // 初始化操作
          }
          // 向同步队列中添加SocketChannel的注册事件
          // 在Worker线程中执行注册事件
          queue.add(new Runnable() {
              @Override
              public void run() {
                  try {
                      socket.register(selector, SelectionKey.OP_READ);
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          });
          // 唤醒被阻塞的Selector
          // select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
          selector.wakeup();
      }
  • Worker线程执行的操作

    • 从同步队列中获取注册任务,并处理Read事件

3、如何拿到 cpu 个数

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

4、实现代码

public class ThreadsServer {
    public static void main(String[] args) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // 当前线程为Boss线程
            Thread.currentThread().setName("Boss");
            server.bind(new InetSocketAddress(8080));
            // 负责轮询Accept事件的Selector
            Selector boss = Selector.open();
            server.configureBlocking(false);
            server.register(boss, SelectionKey.OP_ACCEPT);
            // 创建固定数量的Worker
            Worker[] workers = new Worker[4];
            // 用于负载均衡的原子整数
            AtomicInteger robin = new AtomicInteger(0);
            for(int i = 0; i < workers.length; i++) {
                workers[i] = new Worker("worker-"+i);
            }
            while (true) {
                boss.select();
                Set<SelectionKey> selectionKeys = boss.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    // BossSelector负责Accept事件
                    if (key.isAcceptable()) {
                        // 建立连接
                        SocketChannel socket = server.accept();
                        System.out.println("connected...");
                        socket.configureBlocking(false);
                        // socket注册到Worker的Selector中
                        System.out.println("before read...");
                        // 负载均衡,轮询分配Worker
                        workers[robin.getAndIncrement()% workers.length].register(socket);
                        System.out.println("after read...");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class Worker implements Runnable {
        private Thread thread;
        private volatile Selector selector;
        private String name;
        private volatile boolean started = false;
        /**
         * 同步队列,用于Boss线程与Worker线程之间的通信
         */
        private ConcurrentLinkedQueue<Runnable> queue;

        public Worker(String name) {
            this.name = name;
        }

        public void register(final SocketChannel socket) throws IOException {
            // 只启动一次
            if (!started) {
                thread = new Thread(this, name);
                selector = Selector.open();
                queue = new ConcurrentLinkedQueue<>();
                thread.start();
                started = true;
            }

            // 向同步队列中添加SocketChannel的注册事件
            // 在Worker线程中执行注册事件
            queue.add(new Runnable() {
                @Override
                public void run() {
                    try {
                        socket.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
            // 唤醒被阻塞的Selector
            // select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
            selector.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    // 通过同步队列获得任务并运行
                    Runnable task = queue.poll();
                    if (task != null) {
                        // 获得任务,执行注册操作
                        task.run();
                    }
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while(iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        // Worker只负责Read事件
                        if (key.isReadable()) {
                            // 简化处理,省略细节
                            SocketChannel socket = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            socket.read(buffer);
                            buffer.flip();
                            ByteBufferUtil.debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

八、UDP

  • UDP 是无连接的,client 发送数据不会管 server 是否开启
  • server 这边的 receive 方法会将接收到的数据存入 byte buffer,但如果数据报文超过 buffer 大小,多出来的数据会被默默抛弃

首先启动服务器端

public class UdpServer {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            channel.socket().bind(new InetSocketAddress(9999));
            System.out.println("waiting...");
            ByteBuffer buffer = ByteBuffer.allocate(32);
            channel.receive(buffer);
            buffer.flip();
            debug(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

输出

waiting...

运行客户端

public class UdpClient {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
            InetSocketAddress address = new InetSocketAddress("localhost", 9999);
            channel.send(buffer, address);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接下来服务器端输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+

九、NIO vs BIO

1、stream vs channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行

2、IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

根据UNIX 网络编程 - 卷 I,IO模型主要有以下几种

(1)阻塞IO - 同步

  • 用户线程进行read操作时,需要等待操作系统执行实际的read操作,此期间用户线程是被阻塞的,无法执行其他操作

(2)非阻塞IO - 同步

  • 用户线程

    在一个循环中一直调用read方法

    ,若内核空间中还没有数据可读,立即返回

    • 只是在等待阶段非阻塞
  • 用户线程发现内核空间中有数据后,等待内核空间执行复制数据,待复制结束后返回结果

(3)多路复用 - 同步

Java中通过Selector实现多路复用

  • 当没有事件是,调用select方法会被阻塞住
  • 一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用

多路复用与阻塞IO的区别

  • 阻塞IO模式下,若线程因accept事件被阻塞,发生read事件后,仍需等待accept事件执行完成后,才能去处理read事件
  • 多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行

(4)异步IO

  • 线程1调用方法后理解返回,不会被阻塞也不需要立即获取结果
  • 当方法的运行结果出来以后,由线程2将结果返回给线程1

十、零拷贝

零拷贝指的是数据无需拷贝到 JVM 内存中,同时具有以下三个优点

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

1、传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流如下

img

  • Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  • 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA

  • 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝

  • 接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

2、NIO 优化

通过 DirectByteBuf

  • ByteBuffer.allocate(10)

    • 底层对应 HeapByteBuffer,使用的还是 Java 内存
  • ByteBuffer.

    allocateDirect

    (10)

    • 底层对应DirectByteBuffer,使用的是操作系统内存

img

大部分步骤与优化前相同,唯有一点:Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用

  • 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
      • 当引用的对象ByteBuffer被垃圾回收以后,虚引用对象Cleaner就会被放入引用队列中,然后调用Cleaner的clean方法来释放直接内存
      • DirectByteBuffer 的释放底层调用的是 Unsafe 的 freeMemory 方法
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

3、进一步优化1

以下两种方式都是零拷贝,即无需将数据拷贝到用户缓冲区中(JVM内存中)

底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

img

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝
  • 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

这种方法下

  • 只发生了1次用户态与内核态的切换
  • 数据拷贝了 3 次

4、进一步优化2

linux 2.4 对上述方法再次进行了优化

img

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  • 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次

十一、AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

1、文件 AIO

@Slf4j
public class AioFileChannel {
    public static void main(String[] args) throws IOException {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {
            // 参数1 ByteBuffer
            // 参数2 读取的起始位置
            // 参数3 附件
            // 参数4 回调对象 CompletionHandler
            ByteBuffer buffer = ByteBuffer.allocate(16);
            log.debug("read begin...");
            channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override // read 成功
                public void completed(Integer result, ByteBuffer attachment) {
                    log.debug("read completed...{}", result);
                    attachment.flip();
                    debugAll(attachment);
                }
                @Override // read 失败
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                }
            });
            log.debug("read end...");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.in.read();
    }
}

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

2、网络 AIO

public class AioServer {
    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.accept(null, new AcceptHandler(ssc));
        System.in.read();
    }

    private static void closeChannel(AsynchronousSocketChannel sc) {
        try {
            System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        public ReadHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            try {
                if (result == -1) {
                    closeChannel(sc);
                    return;
                }
                System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                attachment.flip();
                System.out.println(Charset.defaultCharset().decode(attachment));
                attachment.clear();
                // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                sc.read(attachment, attachment, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel(sc);
            exc.printStackTrace();
        }
    }

    private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        private WriteHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
            if (attachment.hasRemaining()) {
                sc.write(attachment);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            closeChannel(sc);
        }
    }

    private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        private final AsynchronousServerSocketChannel ssc;

        public AcceptHandler(AsynchronousServerSocketChannel ssc) {
            this.ssc = ssc;
        }

        @Override
        public void completed(AsynchronousSocketChannel sc, Object attachment) {
            try {
                System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            } catch (IOException e) {
                e.printStackTrace();
            }
            ByteBuffer buffer = ByteBuffer.allocate(16);
            // 读事件由 ReadHandler 处理
            sc.read(buffer, buffer, new ReadHandler(sc));
            // 写事件由 WriteHandler 处理
            sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
            // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
            ssc.accept(null, this);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();
        }
    }
}

十二、3种IO使用场景

  1. BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序简单易理解。
  2. NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持。
  3. AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

  目录