Netty 源码剖析与实战

仍需实践.

# I/O 模式

# 阻塞/非阻塞

  • 阻塞:没有数据传输过来时,读操作会阻塞直到有数据;缓冲区满时,写操作也会阻塞
  • 非阻塞:遇到上述情形时,直接返回

# 同步/异步

  • 数据就绪后需要自己去读是同步
  • 数据就绪直接读好再回调给程序是异步

# reactor 模式

  • 单 reactor 单线程模式
  • 单 reactor 多线程模式
  • 主从 reactor 模式
1
2
3
4
5
6
7
8
9
// single reactor
NioEventLoopGroup g = new NioEventLoopGroup(1);

// multiple threads reactor
NioEventLoopGroup g = new NioEventLoopGroup(); // default core size * 2

// boss-worker
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

# Codec

# 粘包、半包? TCP 对上层数据进行拆分发送, 接收端需要重组

ByteToMessageDecoder <- ChannelInboundHandlerAdapter

Cumulator,解决半包的利器。

customize your protocol

  • 固定长度 - FixedLengthFrameDecoder
  • 特定分隔符 - DelimieterBasedFrameDecoder
  • 指定长度存储长度,后续填充数据 - LengthFieldBasedFrameDecoder, LengthFieldPrepender
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
 */
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    // cumulation 已积攒的数据,in 当次数据
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        if (cumulation == in) {
            // when the in buffer is the same as the cumulation it is doubly retained, release it once
            in.release();
            return cumulation;
        }
        if (!cumulation.isReadable() && in.isContiguous()) {
            // If cumulation is empty and input buffer is contiguous, use it directly
            cumulation.release();
            return in;
        }
        try {
            final int required = in.readableBytes();
            if (required > cumulation.maxWritableBytes() ||
                required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
                cumulation.isReadOnly()) {
                // Expand cumulation (by replacing it) under the following conditions:
                // - cumulation cannot be resized to accommodate the additional data
                // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                return expandCumulation(alloc, cumulation, in);
            }
            cumulation.writeBytes(in, in.readerIndex(), required);
            in.readerIndex(in.writerIndex());
            return cumulation;
        } finally {
            // We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
            // for whatever release (for example because of OutOfMemoryError)
            in.release();
        }
    }
};

#

  • 减小粒度 - 锁的对象与范围
  • 注意锁对象本身的大小
  • 注意锁的速度 - JDK 不同版本实现 (LongAdderCounter - AtomicLongAdder)
  • 不同场景使用不同并发容器
  • 尽量不使用锁

# 源码解析

NioEventLoop#run -> AbstractNioChannel.unsafe#read

  • OP_ACCEPT: AbstractNioMessageChannel#read
  • OP_READ: AbstractNioByteChannel#read

# Server 启动

# main thread

  1. 创建 selector
  2. 创建 server socket channel
  3. 初始化 server socket channel
  4. 给 server socket channel 从 boss group 中绑定一个 event loop

# boss event loop

  1. 将 server socket channel 注册到 boss event loop 的 selector 上
  2. 绑定 local address 启动
  3. 注册 OP_ACCEPT 到 selector 上
1
2
3
4
5
6
7
Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector();
ServerSocketChannel ssc = provider.openSeverSocketChannel();
// attachment 用于后续获取
SelectionKey sk = AbstractChannel.javaChannel().register(eventLoop().unwrappedSelector, 0, this);

javaChannel().bind(localAddress, config.getBacklog());
sk.interestOps(OP_ACCEPT);

# 建立连接

  1. 创建连接的初始化和注册是通过 pipeline.fireChannelReadServerBootstrapAcceptor 中完成的。
  2. 第一次 register 并不是监听 OP_READ,而是 0
  3. 最终监听 OP_READ 是通过 register 完成后的 fireChannelActive 来触发
  4. worker NioEventLoop 通过 register 操作来启动
  5. 接受连接的读操作,不会尝试读取超过 16 次

# 调优参数

# Linux 系统参数

  • ulimit -n [xxx] (每个进程打开文件句柄数量限制)

# Netty 相关系统参数

ServerSocketChannnel

  • SO_RCVBUF 接收缓冲区大小 /proc/sys/net/ipv4/tcp_rmem: 4k
  • SO_REUSEADDR 多网卡 IP 绑定相同 port
  • SO_BACKLOG 最大的等待连接数量
    • /proc/sys/net/core/somaxcon
    • sysctl
    • 默认 128

SocketChannel

  • SO_SNDBUF 发送缓冲区大小 /proc/sys/net/ipv4/tcp_wmem: 4k
  • SO_RCVBUF 接收缓冲区大小 /proc/sys/net/ipv4/tcp_rmem: 4k
  • SO_KEEPALIVE TCP 层 keepalive
  • SO_REUSEADDR 多网卡 IP 绑定相同 port
  • SO_LINGER 关闭 socket 的延时时间
  • IP_TOS 设置 IP header 的 Type-of-Service 字段, 描述 IP 包的优先级和 QoS 选项 (倾向于延时还是吞吐量)
  • TCP_NODELAY 是否启用 Nagle 算法, 将小的碎片数据连接成更大的报文

# Netty 核心参数

ChannelOption

  • WRITE_BUFFER_WATER_MARK 高低水位线, 控制写数据 OOM 32k ~ 64k per channel
  • CONNECT_TIMEOUT_MILLIS 客户端你连接服务器最大允许时间
  • MAX_MESSAGES_PER_READ 最大允许连续读次数
  • WRITE_SPIN_COUNT 最大允许连续写次数
  • ALLOCATOR ByteBuf 分配器
  • RCVBUF_ALLOCATOR 数据接收 ByteBuf 分配器大小计算器+读次数控制器 AdaptiveRecvByteBufAllocator
  • AUTO_READ 是否监听读事件
  • AUTO_CLOSE 写数据失败, 是否关闭连接
  • MESSAGE_SIZE_ESTIMATOR 数据大小计算器 DefaultMessageSizeEstimator.DEFAULT
  • SINGLE_EVENTEXECUTOR_PER_GROUP 当增加一个 handler 且指定 EventExecutorGroup 时: 决定这个 handler 是否只用 group 中的一个固定 EventExecutor
  • ALLOW_HALF_CLOSURE 关闭连接时, 允许半关

# annotations

  • @Sharable handler is sharable through all pipelines
  • @Skip
  • @UnstableApi

# usage improvement

  • separate write & flush -> ctx.writeAndFlush (flush on the flight)
    • channelRead -> ctx.write
    • channelReadComplete -> ctx.flush
  • flushConsolidationHandler
  • 流量整形
    • Global level - GlobalTrafficShapingHandler
    • Channel level - ChannelTrafficShapingHandler
    • combine both - GlobalChannelTrafficShapingHandler

Netty 流量整形源码分析总结

  • 读写流控判断: 按一定时间段 checkInterval 来统计
    • writeLimit/readLimit 设置为 0 时, 关闭写/读整形
  • 等待时间范围控制 10ms (MINIMAL_WAIT) – 15s (maxTime)
  • 读流控: 取消读事件监听, 让读缓存区存满, 然后对端写缓存区写满, 然后对端写不出去, 对端对数据进行丢弃或者减缓发送
  • 写流控: 待发数据放入 Queue. 等待超过 4s(maxWriteDelay) | 单个 channel 缓存的数据超过 4M(MaxWriteSize) | 所有缓存数据超过 400M(maxGLobalWriteSize)时修改写状态不可写

Netty OOM - 读速度大于写速度

  • 上游发送太快: 任务重
  • 自己: 处理慢/不发或发的慢: 处理能力有限, 流量控制等原因
  • 网速: 卡
  • 下游处理速度慢: 导致不及时读取接受 Buffer 数据, 触发背压
  • ChannelOutboundBuffer - LinkedList<ChannelOutboundBuffer.Entry>
    • 判断 totalPendingSize 大于 writeBufferWaterMark.high() 设置为 unwritable
  • TrafficShapingHandler - Channel-
    • messagesQueue<ChannelTrafficShapingHandler.ToSend>
    • 判断 queueSize 大于 maxWriteSize 或 delay 大于 maxWriteDelay 设置为 unwritable
  • 对策
    • 高低水位线 (32-64k)
    • 流量整形参数
    • maxWrite (4M)
    • maxGlobalWriteSize (400M)
    • maxWriteDelay (4s)

空闲检测 IdleStateHandler + KeepAlive

基于 IP 进行拦截 AbstractRemoteAddressFilter - IPRuleBasedFilter

  • cassandra

# References

  • Netty 源码剖析与实战 - 极客时间
Get Things Done
Built with Hugo
Theme Stack designed by Jimmy