Netty-基础


一、Netty简介

1、Netty 是什么

  • 异步时间驱动框架,用于快速开发高性能服务端和客户端
  • 封装了 JDK 底层的 BIO 和 NIO 模型,提供了高度可用的 API
  • 自带编解码器解决拆包粘包问题,用户只用关心业务逻辑
  • Reactor 线程模型支持高并发海量连接
  • 自带各种协议栈

2、Netty 的应用

  • Dubbo
  • RocketMQ
  • Spark
  • Elasticsearch
  • Flink
  • Spring5

二、线程模型

  1. 不同的线程模式,对程序的性能有很大影响,为了搞清 Netty 线程模式,我们来系统的讲解下各个线程模式,最后看看 Netty 线程模型有什么优越性。
  2. 目前存在的线程模型有:传统阻塞 I/O 服务模型、 Reactor 模式
  3. 根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现单 Reactor 单线程;单 Reactor多线程;主从 Reactor多线程
  4. Netty 线程模式(Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor

1、传统阻塞I/O服务模型

(1)工作原理

  1. 黄色的框表示对象,蓝色的框表示线程
  2. 白色的框表示方法(API
img

(2)模型特点

  1. 采用阻塞 IO 模式获取输入的数据
  2. 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回

(3)缺点

  1. 当并发数很大,就会创建大量的线程,占用很大系统资源
  2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费

2、Reactor 模式

(1)解决传统模型缺点

  1. 基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理 Reactor 对应的叫法:
    1. 反应器模式
    2. 分发者模式(Dispatcher)
    3. 通知者模式(notifier)
  2. 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

(2)模型优点

  1. 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的
  2. 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
  3. 扩展性好,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
  4. 复用性好,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性

(3)工作原理图

img

对上图说明:

  1. Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
  2. 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此 Reactor 模式也叫 Dispatcher 模式
  3. Reactor 模式使用 IO 复用监听事件,收到事件后,分发给某个线程(进程),这点就是网络服务器高并发处理关键

(4)核心组件

  1. ReactorReactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
  2. Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。

(5)三种Reactor模式

根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现

  1. Reactor 单线程
  2. Reactor 多线程
  3. 主从 Reactor 多线程

3、单Reactor单线程

(1)原理

img

  1. Select 是前面 I/O 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求
  2. Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
  3. 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
  4. 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应
  5. Handler 会完成 Read → 业务处理 → Send 的完整业务流程

结合实例:服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑

(2)方案优缺点

  1. 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
  2. 缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
  3. 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
  4. 使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度 O(1) 的情况

4、单Reactor多线程

(1)原理

img

  1. Reactor 对象通过 Select 监控客户端请求事件,收到事件后,通过 Dispatch 进行分发
  2. 如果建立连接请求,则右 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理完成连接后的各种事件
  3. 如果不是连接请求,则由 Reactor 分发调用连接对应的 handler 来处理
  4. handler 只负责响应事件,不做具体的业务处理,通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务
  5. worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
  6. handler 收到响应后,通过 send 将结果返回给 client

(2)优缺点

  1. 优点:可以充分的利用多核 cpu 的处理能力
  2. 缺点:多线程数据共享和访问比较复杂,Reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。

5、主从Reactor多线程

(1)运行原理

针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行

img

  1. Reactor 主线程 MainReactor 对象通过 select 监听连接事件,收到事件后,通过 Acceptor 处理连接事件
  2. Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor
  3. subreactor 将连接加入到连接队列进行监听,并创建 handler 进行各种事件处理
  4. 当有新事件发生时,subreactor 就会调用对应的 handler 处理
  5. handler 通过 read 读取数据,分发给后面的 worker 线程处理
  6. worker 线程池分配独立的 worker 线程进行业务处理,并返回结果
  7. handler 收到响应的结果后,再通过 send 将结果返回给 client
  8. Reactor 主线程可以对应多个 Reactor 子线程,即 MainRecator 可以关联多个 SubReactor

(2)优缺点

  1. 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
  2. 优点:父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
  3. 缺点:编程复杂度较高
  4. 结合实例:这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持

6、Netty模型

(1)工作原理

Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor

简单流程

img

  1. BossGroup 线程维护 Selector,只关注 Accecpt
  2. 当接收到 Accept 事件,获取到对应的 SocketChannel,封装成 NIOScoketChannel 并注册到 Worker 线程(事件循环),并进行维护
  3. Worker 线程监听到 Selector 中通道发生自己感兴趣的事件后,就进行处理(就由 handler),注意 handler 已经加入到通道

详细流程

img

  1. Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写
  2. BossGroupWorkerGroup 类型都是 NioEventLoopGroup
  3. NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是 NioEventLoop
  4. NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯
  5. NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop
  6. 每个BossNioEventLoop循环执行的步骤有3步
    • 轮询 accept 事件
    • 处理 accept 事件,与 client 建立连接,生成 NioScocketChannel,并将其注册到某个 worker NIOEventLoop 上的 Selector
    • 处理任务队列的任务,即 runAllTasks
  7. 每个Worker NIOEventLoop 循环执行的步骤
    • 轮询 readwrite 事件
    • 处理 I/O 事件,即 readwrite 事件,在对应 NioScocketChannel 处理
    • 处理任务队列的任务,即 runAllTasks
  8. 每个 Worker NIOEventLoop 处理业务时,会使用 pipeline(管道),pipeline 中包含了 channel,即通过 pipeline 可以获取到对应通道,管道中维护了很多的处理器

7、异步模型

(1)基本介绍

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
  2. Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture
  3. 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
  4. Netty 的异步模型是建立在 futurecallback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即:Future-Listener 机制)

(2)Future说明

  1. 表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算等等。
  2. ChannelFuture 是一个接口:public interface ChannelFuture extends Future<Void> 我们可以添加监听器,当监听的事件发生时,就会通知到监听器。

(3)Future-Listener 机制

  1. Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
  2. 常见有如下操作
    • 通过 isDone 方法来判断当前操作是否完成;
    • 通过 isSuccess 方法来判断已完成的当前操作是否成功;
    • 通过 getCause 方法来获取已完成的当前操作失败的原因;
    • 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
    • 通过 addListener 方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器

举例说明 演示:绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑

//绑定一个端口并且同步,生成了一个ChannelFuture对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete (ChannelFuture future) throws Exception {
      if (cf.isSuccess()) {
         System.out.println("监听端口6668成功");
      } else {
         System.out.println("监听端口6668失败");
      }
   }
});

三、Hello World

1、依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

2、服务端(简易版)

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LoggingHandler;

public class HelloServer {
    public static void main(String[] args) {
        // 1. 启动器,负责组装 netty 组件,启动服务器
        new ServerBootstrap()
            // 2. group 组,包含线程和选择器
            .group(new NioEventLoopGroup())
            // 3. 选择 服务器的 ServerSocketChannel 实现
            .channel(NioServerSocketChannel.class) // OIO BIO
            // 4. boss 负责处理连接 worker(child) 负责处理读写,
            // 决定了 worker(child) 能执行哪些操作(handler)
            .childHandler(
                // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,
                //负责添加别的 handler
                //ChannelInitializer 在程序启动时就新建,只有一个
                new ChannelInitializer<NioSocketChannel>() {

                // 每次客户端连接建立后都会执行    
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // 6. 添加具体 handler,多个 handler 顺序执行
                    // 此处仅用于添加处理器,处理器实际执行是在指定事件发生时执行
                    // pipeline()流水线(处理器执行流水线),addLast()添加到最后,
                    ch.pipeline().addLast(new LoggingHandler());
                    // 将 ByteBuf 转换为字符串 的handler
                    ch.pipeline().addLast(new StringDecoder());
                    // 自定义 handler
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { 
                        @Override // 读事件
                        public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
                             // 打印上一步转换好的字符串
                            System.out.println(msg);
                        }
                    });
                }
            })
            // 7. 绑定监听端口
            .bind(8080);
    }
}

3、客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        // 1. 启动器
        Channel channel = new Bootstrap()
            // 2. 添加 EventLoop
            .group(new NioEventLoopGroup())
            // 3. 选择客户端 channel 实现
            .channel(NioSocketChannel.class)
            // 4. 添加处理器,ChannelInitializer初始化器
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override // 在连接建立后被调用
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // 添加具体 handler
                    // 把 hello world 转化为 ByteBuf 的 handler
                    ch.pipeline().addLast(new StringEncoder());
                }
            })
            // 5. 连接到服务器
            .connect(new InetSocketAddress("localhost", 8080))
            .sync()    // 阻塞方法,直到连接建立前阻塞
            .channel();    // 获取 channel
            //.writeAndFlush("hello, world");

        // 向服务器发送数据
        channel.writeAndFlush("hello world!!!");
        channel.writeAndFlush("hello world!!!");
    }
}

一定要记住这些格式,都是固定写法,后面全部都是这种格式,不同的就是使用哪些处理器

4、执行流程

image-20211122213355559

  1. Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
  2. NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket网络通道。
  3. NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责NioEventLoopGroup 下包含多个 NioEventLoop
  • 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
  • 每个 NioEventLoopSelector 上可以注册监听多个 NioChannel
  • 每个 NioChannel 只会绑定在唯一的 NioEventLoop
  • 每个 NioChannel 都绑定有一个自己的 ChannelPipeline

5、组件理解

  • channel 可以理解为数据的通道
  • msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • handler 可以理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline(传递途径),pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
      • pipeline 中有多个 handler,处理时会依次调用其中的 handler
    • handler 分 Inbound 和 Outbound 两类
      • Inbound 入站
      • Outbound 出站
  • eventLoop 可以理解为处理数据的工人
    • eventLoop 可以管理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 io 操作都由该 eventLoop 负责
    • eventLoop 既可以执行 io 操作,也可以进行任务处理,每个 eventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • eventLoop 按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 eventLoop

6、服务端模板代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServer {

    private final ServerBootstrap server;

    // netty 启动端口
    private static final Integer port = 9999;

    public WebSocketServer() {
        //构造函数可传线程数,默认是CPU核心数的2倍
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        server = new ServerBootstrap();

        // 配置为主从 reactor 线程模型
        server.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小
                .option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口
                .childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //获取管道(pipeline)
                        ChannelPipeline pipeline = channel.pipeline();

                        // addLast() 方法第一个参数是处理器别名,唯一即可,也可不写
                        // 日志打印所有事件,默认级别DEBUG
                        pipeline.addLast("logging", new LoggingHandler());
                        //webSocket 基于http协议,所需要的http 编解码器
                        //HttpServerCodec是netty针对http编解码的处理类,但是这些只能处理像http get的请求,也就是数据带在url问号后面的http请求
                        pipeline.addLast("http-server", new HttpServerCodec());
                        //在http上有一些数据流产生,有大有小,我们对其进行处理,既然如此,我们需要使用netty 对下数据流写 提供支持
                        pipeline.addLast("chunked-write", new ChunkedWriteHandler());
                        //对httpMessage 进行聚合处理,聚合成request或 response
                        //HttpObjectAggregator这个netty的处理器就是为了处理post请求的body.它把HttpMessage和HttpContent聚合成为一个FullHttpRquest或者FullHttpRsponse
                        pipeline.addLast("aggregator", new HttpObjectAggregator(1024*64));

                        //=================增加心跳支持===================
                        //针对客户端,如果在1分钟时间内没有向服务端发送读写心跳(ALL),则主动断开连接
                        //如果有读空闲和写空闲,则不做任何处理
                        //pipeline.addLast(new IdleStateHandler(60,60,60));


                        //=================WebSocket支持================
                        /**
                         * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws,此处使用/im
                         * 本handler 会帮你处理一些繁重复杂的事情
                         * 会帮你处理握手动作:handshaking(close、ping、pong) ping+pong = 心跳
                         * 对于websocket 来讲,都是以frams 进行传输的,不同的数据类型对应的frams 也不同
                         */
                        //pipeline.addLast(new WebSocketServerProtocolHandler("/im"));

                        //================自定义的handler=================
                        //pipeline.addLast(chatHandler);
                    }
                });
    }

    public void start() {
        server.bind(port);
    }

    public static void main(String[] args) {
        new WebSocketServer().start();
    }
}

四、组件

1、Bootstrap、ServerBootstrap

  1. Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,NettyBootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
  2. 常见的方法有
    • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个 EventLoop(主从线程模型)
    • public B group(EventLoopGroup group),该方法用于客户端,用来设置一个 EventLoop
    • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
    • public <T> B option(ChannelOption<T> option, T value),用来给 ServerChannel 添加配置
    • public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value),用来给接收到的通道添加配置
    • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler
    • public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号
    • public ChannelFuture connect(String inetHost, int inetPort),该方法用于客户端,用来连接服务器端

2、EventLoopGroup

事件循环组 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroupregister 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop
  1. EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
  2. EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroupWorkerEventLoopGroup
  3. 通常一个服务端口即一个 ServerSocketChannel 对应一个 Selector 和一个 EventLoop 线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示

img

  • 常用方法 public NioEventLoopGroup(),构造方法 public Future<?> shutdownGracefully(),断开连接,关闭线程

3、EventLoop

事件循环对象 EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件

它的继承关系如下

  • 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 继承自 netty 自己的 OrderedEventExecutor,(有序的处理执行器)
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

(1)普通、定时任务

EventLoop :使用execute()方法提交任务无法获取返回值

要获取返回结果要用submit()方法

public static void main(String[] args) {
    // 1、创建事件循环对象,内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
    EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
    //EventLoopGroup group = new DefaultEventLoop();  // 普通任务,定时任务

    // 2、获取下一个事件对象,因为只有两个线程,所以1和3一样,2和4一样
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());

    // 3、执行普通任务
    group.next().execute( () -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("执行普通任务");
    });

    // 4、执行定时任务
    // 参数: 任务,初始化等待时间(从几秒开始执行),每次循环间隔,时间单位
    // 此处执行间隔为5秒,而不是7秒
    group.next().scheduleAtFixedRate( () -> {

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("执行定时任务");
        System.out.println(new Date());
    },3,5, TimeUnit.SECONDS );

    System.out.println("主线程...");
}

(2)io 任务

public static void main(String[] args) {
    // 细分2:创建一个独立的 EventLoopGroup
    EventLoopGroup group = new DefaultEventLoopGroup();

    new ServerBootstrap()
        // boss 和 worker
        // 细分1:boss 只负责 ServerSocketChannel 上 accept 事件
        // worker 只负责 socketChannel 上的读写
        .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast("handler1", 
                                      new ChannelInboundHandlerAdapter() {
                    @Override                                         // ByteBuf
                    public void channelRead(ChannelHandlerContext ctx, Object msg) 
                        throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        log.debug(buf.toString(Charset.defaultCharset()));
                        ctx.fireChannelRead(msg); // 让消息传递给下一个handler
                        // 细分2:指定其他的用于执行的事件循环组,一般用于耗时较长的业务代码
                        // 第一个参数:指定额外的事件循环组;第二个参数:组名
                    }
                }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                    @Override                                         // ByteBuf
                    public void channelRead(ChannelHandlerContext ctx, Object msg) 
                        throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        log.debug(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        })
        .bind(8080);
}

4、Channel

Channel 的常用方法

  • close() 可以用来关闭Channel
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)

5、ChannelFuture

ChannelFuture() 作用是用来保存Channel异步操作的结果。(原理见异步模型)

  • sync 方法作用是同步等待 Channel 异步操作的结果(由主线程执行结果处理)。
  • addListener 方法是异步获取 Channel 异步操作的结果(由nio线程执行回调函数处理结果)。
  • 优先使用addListener(GenericFutureListener),而非await()sync()
  • 获取方法:通过connect()方法返回
@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        // 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 1. 连接到服务器
                // 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
                .connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后

        // 2.1 使用 sync 方法同步处理结果
        /*channelFuture.sync(); // 阻塞住当前线程,直到nio线程连接建立完毕
        Channel channel = channelFuture.channel();
        log.debug("{}", channel);
        channel.writeAndFlush("hello, world");*/

        // 2.2 使用 addListener(回调对象) 方法异步处理结果
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            // 在 nio 线程连接建立好之后,会调用 operationComplete
            public void operationComplete(ChannelFuture future) throws Exception {
                // 此处的future就是调用addListener()的channelFuture
                Channel channel = future.channel();
                log.debug("{}", channel);
                channel.writeAndFlush("hello, world");
            }
        });
    }
}

6、ClosedFuture

当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作

如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现

  • 通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作(同步处理
  // 获得closeFuture对象
  ChannelFuture closeFuture = channel.closeFuture();

  // 同步等待NIO线程执行完close操作
  closeFuture.sync();
  • 调用closeFuture.addListener方法,添加close的后续操作(异步处理,推荐使用
NioEventLoopGroup group = new NioEventLoopGroup();

closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        // 等待channel关闭后才执行的操作
        System.out.println("关闭之后执行一些额外操作...");
        // 关闭EventLoopGroup
        group.shutdownGracefully();
    }
});

优雅地关闭事件循环组

先停止接收新任务,没运行完的任务运行完后再关闭

// 关闭EventLoopGroup
group.shutdownGracefully();

7、Future与Promise

(1)概念

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

(2)JDK Future

public class JdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "JdkFuture");
            }
        };
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);

        // 获得Future对象
        Future<Integer> future = executor.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });

        // 通过阻塞的方式,获得运行结果
        System.out.println(future.get());
    }
}

(3)Netty Future

public class NettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        // 获得 EventLoop 对象
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 50;
            }
        });

        // 主线程中获取结果
        System.out.println(Thread.currentThread().getName() + " 获取结果");
        System.out.println("getNow " + future.getNow());
        System.out.println("get " + future.get());

        // NIO线程中异步获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(Thread.currentThread().getName() + " 获取结果");
                System.out.println("getNow " + future.getNow());
            }
        });
    }
}

Netty中的Future对象,可以通过EventLoop的sumbit()方法得到

  • 可以通过Future对象的get方法,阻塞地获取返回结果
  • 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
  • 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果

(4)Netty Promise

Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果

public class NettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        // 创建Promise对象,用于存放结果
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 自定义线程向Promise中存放结果
            promise.setSuccess(50);
        }).start();

        // 主线程从Promise中获取结果
        System.out.println(Thread.currentThread().getName() + " " + promise.get());
    }
}

8、Handler与Pipeline

(1)ChannelPipeline

ChannelPipeline 是一个重点:

  1. ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解:ChannelPipeline 是保存 ChannelHandlerList,用于处理或拦截 Channel 的入站事件和出站操作)
  2. ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互
  3. Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下

img

  • 一个 Channel包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext 中又关联着一个ChannelHandler

  • ChannelPipeline 中的双向链表拥有2个初始节点,head(头)tail(尾)

  • 入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的 handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的 handler互不干扰

  • 入站事件由读操作触发,出站事件由写操作触发

常用方法

  • ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置
  • ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置
public static void main(String[] args) {
    new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                // 在socketChannel的pipeline中添加handler
                // pipeline中handler是带有head与tail节点的双向链表,的实际结构为
                // head <-> handler1 <-> ... <-> handler4 <->tail
                // Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法
                // 入站时,handler是从head向后调用的
                socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
                        // 父类该方法内部会调用fireChannelRead
                        // 将数据传递给下一个handler
                        super.channelRead(ctx, msg);
                    }
                });
                socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
                        // 执行write操作,使得Outbound的方法能够得到调用
                        socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
                        super.channelRead(ctx, msg);
                    }
                });
                // Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法
                // 出站时,handler的调用是从tail向前调用的
                socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
                        super.write(ctx, msg, promise);
                    }
                });
                socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
                        super.write(ctx, msg, promise);
                    }
                });
            }
        })
        .bind(8080);
}

通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler

handler需要放入通道的pipeline中,才能根据放入顺序来使用handler

  • pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
    • 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止

ctx.channel().write(msg) 与 ctx.write(msg)

  • 都是触发出站处理器的执行
  • ctx.channel().write(msg) 从尾部开始查找出站处理器
  • ctx.write(msg) 是从当前节点找上一个出站处理器

socketChannel.writeAndFlush()

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler

img

ctx.writeAndFlush()

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler

img

(2)EmbeddedChannel

EmbeddedChannel 可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的 InboundOutbound 方法即可

public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        // 用于测试Handler的Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);

        // 执行Inbound操作 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        // 执行Outbound操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

(3)ChannelHandler

  1. ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
  2. ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
  3. ChannelHandler 及其实现类一览图(后)

img

  1. 我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    public ChannelInboundHandlerAdapter() {
    }

    @Skip
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    // 通道就绪事件
    @Skip
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    // 通道读取数据事件
    @Skip
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    // 读取数据完毕事件
    @Skip
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    // 通道发生异常事件
    @Skip
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

9、ChannelHandlerContext

  1. 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
  2. ChannelHandlerContext 中包含一个具体的事件处理器 ChannelHandler,同时 ChannelHandlerContext 中也绑定了对应的 pipelineChannel 的信息,方便对 ChannelHandler 进行调用。
  3. 常用方法
    • ChannelFuture close(),关闭通道
    • ChannelOutboundInvoker flush(),刷新
    • ChannelFuture writeAndFlush(Object msg),将数据写到
    • ChannelPipeline 中当前 ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

10、ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 参数如下:

(1)通用参数

CONNECT_TIMEOUT_MILLIS:Netty参数,连接超时毫秒数,默认值30000毫秒即30秒

MAX_MESSAGES_PER_READ:Netty参数,一次Loop读取的最大消息数,对于ServerChannel或者NioByteChannel,默认值为16,其他Channel默认值为1。默认值这样设置,是因为:ServerChannel需要接受足够多的连接,保证大吞吐量,NioByteChannel可以减少不必要的系统调用select。

WRITE_SPIN_COUNT:Netty参数,一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。

ALLOCATOR:Netty参数,ByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT,4.0版本为UnpooledByteBufAllocator,4.1版本为PooledByteBufAllocator。该值也可以使用系统参数io.netty.allocator.type配置,使用字符串值:”unpooled”,”pooled”。

RCVBUF_ALLOCATOR:Netty参数,用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。

AUTO_READ:Netty参数,自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操作,需要调用 channel.read() 设置关心的I/O事件为OP_READ,这样若有数据到达才能读取以供用户处理。该值为True时,每次读操作完毕后会自动调用 channel.read() ,从而有数据到达便能读取;否则,需要用户手动调用 channel.read() 。需要注意的是:当调用 config.setAutoRead(boolean) 方法时,如果状态由false变为true,将会调用 channel.read() 方法读取数据;由true变为false,将调用 config.autoReadCleared() 方法终止数据读取。

WRITE_BUFFER_HIGH_WATER_MARK:Netty参数,写高水位标记,默认值64KB。如果Netty的写缓冲区中的字节超过该值,Channel的isWritable()返回False。

WRITE_BUFFER_LOW_WATER_MARK:Netty参数,写低水位标记,默认值32KB。当Netty的写缓冲区中的字节超过高水位之后若下降到低水位,则Channel的 isWritable() 返回True。写高低水位标记使用户可以控制写入数据速度,从而实现流量控制。推荐做法是:每次调用 channl.write(msg) 方法首先调用 channel.isWritable() 判断是否可写。

MESSAGE_SIZE_ESTIMATOR:Netty参数,消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。

SINGLE_EVENTEXECUTOR_PER_GROUP:Netty参数,单线程执行ChannelPipeline中的事件,默认值为True。该值控制执行ChannelPipeline中执行ChannelHandler的线程。如果为Trye,整个pipeline由一个线程执行,这样不需要进行线程切换以及线程同步,是Netty4的推荐做法;如果为False,ChannelHandler中的处理过程会由Group中的不同线程执行。

(2)SocketChannel参数

SO_RCVBUF:Socket参数,TCP数据接收缓冲区大小。该缓冲区即TCP接收滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_rmem查询其大小。一般情况下,该值可由用户在任意时刻设置,但当设置值超过64KB时,需要在连接到远端之前设置。

SO_SNDBUF:Socket参数,TCP数据发送缓冲区大小。该缓冲区即TCP发送滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。

TCP_NODELAY:TCP参数,立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时。

SO_KEEPALIVE:Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。

SO_REUSEADDR:Socket参数,地址复用,默认值False。有四种情况可以使用:

(1).当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而你希望启动的程序的socket2要占用该地址和端口,比如重启服务且保持先前端口。

(2).有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。

(3).单个进程绑定相同的端口到多个socket上,但每个socket绑定的ip地址不同。

(4).完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。

SO_LINGER:Socket参数,关闭Socket的延迟时间,默认值为-1,表示禁用该功能。-1表示socket.close()方法立即返回,但OS底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,OS放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。

IP_TOS:IP参数,设置IP头部的Type-of-Service字段,用于描述IP包的优先级和QoS选项。

ALLOW_HALF_CLOSURE:Netty参数,一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。

(3)ServerSocketChannel参数

SO_RCVBUF:已说明,需要注意的是:当设置值超过64KB时,需要在绑定到本地端口前设置。该值设置的是由ServerSocketChannel使用accept接受的SocketChannel的接收缓冲区。

SO_REUSEADDR:已说明

SO_BACKLOG:Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。

(4)DatagramChannel参数

SO_BROADCAST:Socket参数,设置广播模式。

SO_RCVBUF:已说明

SO_SNDBUF:已说明

SO_REUSEADDR:已说明

IP_MULTICAST_LOOP_DISABLED:对应IP参数 IP_MULTICAST_LOOP,设置本地回环接口的多播功能。由于 IP_MULTICAST_LOOP 返回True表示关闭,所以Netty加上后缀 _DISABLED 防止歧义。

IP_MULTICAST_ADDR:对应IP参数 IP_MULTICAST_IF,设置对应地址的网卡为多播模式。

IP_MULTICAST_IF:对应IP参数 IP_MULTICAST_IF2 ,同上但支持IPV6。

IP_MULTICAST_TTL:IP参数,多播数据报的 time-to-live 即存活跳数。

IP_TOS:已说明

DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION:Netty参数,DatagramChannel注册的EventLoop即表示已激活。

11、Unpooled

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作

  1. Netty 提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类
  2. 常用方法如下所示
// 通过给定的数据和字符集编码获取 byteBuf 对象
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
  1. 举例说明 Unpooled 获取 Netty 的数据容器 ByteBuf 的基本使用【案例演示】
public static void main(String[] args) {

    //创建一个ByteBuf
    //说明
    //1. 创建 对象,该对象包含一个数组arr , 是一个byte[10]
    //2. 在netty 的buffer中,不需要使用flip 进行反转
    //   底层维护了 readerindex 和 writerIndex
    //3. 通过 readerindex 和  writerIndex 和  capacity, 将buffer分成三个区域
    // 0---readerindex 已经读取的区域
    // readerindex---writerIndex , 可读的区域
    // writerIndex -- capacity, 可写的区域
    ByteBuf buffer = Unpooled.buffer(10);

    for (int i = 0; i < 10; i++) {
        buffer.writeByte(i);
    }

    System.out.println("capacity=" + buffer.capacity());//10
    //输出
    //        for(int i = 0; i<buffer.capacity(); i++) {
    //            System.out.println(buffer.getByte(i));
    //        }
    for (int i = 0; i < buffer.capacity(); i++) {
        System.out.println(buffer.readByte());
    }
    System.out.println("执行完毕");
}

这里介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

也可以用来包装普通字节数组,底层也不会有拷贝操作

ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));

12、ByteBuf

(1)调试工具

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append(" write index:").append(buffer.writerIndex())
        .append(" capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

该方法可以帮助我们更为详细地查看ByteBuf中的内容

(2)创建

// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

ByteBuf 通过 ByteBufAllocator选择allocator并调用对应的 buffer() 方法来创建的,默认使用直接内存作为ByteBuf ,容量为256个字节,可以指定初始容量的大小

当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作

如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

直接内存与堆内存

通过该方法创建的ByteBuf,使用的是基于直接内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

可以使用下面的代码来创建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化与非池化

池化的最大意义在于可以重用 ByteBuf(类似线程池、数据库连接池等),优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量(jvm参数)来设置

// 开启池化
-Dio.netty.allocator.type=unpooled

// 关闭池化
-Dio.netty.allocator.type=pooled
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现

(3)组成部分

ByteBuf 主要有以下几个组成部分

  • 最大容量与当前容量

    • 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
    • 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
  • 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制。进行读写操作时,无需进行模式的切换

    • 读指针前的部分被称为废弃部分,是已经读过的内容
  • 读指针与写指针之间的空间称为可读部分

    • 写指针与当前容量之间的空间称为可写部分

img

(4)写入

常用方法如下

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian(大端写入),即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian(小端写入),即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串 CharSequence为字符串类的父类,第二个参数为对应的字符集

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

(5)扩容

当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作

扩容规则

  • 如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
    • 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
  • 如果写入后数据大小超过 512 字节,则选择下一个 2^n
    • 例如写入后大小为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 已经不够了)
  • 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常

(6)读取

读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针

读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分

如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

如果需要判断可读长度,需要调用 buffer.readableBytes(),返回值为 writerIndex - readerIndex

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);

        // 读取4个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);

        // 通过mark与reset实现重复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);    // 调试工具

        // 恢复到mark标记处
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}
1
2
3
4
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+

还有以 get 开头的一系列方法,这些方法不会改变读指针的位置

(7)内存释放

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

释放规则

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))

  • 入站 ByteBuf 处理原则

    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则

    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则

    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

      while (!buffer.release()) {}

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

TailConext中释放ByteBuf的源码

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        // 具体的释放方法
        ReferenceCountUtil.release(msg);
    }
}

判断传过来的是否为ByteBuf,是的话才需要释放

public static boolean release(Object msg) {
    return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

(9)copy - 复制

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关

(10)setIndex - 设置读写索引

int errorLen = fileNameLen + fileLen;
// 获取当前读索引
int readerIndex = in.readerIndex();
// 获取当前写索引
int writerIndex = in.writerIndex();
// 设置读指针,跳过当前数据
in.setIndex(readerIndex + errorLen, writerIndex);

(11)int 和 byte[] 互转

/**
* int 转 byte数组
*/
public static byte[] intToByteArr(int i){
    // 创建指定长度为4的ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
    // 指定字节顺序为小端
    //ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    buffer.putInt(i); // 写入int值
    return buffer.array(); // 获取字节数组
}

/**
* byte[] 转 int
*/
public static int byteArrToInt(byte[] bytes){
    // 将字节数组包装成ByteBuffer
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    // 指定字节顺序为小端
    //ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); 
    return buffer.getInt(); // 读取int值
}

其他基本数据类型转 byte[] 也可以这个思路

ByteBuf 优势

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

13、ByteBuf零拷贝

(1)slice - 切片

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

例,原始 ByteBuf 进行一些初始操作

public class TestSlice {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 将buffer分成两部分,切片过程中并没有发生数据的复制
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 需要让分片的buffer引用计数加一
        // 避免原Buffer释放导致分片buffer无法使用
        slice1.retain();
        slice2.retain();

        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);

        // 更改原始buffer中的值
        System.out.println("===========修改原buffer中的值===========");
        buffer.setByte(0,5);

        System.out.println("===========打印slice1===========");
        ByteBufUtil.log(slice1);

        buffer.release();
        slice1.release();
        slice2.release();
    }
}

注意

  • 调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write
ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果执行,会报 IndexOutOfBoundsException 异常
  • 如果 slice 的内容发生了更改,这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存

(2)duplicate

【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的

(3)CompositeByteBuf

【零拷贝】的体现之一,逻辑合并,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝

有两个 ByteBuf 如下

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));

现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?

方法1:

ByteBuf buf3 = ByteBufAllocator.DEFAULT
    .buffer(buf1.readableBytes()+buf2.readableBytes());

buf3.writeBytes(buf1);
buf3.writeBytes(buf2);

System.out.println(ByteBufUtil.prettyHexDump(buf3));

这种方法好不好?回答是不太好,因为进行了数据的内存复制操作

方法2:

CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
buf3.addComponents(true, buf1, buf2);

CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

五、处理器 Handler

1、ChannelHandler

(1)介绍

从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。

不论是入站还是出站,handler从一端开始,到另一端结束,以责任链的模式依次执行。

ChannelHandler 是所有出入站处理器的最顶层接口。

  1. Netty的处理器可以分为两类:入站处理器出站处理器
  2. 入站处理器的顶层是 ChannelInboundHandler
    出站处理器的顶层是 ChannelOutboundHandler

image-20230530152355470

(2)官方文档

ChannelHandler 用于处理I/O事件或拦截I/O操作,并将其转发到ChannelPipeline中的下一个处理程序。

ChannelHandler 本身不提供很多方法,但你通常必须实现它的一个子类型:

  • ChannelInboundHandler 处理入站I/O事件

  • ChannelOutboundHandler 处理出站I/O操作。

另外,为了方便起见,提供了以下适配器类:

  • ChannelInboundHandlerAdapter 处理入站I/O事件;

  • ChannelOutboundHandlerAdapter 处理出站I/O操作

  • ChannelDuplexHandler 来处理入站和出站事件

上下文对象

ChannelHandlerChannelHandlerContext 对象提供。ChannelHandler应该通过上下文对象与它所属的 ChannelPipeline 进行交互。使用上下文对象,ChannelHandler 可以向上游或下游传递事件,动态修改管道,或者存储特定于处理程序的信息(使用AttributeKeys)。

状态管理

ChannelHandler 经常需要存储一些有状态信息。最简单和推荐的方法是使用成员变量:

public interface Message {
    // your methods here
}

public class DataServerHandler extends SimpleChannelInboundHandler<Message> {

    private boolean loggedIn;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Message message) {
        if (message instanceof LoginMessage) {
            authenticate((LoginMessage) message);
            loggedIn = true;
        } else (message instanceof GetDataMessage) {
            if (loggedIn) {
                ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
            } else {
                fail();
            }
        }
    }
    ...
}

因为处理程序实例有一个专用于一个连接的状态变量,你必须为每个新通道创建一个新的处理程序实例,以避免出现竞争条件,否则未经身份验证的客户端可以获得机密信息:

// Create a new handler instance per channel.
// See ChannelInitializer.initChannel(Channel).
public class DataServerInitializer extends ChannelInitializer<Channel> {
    @Override
    public void initChannel(Channel channel) {
        channel.pipeline().addLast("handler", new DataServerHandler());
    }
}

使用AttributeKeys

尽管建议使用成员变量来存储处理程序的状态,但由于某些原因,您可能不希望创建许多处理程序实例。在这种情况下,你可以使用 ChannelHandlerContext 提供的 AttributeKeys:

public interface Message {
    // your methods here
}

@Sharable
public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
    private final AttributeKey<Boolean> auth = AttributeKey.valueOf("auth");

    @Override
    public void channelRead(ChannelHandlerContext ctx, Message message) {
        // 获取值
        Attribute<Boolean> attr = ctx.attr(auth);
        if (message instanceof LoginMessage) {
            authenticate((LoginMessage) o);
            // 设置值
            attr.set(true);
        } else (message instanceof GetDataMessage) {
            if (Boolean.TRUE.equals(attr.get())) {
                ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
            } else {
                fail();
            }
        }
    }
    ...
}

现在处理程序的状态被附加到 ChannelHandlerContext 中,你可以将相同的处理程序实例添加到不同的管道中:

public class DataServerInitializer extends ChannelInitializer<Channel> {

    private static final DataServerHandler SHARED = new DataServerHandler();

    @Override
    public void initChannel(Channel channel) {
        channel.pipeline().addLast("handler", SHARED);
    }
}

扩展可见 @Sharable 注解

(3)@Sharable 注解

官方解释

在上面使用 AttributeKey 的示例中,您可能已经注意到 @Sharable 注释。

如果 ChannelHandler@Sharable 注释,这意味着你可以只创建一个处理程序的实例,并将其多次添加到一个或多个 ChannelPipelines 中,而不需要竞争条件。

如果未指定此注释,则每次将处理程序实例添加到管道时都必须创建一个新的处理程序实例,因为它具有成员变量等非共享状态。

白话解释

当netty尝试往多个channel的pipeline中添加同一个ChannelHandlerAdapter实例时,会判断该实例类是否添加了@Sharable,没有则抛出异常

  • 如果你添加的不是单例Handler(即每次都是new新对象),你加不加@Sharable没有任何区别

  • 如果你添加的是单例Handler,只要它会被添加到多个channel的pipeline,那就必须加上@Sharable

网上有些资料说netty会将注解了 @Sharable 的Handler单例化,实在误导人。@Sharable 只是一个标注,告诉 netty 这个类对象是线程安全的,同时也提示 netty 使用者要注意线程安全问题。

多个channel公用单例ChannelHandler,那它就必须是线程安全的,@Sharable就是用于告诉netty,我这个Handler是线程安全的,可以被多个channel安全的share,假如你搞了个线程不安全的类,你对它用上了单例,还加个@Sharable,这时候netty拿你也没办法

那是不是所有Handler都可以加上@Sharable呢?

不是,netty中,解码器有关的Handler都是不安全的,因为粘包拆包的缘故,Decoder必须要保存一些解析过程的中间状态,比如ByteToMessageDecoder类中维护的一个字节累加器cumulation,每次读到当前channel的消息后都会将消息累加到cumulation中,然后再调用子类实现的decode方法。

所以它不能被多个channel安全的共享,netty明确的禁止了使用单例Decoder,即使你自作聪明的给他加上@Sharable,也会被禁止掉,抛出… is not allowed to be shared错误,所以我们在添加Decoder时,1必须不是单例,2不要添加@Sharable

(4)源码

public interface ChannelHandler {

    /**
     * 在ChannelHandler添加到实际上下文中并准备好处理事件后调用。
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * 在ChannelHandler从实际上下文中删除后被调用,并且它不再处理事件。
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    /**
     * 如果异常被抛出,将被调用。
     *
     * 过时,如果需要则可以实现ChannelInboundHandler中的方法
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    /**
     * 指示可以将带注释的ChannelHandler的相同实例多次添加到一个或多个channelpipeline中,
     * 而不需要竞争条件。
     *
     * 如果未指定此注释,则每次将处理程序实例添加到管道时都必须创建一个新的处理程序实例,
     * 因为它具有成员变量等非共享状态。
     *
     * This annotation is provided for documentation purpose, just like
     * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
     */
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

2、ChannelHandlerContext

(1)介绍

通道处理器上下文。 当ChannelHandler添加到ChannelPipeline时,每一个处理器都会分配一个上下文与之绑定,生死不离。上下文可以自身处理器与其他的处理器进行交互,因为上下文并不会改变处理器本身,所以上下文是安全的。

处理器与上下文和管道的关系图

这里写图片描述

上下文中的方法有一个特点 : 它的事件传播是从它本身作为起点,它不需要流经整个管道。

这里写图片描述

而通道和管道的方法:都是需要从管道的一端传播到另一端,需要流经整个管道。

这里写图片描述

这样一来导致调用ChannelHandler处理器的数量是不同的,当然也会产生不同的效果。

在上下文中可以调用管道来动态增、删、改处理器,可以保存上下文供后面使用。

处理器和上下文的高级用法:

  • 将处理器添加到管道中实现动态协议切换
  • 缓存上下文

一个处理器是可以供多个管道使用的,但是要添加@Sharable标记该处理是安全共享的,共享一个处理器可以收集不同channel的统计信息或者使用一个ChannelHandler来统计连接数、处理全局数据等。

(2)官方文档

ChannelHandlerContext (通道处理器上下文【字面翻译】)使 ChannelHandler 能够与其ChannelPipeline 和其他处理程序交互。处理程序可以通知 ChannelPipeline 中的下一个ChannelHandler,也可以动态修改它所属的 ChannelPipeline

通知

您可以通过调用这里提供的各种方法之一来通知同一个 ChannelPipeline 中最近的处理程序。请参考ChannelPipeline来了解事件的流程。

修改管道

你可以通过调用 pipeline() 来获取你的处理程序所属的 ChannelPipeline。重要的应用程序可以在运行时动态地插入、删除或替换管道中的处理程序。

检索供以后使用

您可以保留 ChannelHandlerContext 以供以后使用,例如触发处理程序方法之外的事件,甚至可以从不同的线程触发。

public class MyHandler extends ChannelDuplexHandler {

    private ChannelHandlerContext ctx;

    public void beforeAdd(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    public void login(String username, password) {
        ctx.write(new LoginMessage(username, password));
    }
    ...
}

存储状态信息

attr(AttributeKey) 允许您存储和访问与处理程序及其上下文相关的有状态信息。请参考ChannelHandler了解管理有状态信息的各种推荐方法。

一个处理程序可以有多个上下文

请注意,ChannelHandler 实例可以被添加到多个 ChannelPipeline 中。这意味着单个 ChannelHandler实例可以有多个 ChannelHandlerContext,因此,如果将单个实例不止一次地添加到一个或多个ChannelPipelines 中,则可以使用不同的 ChannelHandlerContexts调用该实例。

例如,下面的处理程序将拥有与它被添加到管道的次数一样多的独立 attributekey,无论它是被多次添加到同一个管道还是多次添加到不同的管道:

public class FactorialHandler extends ChannelInboundHandlerAdapter {

    private final AttributeKey<Integer> counter = AttributeKey.valueOf("counter");

    // This handler will receive a sequence of increasing integers starting
    // from 1.
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        Integer a = ctx.attr(counter).get();

        if (a == null) {
            a = 1;
        }

        attr.set(a * (Integer) msg);
    }
}

// 不同的上下文对象被赋予"f1", "f2", "f3"和"f4",即使它们引用相同的处理程序实例。
// 由于FactorialHandler将其状态存储在上下文对象中(使用AttributeKey),
// 因此一旦两个管道(p1和p2)处于活动状态,阶乘将被正确计算4次。
FactorialHandler fh = new FactorialHandler();

ChannelPipeline p1 = Channels.pipeline();
p1.addLast("f1", fh);
p1.addLast("f2", fh);

ChannelPipeline p2 = Channels.pipeline();
p2.addLast("f3", fh);
p2.addLast("f4", fh);

(3)源码

image-20230530155936156

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    /**
     * 返回绑定到当前上下文的 Channel
     */
    Channel channel();

    /**
     * 返回EventExecutor,用于执行任意任务。
     */
    EventExecutor executor();

    /**
     * ChannelHandlerContext的唯一名称。
     * 当ChannelHandler被添加到ChannelPipeline时使用这个名称。
     * 该名称还可以用于从ChannelPipeline访问已注册的ChannelHandler。
     * 通过下面方法设置别名
     * ChannelPipeline addLast(String name, ChannelHandler handler);
     */
    String name();

    /**
     * 绑定了当前上下文的ChannelHandler
     */
    ChannelHandler handler();

    /**
     * 如果从ChannelPipeline中删除了属于此上下文的ChannelHandler,则返回true。
     * 注意,这个方法只能在EventLoop中调用
     */
    boolean isRemoved();

    /**
     * 返回指定的ChannelPipeline
     */
    ChannelPipeline pipeline();

    /**
     * 返回分配的{@link ByteBufAllocator},它将用于分配{@link ByteBuf}。
     */
    ByteBufAllocator alloc();

    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    @Override
    ChannelHandlerContext read();

    @Override
    ChannelHandlerContext flush();

    /**
     * @deprecated Use {@link Channel#attr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> boolean hasAttr(AttributeKey<T> key);
}

(4)ChannelOutboundInvoker

ChannelOutboundInvoker是Netty框架中的一种封装,它的作用是将网络请求消息从客户端发送到服务器端,属于Netty的出站消息处理机制。具体来说,它提供了一系列的方法,可以让开发者将数据写入当前的Channel中,同时也提供了一些控制方法,可以控制出站的流量和数据的传输情况。

在Netty框架中,一个Channel可以表示一个TCP连接。在发送数据时,ChannelOutboundInvoker通常会被应用于ChannelPipeline的尾部,负责处理最后的ChannelOutboundHandler。它主要用于了解待发送的消息是否已经成功地被写入到Channel中,并且允许开发者根据实际情况进行更细粒度的控制,比如优化网络传输效率或处理与传输相关的异常情况等。

public interface ChannelOutboundInvoker {

    /**
     * 请求绑定到给定的 SocketAddress
     */
    ChannelFuture bind(SocketAddress localAddress);

    /**
     * 请求连接到给定的 SocketAddress
     */
    ChannelFuture connect(SocketAddress remoteAddress);

    /**
     * 请求连接到给定的{@link SocketAddress},同时绑定到localAddress
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    /**
     * 请求断开与远程对等点的连接,并在操作完成后通知{@link ChannelFuture}
     */
    ChannelFuture disconnect();

    /**
     * 请求关闭{@link Channel}
     */
    ChannelFuture close();

    /**
     * 请求从之前分配的{@link EventExecutor}取消注册
     *
     */
    ChannelFuture deregister();

    /**
     * 请求绑定到给定的{@link SocketAddress}
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    /**
     * 请求连接到给定的{@link SocketAddress}
     */
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

    /**
     * 请求连接到给定的{@link SocketAddress},同时绑定到localAddress
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    /**
     * 请求断开与远程对等点的连接
     */
    ChannelFuture disconnect(ChannelPromise promise);

    /**
     * 请求关闭{@link Channel}
     */
    ChannelFuture close(ChannelPromise promise);

    /**
     * 请求从之前分配的{@link EventExecutor}取消注册
     */
    ChannelFuture deregister(ChannelPromise promise);

    /**
     * 请求从Channel读取数据到第一个入站缓冲区,如果读取数据,则触发{@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)事件,并触发{@link channelboundhandler #channelReadComplete(ChannelHandlerContext) channelReadComplete}事件,以便处理程序可以决定继续读取。如果已经有一个挂起的读操作,这个方法什么也不做。
     */
    ChannelOutboundInvoker read();


    // =====上面所有方法导致Channel的ChannelPipeline中的下一个ChannelOutboundHandler的同一方法被调用==========

    /**
     * 请求通过这个{@link ChannelHandlerContext}通过{@link ChannelPipeline}写入消息。此方法不会请求实际刷新,因此请确保在希望请求将所有挂起数据刷新到实际传输时调用{@link #flush()}。
     */
    ChannelFuture write(Object msg);

    /**
     * 请求通过这个{@link ChannelHandlerContext}通过{@link ChannelPipeline}写入消息。此方法不会请求实际刷新,因此请确保在希望请求将所有挂起数据刷新到实际传输时调用{@link #flush()}。
     */
    ChannelFuture write(Object msg, ChannelPromise promise);

    /**
     * 请求通过此ChannelOutboundInvoker刷新所有挂起的消息。
     */
    ChannelOutboundInvoker flush();

    /**
     * 调用{@link #write(Object, ChannelPromise)}和{@link #flush()}的快捷方式。
     */
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    /**
     * 调用{@link #write(Object)} and {@link #flush()}的快捷方式。
     */
    ChannelFuture writeAndFlush(Object msg);

    /**
     * 返回一个新的{@link ChannelPromise}。
     */
    ChannelPromise newPromise();

    /**
     * 返回一个新的{@link ChannelProgressivePromise}
     */
    ChannelProgressivePromise newProgressivePromise();

    /**
     * 创建一个新的{@link ChannelFuture},标记为已成功。因此{@link ChannelFuture#isSuccess()}将返回{@code true}。所有添加到它的{@link FutureListener}将被直接通知。而且阻塞方法的每个调用都将返回,而不会阻塞。
     */
    ChannelFuture newSucceededFuture();

    /**
     * 创建一个新的{@link ChannelFuture},它已经被标记为失败。因此{@link ChannelFuture#isSuccess()}将返回{@code false}。所有添加到它的{@link FutureListener}将被直接通知。而且阻塞方法的每个调用都将返回,而不会阻塞。
     */
    ChannelFuture newFailedFuture(Throwable cause);

    /**
     * 返回可用于不同操作的特殊ChannelPromise。它只支持用于{@link ChannelOutboundInvoker#write(Object, ChannelPromise)}。注意,返回的{@link ChannelPromise}将不支持大多数操作,只有在您希望为每个写操作保存对象分配时才应该使用。您将无法检测操作是否完成,除非它失败,因为在这种情况下,实现将调用{@link ChannelPipeline#fireExceptionCaught(Throwable)}。请注意这是一个专家特性,应该小心使用!
     */
    ChannelPromise voidPromise();
}

(5)ChannelInboundInvoker

ChannelInboundInvoker是Netty框架中的一种组件,它的作用是负责将从服务器端接收到的网络数据流读取并进行处理,属于Netty的入站消息处理机制。具体来说,它提供了一系列的方法,可以将数据读取到当前的Channel中,并且可以对读取到的数据进行解码或者处理。

当有新的数据到达服务器端时,ChannelInboundInvoker会自动被触发,它会将接收到的数据交给ChannelPipeline中的第一个ChannelInboundHandler进行处理,并将处理结果继续传递给Pipeline中的下一个ChannelInboundHandler。因此,它充当了消息处理链的入口。

在处理入站数据时,开发人员可以自由地在ChannelInboundHandler中编写业务逻辑,比如解密数据、解析请求、进行鉴权等操作。另外,在处理过程中,ChannelInboundInvoker还可以协助开发人员处理异常情况、优化网络传输等问题,从而实现高效、稳定的网络数据传输。

public interface ChannelInboundInvoker {

    /**
     * 通知下一个ChannelHandler当前Channel已经注册到它的{@link EventLoop}
     */
    ChannelInboundInvoker fireChannelRegistered();

    /**
     * 通知下一个ChannelHandler当前Channel已经从它的{@link EventLoop}中注销。
     */
    ChannelInboundInvoker fireChannelUnregistered();

    /**
     * 通知下一个ChannelHandler当前Channel已经活跃
     */
    ChannelInboundInvoker fireChannelActive();

    /**
     * 通知下一个ChannelHandler当前Channel已经不活跃
     */
    ChannelInboundInvoker fireChannelInactive();

    /**
     * 在其入站操作之一中, {@link Channel} 接收到{@link Throwable}这意味着它已经连接。
     */
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);

    /**
     * 通知下一个ChannelHandler当前Channel接收到一个用户定义的事件。
     */
    ChannelInboundInvoker fireUserEventTriggered(Object event);

    /**
     * 通知下一个ChannelHandler当前Channel接收到一条消息。
     */
    ChannelInboundInvoker fireChannelRead(Object msg);

    /**
     * 
     */
    ChannelInboundInvoker fireChannelReadComplete();

    /**
     * 通知下一个ChannelHandler当前Channel的可写状态发生变化。
     */
    ChannelInboundInvoker fireChannelWritabilityChanged();
}

方法执行完后都将触发下一个

(6)实现类

image-20230530170553658

3、ChannelInboundHandler

public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * 当Channel已经被注册到EventLoop时被调用。
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * 当Channel已经被取消注册时被调用。
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * 当Channel已经连接成功后被调用,表示这条连接是活跃的。
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * 当Connection被关闭时被调用,当Connection不活跃时被调用。
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * 当从对方收到消息时被调用,即IO入站数据的入口。
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * 当一次读取操作完成时被调用,在其中可以对消息进行解码或者编码。
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * 当用户定义的事件被触发时被调用。
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * 当Channel变为可写状态时被调用。
     * 由于Channel有一个写缓冲区,只有当写缓冲区的可用空间大小超过阈值时才会被标记为可写状态。
     *
     * 当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 
     * OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用Channel的
     * isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过
     * Channel.config().setWriteHighWaterMark()和 
     * Channel.config().setWriteLowWaterMark()方法来设置
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * 当IO异常被抛出时被调用。
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelInboundHandlerAdapter

ChannelInboundHandler 实现的抽象基类,它提供了其所有方法的实现。

这个实现只是将操作转发给 ChannelPipeline 中的下一个 ChannelHandler。子类可以重写方法实现来改变这一点。

请注意,在 channelRead(ChannelHandlerContext, Object) 方法自动返回之后,消息不会被释放。如果您正在寻找自动释放接收到的消息的 ChannelInboundHandler 实现,请参阅SimpleChannelInboundHandler

4、ChannelOutboundHandler

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * 将Channel绑定到指定的本地地址和端口号时调用
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * 建立到远程地址的连接时调用
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * 关闭此Channel时调用
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * 关闭与远程对等点的连接时调用
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * 取消此Channel到它的EventLoop的注册时调用
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * 从Channel读取数据时调用
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * 在进行写操作时调用。写操作将通过ChannelPipeline写入消息。
    * 一旦调用了Channel#flush(),它们就可以被刷新到实际的Channel
    */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * 在执行刷新操作时调用。清除操作将尝试清除所有以前写的挂起的消息。
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

5、SimpleChannelInboundHandler

作用:支持泛型的消息处理,默认情况下消息处理完之后将会自动释放,无法提供fire*方法传递给ChannelPipeline中的下一个ChannelHandler,如果想要传递给下一个ChannelHandler需要调用ReferenceCountUtil#retain方法。

它允许显式地只处理特定类型的消息。例如,这里有一个实现,它只处理字符串消息。

public class StringHandler extends
    SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String message)
        throws Exception {
        System.out.println(message);
    }
}

请注意,根据构造函数参数的不同,它将通过传递给 ReferenceCountUtil.release(Object) 来释放所有已处理的消息。在这种情况下,如果您将对象传递给 ChannelPipeline 中的下一个处理程序,则可能需要使用 ReferenceCountUtil.retain(Object)

基本上只需要实现 channelRead0 方法即可

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

    private final TypeParameterMatcher matcher;
    private final boolean autoRelease;

    protected SimpleChannelInboundHandler() {
        this(true);
    }

    protected SimpleChannelInboundHandler(boolean autoRelease) {...}

    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
        this(inboundMessageType, true);
    }

    protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
        matcher = TypeParameterMatcher.get(inboundMessageType);
        this.autoRelease = autoRelease;
    }

    /**
     * 判断是否是需要处理的消息类型
     */
    public boolean acceptInboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

    /**
     * 当从对方收到消息时被调用,即IO入站数据的入口。
     */
    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}

6、ChannelDuplexHandler

ChannelDuplexHandler 是除了入站和出站handler之外的,另一个常用子类。

它同时实现了 ChannelInboundHandlerChannelOutboundHandler 接口,如果需要既处理入站事件又处理出站事件,可以继承此类。

public void channelRegistered(ChannelHandlerContext ctx) throws Exception;

当Channel已经注册到EventLoop时被调用。

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

当Channel已经从EventLoop中注销时被调用。

public void channelActive(ChannelHandlerContext ctx) throws Exception;

当Channel已经处于活跃状态时被调用。

public void channelInactive(ChannelHandlerContext ctx) throws Exception;

当Channel已经处于非活跃状态时被调用。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

当从对方收到一条消息时被调用。

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

当一次读取操作完成时被调用,在其中可以对消息进行解码或者编码。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

将消息通过Channel发送给对等方。

public void flush(ChannelHandlerContext ctx) throws Exception;

请求将尚未发送的队列数据通过Channel发送给对等方。

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

当Channel的可写状态发生变化时被调用。

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

当IO异常被抛出时被调用。

7、事件传播

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA: " + msg);
        super.channelRead(ctx, msg);
    }
}

channelRead() 方法里面,我们打印当前 handler 的信息,然后调用父类的 channelRead() 方法,而这里父类的 channelRead() 方法会自动调用到下一个 inBoundHandler 的 channelRead() 方法,并且会把当前 inBoundHandler 里处理完毕的对象传递到下一个 inBoundHandler,我们例子中传递的对象都是同一个 msg。

public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerA: " + msg);
        super.write(ctx, msg, promise);
    }
}

write() 方法里面,我们打印当前 handler 的信息,然后调用父类的 write() 方法,而这里父类的 write() 方法会自动调用到下一个 outBoundHandlerwrite() 方法,并且会把当前 outBoundHandler 里处理完毕的对象传递到下一个 outBoundHandler

六、常用处理器

1、编解码器

(1)编解码器接口

ByteToMessageCodec

使用方法同下面的编码器接口、解码器接口;案例见字符串编解码器

public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {

   /**
    * 将可处理的写入消息编码为ByteBuf。
    */
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

    /**
    * 解码从一个字节串到另一个字节串。
    * 此方法将被调用,直到从该方法返回时输入ByteBuf没有任何可读取的内容,
    * 或者直到没有从输入ByteBuf读取任何内容
    */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}

(2)编码器接口

MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    /**
    * 将可处理的写入消息编码为ByteBuf。
    */
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;    
}

(3)解码器接口

ByteToMessageDecoder

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

    /**
    * 解码从一个字节串到另一个字节串。
    * 此方法将被调用,直到从该方法返回时输入ByteBuf没有任何可读取的内容,
    * 或者直到没有从输入ByteBuf读取任何内容
    */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}

注意

不能在 decode() 方法中调用 ctx.writeAndFlush() 方法将数据传输给客户端!!!

其中 ByteToMessageDecoder 处理解码结果时,调用下一个handler 的代码如下,需要先判断 out 集合才会调用下一个 handler ,如果 out 集合中没有 add() 数据就调用 ctx.writeAndFlush() ,那么就不会调用下一个handler 。

 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {   
    // numElements 是 List<Object> out 的长度
    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
        for(int i = 0; i < numElements; ++i) {
            ctx.fireChannelRead(msgs.getUnsafe(i));
        }

    }
 }

(4)字符串编解码器

Charset charset = StandardCharsets.UTF_8;
// 行解码器,用于处理粘包半包
ch.pipeline().addLast(
    new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("_".getBytes()))
);
// 字符串编解码器
// netty 收发数据类型都是 byteBuf ,所以需要转为字符串进行操作
ch.pipeline().addLast("encoder", new StringEncoder(charset));
ch.pipeline().addLast("decoder", new StringDecoder(charset));

解码器源码

@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {

    // TODO Use CharsetDecoder instead.
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        out.add(msg.toString(charset));
    }
}

编码器源码

@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {

    // TODO Use CharsetEncoder instead.
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringEncoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringEncoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
        if (msg.length() == 0) {
            return;
        }

        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
    }
}

2、心跳机制

// 心跳机制处理器,参数说明如下:
// 参数1:读超时(秒),超过指定秒数没收到客户端数据,则抛出读超时事件
// 参数2:写超时(秒),超过指定秒数没写到客户端数据,则抛出写超时事件
// 参数3:全超时(秒),超过指定秒数没读/写到客户端数据,则抛出全超时事件
ch.pipeline().addLast(new IdleStateHandler(3,0,0));

当通道有一段时间没有执行读、写或两个操作时触发 IdleStateEvent

  • 当在指定的时间段内没有执行读操作时,将触发 READER_IDLE
  • 当在指定的时间段内没有执行写操作时,将触发 WRITER_IDLE
  • 当在指定的时间段内没有进行读写操作时,将触发 ALL_IDLE

案例代码

// An example that sends a ping message when there is no outbound traffic
// for 30 seconds.  The connection is closed when there is no inbound traffic
// for 60 seconds.

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    public void initChannel(Channel channel) {
        channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
        channel.pipeline().addLast("myHandler", new MyHandler());
    }
}

// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                ctx.close();
            } else if (e.state() == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(new PingMessage());
            }
        }
    }
}

底层原理

当触发超时事件时,通过调用 fireUserEventTriggered() 方法将事件传到下一个 handler 供用户处理。

七、双向通讯案例

客户端输入什么数据就发送什么数据,服务端读取并返回该数据

1、服务端

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 建议使用 ctx.alloc() 创建 ByteBuf
                    ByteBuf response = ctx.alloc().buffer();
                    response.writeBytes(buffer);
                    ctx.writeAndFlush(response);

                    // 思考:需要释放 buffer 吗
                    // 思考:需要释放 response 吗
                    // 需要,因为并没有向后传递了
                    buffer.release();
                    response.release();
                }
            });
        }
    }).bind(8080);

2、客户端

NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 思考:需要释放 buffer 吗
                    buffer.release();
                }
            });
        }
    }).connect("127.0.0.1", 8080).sync().channel();

channel.closeFuture().addListener(future -> {
    group.shutdownGracefully();
});

new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}).start();

3、读写误区

Java Socket 是全双工的:在任意时刻,线路上存在A 到 BB 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读


  目录