Poller处理的核心是:先执行事件队列中的PollerEvent,再遍历selector中已经就绪的key,一旦发生了感兴趣的事件,就交由processSocket方法处理。PollerEvent的作用是把socket注册到selector上(首次注册时只关注读事件),或者更新已注册socket所感兴趣的事件:
- /**
- *
- * PollerEvent, cacheable object for poller events to avoid GC
- */
- public static class PollerEvent implements Runnable {
-
- 每个PollerEvent都会保存NioChannel的引用
- protected NioChannel socket;
- protected int interestOps;
- protected KeyAttachment key;
- public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
- reset(ch, k, intOps);
- }
-
- public void reset(NioChannel ch, KeyAttachment k, int intOps) {
- socket = ch;
- interestOps = intOps;
- key = k;
- }
-
- public void reset() {
- reset(null, null, 0);
- }
-
- @Override
- public void run() {
- //socket第一次注册到selector中,完成对socket读事件的注册
- if ( interestOps == OP_REGISTER ) {
- try {
- socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
- } catch (Exception x) {
- log.error('', x);
- }
- } else {
- // socket之前已经注册到了selector中,更新socket所感兴趣的事件
- final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- try {
- boolean cancel = false;
- if (key != null) {
- final KeyAttachment att = (KeyAttachment) key.attachment();
- if ( att!=null ) {
- //handle callback flag
- if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
- att.setCometNotify(true);
- } else {
- att.setCometNotify(false);
- }
- interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
- // 刷新事件的最后访问时间,防止事件超时
- att.access();//to prevent timeout
- //we are registering the key to start with, reset the fairness counter.
- int ops = key.interestOps() | interestOps;
- att.interestOps(ops);
- key.interestOps(ops);
- } else {
- cancel = true;
- }
- } else {
- cancel = true;
- }
- if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
- }catch (CancelledKeyException ckx) {
- try {
- socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
- }catch (Exception ignore) {}
- }
- }//end if
- }//run
-
- @Override
- public String toString() {
- return super.toString() '[intOps=' this.interestOps ']';
- }
- }
/** * * PollerEvent, cacheable object for poller events to avoid GC */ public static class PollerEvent implements Runnable { // 每个PollerEvent都会保存NioChannel的引用 protected NioChannel socket; protected int interestOps; protected KeyAttachment key; public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) { reset(ch, k, intOps); } public void reset(NioChannel ch, KeyAttachment k, int intOps) { socket = ch; interestOps = intOps; key = k; } public void reset() { reset(null, null, 0); } @Override public void run() { //socket第一次注册到selector中,完成对socket读事件的注册 if ( interestOps == OP_REGISTER ) { try { socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key); } catch (Exception x) { log.error('', x); } } else { // socket之前已经注册到了selector中,更新socket所感兴趣的事件 final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { boolean cancel = false; if (key != null) { final KeyAttachment att = (KeyAttachment) key.attachment(); if ( att!=null ) { //handle callback flag if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) { att.setCometNotify(true); } else { att.setCometNotify(false); } interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag // 刷新事件的最后访问时间,防止事件超时 att.access();//to prevent timeout //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; att.interestOps(ops); key.interestOps(ops); } else { cancel = true; } } else { cancel = true; } if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); }catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true); }catch (Exception ignore) {} } }//end if }//run @Override public String toString() { return super.toString() '[intOps=' this.interestOps ']'; } }
在第5步对Poller处理流程的分析中可以看到,它的run方法最后会调用processKey()处理selector检测到的通道事件,而processKey()最后又会调用processSocket进入具体的通道处理逻辑。下面看一下processSocket方法的实现:
- public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
- try {
- KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
- if (attachment == null) {
- return false;
- }
- attachment.setCometNotify(false); //will get reset upon next reg
- // 从SocketProcessor的缓存队列中取出一个来处理socket
- SocketProcessor sc = processorCache.poll();
- if ( sc == null ) sc = new SocketProcessor(socket,status);
- else sc.reset(socket,status);
- // 将有事件发生的socket交给Worker处理
- if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
- else sc.run();
- } catch (RejectedExecutionException rx) {
- log.warn('Socket processing request was rejected for:' socket,rx);
- return false;
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString('endpoint.process.fail'), t);
- return false;
- }
- return true;
- }
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(); if (attachment == null) { return false; } attachment.setCometNotify(false); //will get reset upon next reg // 从SocketProcessor的缓存队列中取出一个来处理socket SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); // 将有事件发生的socket交给Worker处理 if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { log.warn('Socket processing request was rejected for:' socket,rx); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString('endpoint.process.fail'), t); return false; } return true; }
Poller通过NioEndpoint的协调,将发生事件的socket交给工作者线程Worker来进一步处理。整个事件框架的工作就到此结束,下面就是Worker的处理。
在Tomcat6版本的NIO处理实现中有一个Worker类,在Tomcat7中把它去掉了,但工作者的职责还在,只是交由了上面看到的SocketProcessor这个类来担当,看下这个类的实现代码:
- // ---------------------------------------------- SocketProcessor Inner Class
- // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。
- /**
- * This class is the equivalent of the Worker, but will simply use in an
- * external Executor thread pool.
- */
- protected class SocketProcessor implements Runnable {
-
- // 每个SocketProcessor保存一个NioChannel的引用
- protected NioChannel socket = null;
- protected SocketStatus status = null;
-
- public SocketProcessor(NioChannel socket, SocketStatus status) {
- reset(socket,status);
- }
-
- public void reset(NioChannel socket, SocketStatus status) {
- this.socket = socket;
- this.status = status;
- }
-
- @Override
- public void run() {
- // 从socket中获取SelectionKey
- SelectionKey key = socket.getIOChannel().keyFor(
- socket.getPoller().getSelector());
- KeyAttachment ka = null;
-
- if (key != null) {
- ka = (KeyAttachment)key.attachment();
- }
-
- // Upgraded connections need to allow multiple threads to access the
- // connection at the same time to enable blocking IO to be used when
- // NIO has been configured
- if (ka != null && ka.isUpgraded() &&
- SocketStatus.OPEN_WRITE == status) {
- synchronized (ka.getWriteThreadLock()) {
- doRun(key, ka);
- }
- } else {
- synchronized (socket) {
- doRun(key, ka);
- }
- }
- }
-
- private void doRun(SelectionKey key, KeyAttachment ka) {
- try {
- int handshake = -1;
-
- try {
- if (key != null) {
- // For STOP there is no point trying to handshake as the
- // Poller has been stopped.
- if (socket.isHandshakeComplete() ||
- status == SocketStatus.STOP) {
- handshake = 0;
- } else {
- handshake = socket.handshake(
- key.isReadable(), key.isWritable());
- // The handshake process reads/writes from/to the
- // socket. status may therefore be OPEN_WRITE once
- // the handshake completes. However, the handshake
- // happens when the socket is opened so the status
- // must always be OPEN_READ after it completes. It
- // is OK to always set this as it is only used if
- // the handshake completes.
- status = SocketStatus.OPEN_READ;
- }
- }
- }catch ( IOException x ) {
- handshake = -1;
- if ( log.isDebugEnabled() ) log.debug('Error during SSL handshake',x);
- }catch ( CancelledKeyException ckx ) {
- handshake = -1;
- }
- if ( handshake == 0 ) {
- SocketState state = SocketState.OPEN;
- // Process the request from this socket
- if (status == null) {
- // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求
- state = handler.process(ka, SocketStatus.OPEN_READ);
- } else {
- state = handler.process(ka, status);
- }
- if (state == SocketState.CLOSED) {
- // Close socket and pool
- try {
- close(ka, socket, key, SocketStatus.ERROR);
- } catch ( Exception x ) {
- log.error('',x);
- }
- }
- } else if (handshake == -1 ) {
- close(ka, socket, key, SocketStatus.DISCONNECT);
- } else {
- ka.getPoller().add(socket, handshake);
- }
- } catch (CancelledKeyException cx) {
- socket.getPoller().cancelledKey(key, null, false);
- } catch (OutOfMemoryError oom) {
- try {
- oomParachuteData = null;
- log.error('', oom);
- if (socket != null) {
- socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
- }
- releaseCaches();
- }catch ( Throwable oomt ) {
- try {
- System.err.println(oomParachuteMsg);
- oomt.printStackTrace();
- }catch (Throwable letsHopeWeDontGetHere){
- ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
- }
- }
- } catch (VirtualMachineError vme) {
- ExceptionUtils.handleThrowable(vme);
- }catch ( Throwable t ) {
- log.error('',t);
- if (socket != null) {
- socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
- }
- } finally {
- socket = null;
- status = null;
- //return to cache
- if (running && !paused) {
- processorCache.offer(this);
- }
- }
- }
-
- private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,
- SocketStatus socketStatus) {
- .
- }
- }
// ---------------------------------------------- SocketProcessor Inner Class // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。 /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { // 每个SocketProcessor保存一个NioChannel的引用 protected NioChannel socket = null; protected SocketStatus status = null; public SocketProcessor(NioChannel socket, SocketStatus status) { reset(socket,status); } public void reset(NioChannel socket, SocketStatus status) { this.socket = socket; this.status = status; } @Override public void run() { // 从socket中获取SelectionKey SelectionKey key = socket.getIOChannel().keyFor( socket.getPoller().getSelector()); KeyAttachment ka = null; if (key != null) { ka = (KeyAttachment)key.attachment(); } // Upgraded connections need to allow multiple threads to access the // connection at the same time to enable blocking IO to be used when // NIO has been configured if (ka != null && ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) { synchronized (ka.getWriteThreadLock()) { doRun(key, ka); } } else { synchronized (socket) { doRun(key, ka); } } } private void doRun(SelectionKey key, KeyAttachment ka) { try { int handshake = -1; try { if (key != null) { // For STOP there is no point trying to handshake as the // Poller has been stopped. if (socket.isHandshakeComplete() || status == SocketStatus.STOP) { handshake = 0; } else { handshake = socket.handshake( key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. 
status = SocketStatus.OPEN_READ; } } }catch ( IOException x ) { handshake = -1; if ( log.isDebugEnabled() ) log.debug('Error during SSL handshake',x); }catch ( CancelledKeyException ckx ) { handshake = -1; } if ( handshake == 0 ) { SocketState state = SocketState.OPEN; // Process the request from this socket if (status == null) { // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求 state = handler.process(ka, SocketStatus.OPEN_READ); } else { state = handler.process(ka, status); } if (state == SocketState.CLOSED) { // Close socket and pool try { close(ka, socket, key, SocketStatus.ERROR); } catch ( Exception x ) { log.error('',x); } } } else if (handshake == -1 ) { close(ka, socket, key, SocketStatus.DISCONNECT); } else { ka.getPoller().add(socket, handshake); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key, null, false); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; log.error('', oom); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false); } releaseCaches(); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); }catch ( Throwable t ) { log.error('',t); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); } } finally { socket = null; status = null; //return to cache if (running && !paused) { processorCache.offer(this); } } } private void close(KeyAttachment ka, NioChannel socket, SelectionKey key, SocketStatus socketStatus) { ... } }
可以看到,SocketProcessor在握手完成后调用handler.process(),把KeyAttachment(即socket)交给Handler处理器做最终的请求处理。
可以用下面这幅图总结一下NioEndpoint的主要流程:
Acceptor和Poller是线程数组,Worker是一个线程池(Executor)
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请
点击举报。