概述
又花了大概三天时间才理解 Reactor模型的整体脉络,学习他妈的太难了.
EventLoopGroup 和 EventLoop的关系
整理了一下EventLoop和EventLoopGroup的继承关系,和一些关键的方法和对象 需要注意几个关键信息
- NioEventLoopGroup 里面有一个 Childern数组,里面存储的都是 NioEventLoop
- NioEventLoopGroup里面的submit,execute等线程池方法,其实都是找Childern里面挑一个 EventLoop来执行
3. NioEventLoop是一个单线程的执行器. 持有一个 Selector
,线程就遍历 Selector,处理这个Selector上的事件,然后 再看看任务队列里面有没有其他任务需要执行
,例如新的Channel注册。
上图就是下图的代码实际情况
源码阅读
这次的源码分析主要就是根据上面这个图来寻找到对应的源码
EventLoopGroup
研究EventLoopGroup主要从几个方面入手
- Loop数组创建的过程
- 任务提交到group之后是如何分配给内部的loop的
Loop数组创建的过程
上面说到EventLoopGroup里面装了很多的EventLoop我们来看EventLoopGroup构建时候的代码 在NioEventLoopGroup的类中实现了newChild,所以数组中存放的都是NioEventLoop
任务提交到group之后是如何分配给内部的loop的
当我们找线程池提交任务的时候,会调用 execute方法,我们来看看EventLoopGroup的源码调用
@Override
public void execute(Runnable command) {
next().execute(command);
}
调用了一个抽象方法然后执行execute,这个抽象方法的实现在 MultithreadEventexecutorGroup
中
@Override
public EventExecutor next() {
return chooser.next();
}
这个chooser在构造器中已经初始化完毕,里面存有整个EventLoop数组
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
# 判断数组长度是不是2的倍数,如果是的话用另一个choose
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
# 这个位运算就是判断是不是2的幂数
return (val & -val) == val;
}
# 其实都是轮寻数组,只是增加效率
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
EventLoopGroup总结
所以我们知道了EventLoopGroup其实就是装了一堆的EventLoopGroup,然后别人调用线程池的方法,就在数组里面找一个EventLoop丢给他执行。所以接下来逻辑的重点就是EventLoop
EventLoop
EventLoop这次为了搞懂的还是以上面的架构图为目的,涉及到的其他细节会在其他章节说,比如Channel,Promise等。
- EventLoop监听事件的处理事件的代码逻辑
- BossGroup将产生的新的连接注册到WorkerGroup的流程是什么
EventLoop监听事件的处理事件的代码逻辑
在开始之前我们先来看看创建EventLoop的时候都做了哪些事情,回想一下EventLoop创建是NioEventLoopGroup构造方法的的newChild方法实现的
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
这里可以看出存放了一个provider
和一个selector
,然后调用了父类的构造方法
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
protected Queue newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue(maxPendingTasks);
}
父类创建了一个 LinkedBlockingQueue
,然后又调用了父类的构造器
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected Queue newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue(maxPendingTasks);
}
可以看到父类存放了一个 ThreadPerTaskExecutor
的 executor,然后又保存了一个 LinkedBlockingQueue
. 再调父类就是保存一下自己属于哪个 EventLoopGroup了。
至此,我们知道了一个NioEventLoop里面有什么东西
想要知道处理事件通知的逻辑,我们先从 execute
方法入手,在SingleThreadEventExecutor类中实现了execute方法
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
# 判断是不是EventLoop线程调用的该方法
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
# 是的话就把任务添加到队列中
addTask(task);
} else {
# 否则就开启一个EventLoop线程
startThread();
# 还是放到队列
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
# 添加任务到队列的方法
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
上面的逻辑并不难,就是判断调用方法的是不是 executor线程里面来的,不是的话就 尝试去开启线程池
,我们看看这个尝试开启线程池的逻辑是如何
private void startThread() {
# 注意这里,如果线程池的状态是没有开启过才去开启,否则不会开启,所以这是一个单线程的线程池
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
# 我删除了大量和本次逻辑无关的语句
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
# 保存了线程池里面的线程
thread = Thread.currentThread();
# 执行抽象方法run
SingleThreadEventExecutor.this.run();
}
});
}
从上面的代码可以看出, SingleThreadEventExecutor保持单线程的逻辑就是,来任务就放到队列里面去,如果线程池状态是未开启的话,开启。所以executor的execute方法只能被执行一次,也就是单线程。那么我们的逻辑就来到了这个抽象的run方法里面,run方法就是我一开始截图的那个,我们现在来仔细看看其中逻辑.
@Override
protected void run() {
# 上来就是一个死循环,整个 EventLoop的线程就卡在这里了。 这也为什么说是一个 loop
for (;;) {
try {
# 这里是判断 EventLoop的两个blockingqueue有没有任务,如果有任务的话优先处理任务
# 如果没有任务则去Select
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
# 队列中没有任务就去调用 NIO 的 Select方法里面也是一个死循环,除非有任务打断
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
# 处理 selector产生的 selector的selectorKey
processSelectedKeys();
} finally {
// Ensure we always run tasks.
# 执行taskQueue里面的任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
当跳出 select(wakenUp.getAndSet(false));
循环的时候,要么表明有 taskQueue任务,要么表明 selector接受到数据了,代码中都处理了一遍。先看对 Selector的处理
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
processSelectedKey(k, (AbstractNioChannel) a);
}
}
遍历这次selector产生的key。调用 processSelectedKey
方法,在这个方法里对事件进行分发
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
# 如果是客户端 connect到服务端会产生 OP_CONNECT 事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
# 写事件的注册一般都没有,只要内核中有缓冲区就会马上响应写事件,一般不会注册
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
# 读事件和接受客户端事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
通过上面的源码可以知道 OP_CONNECT
是客户端连接产生的事件,而 OP_WRITE
又几乎不会用到,所以重点就关注 OP_READ
读取事件和 OP_ACCEPT
连接事件,都走 unsafe.read方法
BossGroup将产生的新的连接注册到WorkerGroup的流程是什么
上面的分析停在了 unsafe.read方法上,在这个方法就是处理读取事件和客户端连接事件,也就来到了我们第二个问题,Netty是如何把一个客户端连接从BossGroup丢到WorkerGroup中处理的。先从NIO的角度分析一下事情之后的流程
- BossGroup的NioEventLoop accept到了一个 Channel,这个Channel是客户端的。
- BossGroup把这个Channel注册到WorkerGroup里面的EventLoop的 Selector中。
- WorkerGroup中的EventLoop就开始继续遍历自己的Selector 因为这个流程中还涉及到了大量的Channel和Pileline的操作,先略过不看,直接看结果,梳理总体的流程
unsafe.read已经把客户端的Channel读取到了,在 readBuf里面
读取到的数据会来到 ServerBootStrap
的 channelRead方法中,这里在启动的时候绑定了 Boos和Worker两个group
而我们知道这个childGroup就是workergroup
之后调用 next的register,我们知道是从自己的数组里面挑一个注册,所以方法来到了 SingleThreadEventLoop中的register。并且还包装了一个 Promise
这一步就是把Channel注册到EventLoop中, 注意 EventLoop里面是有一个Selector的,所以之后就是要找到NIO方法中的 selector.register
方法来到了 AbstractChannel中,注意Netty的很多方法都会判断 inEventLoop中,其实不用管,只要知道最终都会丢到这个Loop的队列,等这个Loop自己拉任务来执行就好了
接下来看register0里面做了什么
这个doRegister就是让不同的Channel去实现自定义的注册方式,我们直接AbstractNioChannel的
到这里已经很明了了,完成了boss分配channel到worker的selector,但是还有一点就是这个注册的监听事件是0.因为JDK有一个bug,Netty作者的解释。如果注册0成功的话,会再注册一次监听事件,在 AbstractNioChannel的 doBeginRead方法,打一个断点就可以知道
总结
至此整个流程梳理完毕,再回顾一下
- BossGroup启动EventLoop,这是一个单线程一直循环监听 selector产生的事件。在 NioEventLoop的run方法中
- 监听到事件后在 NioEventLoop 的 processSelectedKey 方法中处理事件。发现了注册事件,所以到了 ServerBootStrap的 channelRead方法,
这里完成了 boss传递channel到worker的步骤
- worker拿到channel后把 channel注册到自己的
selector
上,然后继续遍历自己的selector。完成事件监听