打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Tomcat7中NIO处理分析(二)
  • 6.PollerEvent处理流程

Poller处理的核心是启动执行事件队列中的PollerEvent,接着从selector中遍历已经就绪的key,一旦发生了感兴趣的事件,则交由processSocket方法处理。PollerEvent的作用是向socket注册或更新感兴趣的事件:

Java代码  
  1. /** 
  2.  * 
  3.  * PollerEvent, cacheable object for poller events to avoid GC 
  4.  */  
  5. public static class PollerEvent implements Runnable {  
  6.   
  7. 每个PollerEvent都会保存NioChannel的引用  
  8.     protected NioChannel socket;  
  9.     protected int interestOps;  
  10.     protected KeyAttachment key;  
  11.     public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {  
  12.         reset(ch, k, intOps);  
  13.     }  
  14.   
  15.     public void reset(NioChannel ch, KeyAttachment k, int intOps) {  
  16.         socket = ch;  
  17.         interestOps = intOps;  
  18.         key = k;  
  19.     }  
  20.   
  21.     public void reset() {  
  22.         reset(nullnull0);  
  23.     }  
  24.   
  25.     @Override  
  26.     public void run() {  
  27.         //socket第一次注册到selector中,完成对socket读事件的注册  
  28.         if ( interestOps == OP_REGISTER ) {  
  29.             try {  
  30.                 socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);  
  31.             } catch (Exception x) {  
  32.                 log.error('', x);  
  33.             }  
  34.         } else {  
  35.             // socket之前已经注册到了selector中,更新socket所感兴趣的事件  
  36.             final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());  
  37.             try {  
  38.                 boolean cancel = false;  
  39.                 if (key != null) {  
  40.                     final KeyAttachment att = (KeyAttachment) key.attachment();  
  41.                     if ( att!=null ) {  
  42.                         //handle callback flag  
  43.                         if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {  
  44.                             att.setCometNotify(true);  
  45.                         } else {  
  46.                             att.setCometNotify(false);  
  47.                         }  
  48.                         interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag  
  49.                         // 刷新事件的最后访问时间,防止事件超时   
  50.                         att.access();//to prevent timeout  
  51.                         //we are registering the key to start with, reset the fairness counter.  
  52.                         int ops = key.interestOps() | interestOps;  
  53.                         att.interestOps(ops);  
  54.                         key.interestOps(ops);  
  55.                     } else {  
  56.                         cancel = true;  
  57.                     }  
  58.                 } else {  
  59.                     cancel = true;  
  60.                 }  
  61.                 if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);  
  62.             }catch (CancelledKeyException ckx) {  
  63.                 try {  
  64.                     socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);  
  65.                 }catch (Exception ignore) {}  
  66.             }  
  67.         }//end if  
  68.     }//run  
  69.   
  70.     @Override  
  71.     public String toString() {  
  72.         return super.toString() '[intOps=' this.interestOps ']';  
  73.     }  
  74. }  
/** * * PollerEvent, cacheable object for poller events to avoid GC */ public static class PollerEvent implements Runnable { // 每个PollerEvent都会保存NioChannel的引用 protected NioChannel socket; protected int interestOps; protected KeyAttachment key; public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) { reset(ch, k, intOps); } public void reset(NioChannel ch, KeyAttachment k, int intOps) { socket = ch; interestOps = intOps; key = k; } public void reset() { reset(null, null, 0); } @Override public void run() { //socket第一次注册到selector中,完成对socket读事件的注册 if ( interestOps == OP_REGISTER ) { try { socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key); } catch (Exception x) { log.error('', x); } } else { // socket之前已经注册到了selector中,更新socket所感兴趣的事件 final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { boolean cancel = false; if (key != null) { final KeyAttachment att = (KeyAttachment) key.attachment(); if ( att!=null ) { //handle callback flag if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) { att.setCometNotify(true); } else { att.setCometNotify(false); } interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag // 刷新事件的最后访问时间,防止事件超时 att.access();//to prevent timeout //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; att.interestOps(ops); key.interestOps(ops); } else { cancel = true; } } else { cancel = true; } if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); }catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true); }catch (Exception ignore) {} } }//end if }//run @Override public String toString() { return super.toString() '[intOps=' this.interestOps ']'; } }

 

 

  • 7.将socket交给Worker执行

在第5步的Poller处理流程的分析中看到它的run方法最后会调用processKey()处理selector检测到的通道事件,而在这个方法最后会调用processSocket来调用具体的通道处理逻辑,看下processSocket方法的实现:

Java代码  
  1. public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {  
  2.     try {  
  3.         KeyAttachment attachment = (KeyAttachment)socket.getAttachment();  
  4.         if (attachment == null) {  
  5.             return false;  
  6.         }  
  7.         attachment.setCometNotify(false); //will get reset upon next reg  
  8.         // 从SocketProcessor的缓存队列中取出一个来处理socket  
  9.         SocketProcessor sc = processorCache.poll();  
  10.         if ( sc == null ) sc = new SocketProcessor(socket,status);  
  11.         else sc.reset(socket,status);  
  12.         // 将有事件发生的socket交给Worker处理   
  13.         if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);  
  14.         else sc.run();  
  15.     } catch (RejectedExecutionException rx) {  
  16.         log.warn('Socket processing request was rejected for:' socket,rx);  
  17.         return false;  
  18.     } catch (Throwable t) {  
  19.         ExceptionUtils.handleThrowable(t);  
  20.         // This means we got an OOM or similar creating a thread, or that  
  21.         // the pool and its queue are full  
  22.         log.error(sm.getString('endpoint.process.fail'), t);  
  23.         return false;  
  24.     }  
  25.     return true;  
  26. }  
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(); if (attachment == null) { return false; } attachment.setCometNotify(false); //will get reset upon next reg // 从SocketProcessor的缓存队列中取出一个来处理socket SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); // 将有事件发生的socket交给Worker处理 if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { log.warn('Socket processing request was rejected for:' socket,rx); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString('endpoint.process.fail'), t); return false; } return true; }

Poller通过NioEndpoint的协调,将发生事件的socket交给工作者线程Worker来进一步处理。整个事件框架的工作就到此结束,下面就是Worker的处理。

 

 

  • 8.从socket中处理请求

在Tomcat6版本的NIO处理实现中有一个Worker类,在Tomcat7中把它去掉了,但工作者的职责还在,只是交由了上面看到的SocketProcessor这个类来担当,看下这个类的实现代码:

Java代码  
  1. // ---------------------------------------------- SocketProcessor Inner Class  
  2. // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。  
  3. /** 
  4.  * This class is the equivalent of the Worker, but will simply use in an 
  5.  * external Executor thread pool. 
  6.  */  
  7. protected class SocketProcessor implements Runnable {  
  8.   
  9.     // 每个SocketProcessor保存一个NioChannel的引用  
  10.     protected NioChannel socket = null;  
  11.     protected SocketStatus status = null;  
  12.   
  13.     public SocketProcessor(NioChannel socket, SocketStatus status) {  
  14.         reset(socket,status);  
  15.     }  
  16.   
  17.     public void reset(NioChannel socket, SocketStatus status) {  
  18.         this.socket = socket;  
  19.         this.status = status;  
  20.     }  
  21.   
  22.     @Override  
  23.     public void run() {  
  24.         // 从socket中获取SelectionKey  
  25.         SelectionKey key = socket.getIOChannel().keyFor(  
  26.                 socket.getPoller().getSelector());  
  27.         KeyAttachment ka = null;  
  28.   
  29.         if (key != null) {  
  30.             ka = (KeyAttachment)key.attachment();  
  31.         }  
  32.   
  33.         // Upgraded connections need to allow multiple threads to access the  
  34.         // connection at the same time to enable blocking IO to be used when  
  35.         // NIO has been configured  
  36.         if (ka != null && ka.isUpgraded() &&  
  37.                 SocketStatus.OPEN_WRITE == status) {  
  38.             synchronized (ka.getWriteThreadLock()) {  
  39.                 doRun(key, ka);  
  40.             }  
  41.         } else {  
  42.             synchronized (socket) {  
  43.                 doRun(key, ka);  
  44.             }  
  45.         }  
  46.     }  
  47.   
  48.     private void doRun(SelectionKey key, KeyAttachment ka) {  
  49.         try {  
  50.             int handshake = -1;  
  51.   
  52.             try {  
  53.                 if (key != null) {  
  54.                     // For STOP there is no point trying to handshake as the  
  55.                     // Poller has been stopped.  
  56.                     if (socket.isHandshakeComplete() ||  
  57.                             status == SocketStatus.STOP) {  
  58.                         handshake = 0;  
  59.                     } else {  
  60.                         handshake = socket.handshake(  
  61.                                 key.isReadable(), key.isWritable());  
  62.                         // The handshake process reads/writes from/to the  
  63.                         // socket. status may therefore be OPEN_WRITE once  
  64.                         // the handshake completes. However, the handshake  
  65.                         // happens when the socket is opened so the status  
  66.                         // must always be OPEN_READ after it completes. It  
  67.                         // is OK to always set this as it is only used if  
  68.                         // the handshake completes.  
  69.                         status = SocketStatus.OPEN_READ;  
  70.                     }  
  71.                 }  
  72.             }catch ( IOException x ) {  
  73.                 handshake = -1;  
  74.                 if ( log.isDebugEnabled() ) log.debug('Error during SSL handshake',x);  
  75.             }catch ( CancelledKeyException ckx ) {  
  76.                 handshake = -1;  
  77.             }  
  78.             if ( handshake == 0 ) {  
  79.                 SocketState state = SocketState.OPEN;  
  80.                 // Process the request from this socket  
  81.                 if (status == null) {  
  82.                     // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求  
  83.                     state = handler.process(ka, SocketStatus.OPEN_READ);  
  84.                 } else {  
  85.                     state = handler.process(ka, status);  
  86.                 }  
  87.                 if (state == SocketState.CLOSED) {  
  88.                     // Close socket and pool  
  89.                     try {  
  90.                         close(ka, socket, key, SocketStatus.ERROR);  
  91.                     } catch ( Exception x ) {  
  92.                         log.error('',x);  
  93.                     }  
  94.                 }  
  95.             } else if (handshake == -1 ) {  
  96.                 close(ka, socket, key, SocketStatus.DISCONNECT);  
  97.             } else {  
  98.                 ka.getPoller().add(socket, handshake);  
  99.             }  
  100.         } catch (CancelledKeyException cx) {  
  101.             socket.getPoller().cancelledKey(key, nullfalse);  
  102.         } catch (OutOfMemoryError oom) {  
  103.             try {  
  104.                 oomParachuteData = null;  
  105.                 log.error('', oom);  
  106.                 if (socket != null) {  
  107.                     socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);  
  108.                 }  
  109.                 releaseCaches();  
  110.             }catch ( Throwable oomt ) {  
  111.                 try {  
  112.                     System.err.println(oomParachuteMsg);  
  113.                     oomt.printStackTrace();  
  114.                 }catch (Throwable letsHopeWeDontGetHere){  
  115.                     ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);  
  116.                 }  
  117.             }  
  118.         } catch (VirtualMachineError vme) {  
  119.             ExceptionUtils.handleThrowable(vme);  
  120.         }catch ( Throwable t ) {  
  121.             log.error('',t);  
  122.             if (socket != null) {  
  123.                 socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);  
  124.             }  
  125.         } finally {  
  126.             socket = null;  
  127.             status = null;  
  128.             //return to cache  
  129.             if (running && !paused) {  
  130.                 processorCache.offer(this);  
  131.             }  
  132.         }  
  133.     }  
  134.   
  135.     private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,  
  136.             SocketStatus socketStatus) {  
  137. .  
  138.     }  
  139. }  
// ---------------------------------------------- SocketProcessor Inner Class // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。 /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { // 每个SocketProcessor保存一个NioChannel的引用 protected NioChannel socket = null; protected SocketStatus status = null; public SocketProcessor(NioChannel socket, SocketStatus status) { reset(socket,status); } public void reset(NioChannel socket, SocketStatus status) { this.socket = socket; this.status = status; } @Override public void run() { // 从socket中获取SelectionKey SelectionKey key = socket.getIOChannel().keyFor( socket.getPoller().getSelector()); KeyAttachment ka = null; if (key != null) { ka = (KeyAttachment)key.attachment(); } // Upgraded connections need to allow multiple threads to access the // connection at the same time to enable blocking IO to be used when // NIO has been configured if (ka != null && ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) { synchronized (ka.getWriteThreadLock()) { doRun(key, ka); } } else { synchronized (socket) { doRun(key, ka); } } } private void doRun(SelectionKey key, KeyAttachment ka) { try { int handshake = -1; try { if (key != null) { // For STOP there is no point trying to handshake as the // Poller has been stopped. if (socket.isHandshakeComplete() || status == SocketStatus.STOP) { handshake = 0; } else { handshake = socket.handshake( key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. status = SocketStatus.OPEN_READ; } } }catch ( IOException x ) { handshake = -1; if ( log.isDebugEnabled() ) log.debug('Error during SSL handshake',x); }catch ( CancelledKeyException ckx ) { handshake = -1; } if ( handshake == 0 ) { SocketState state = SocketState.OPEN; // Process the request from this socket if (status == null) { // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求 state = handler.process(ka, SocketStatus.OPEN_READ); } else { state = handler.process(ka, status); } if (state == SocketState.CLOSED) { // Close socket and pool try { close(ka, socket, key, SocketStatus.ERROR); } catch ( Exception x ) { log.error('',x); } } } else if (handshake == -1 ) { close(ka, socket, key, SocketStatus.DISCONNECT); } else { ka.getPoller().add(socket, handshake); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key, null, false); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; log.error('', oom); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false); } releaseCaches(); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); }catch ( Throwable t ) { log.error('',t); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); } } finally { socket = null; status = null; //return to cache if (running && !paused) { processorCache.offer(this); } } } private void close(KeyAttachment ka, NioChannel socket, SelectionKey key, SocketStatus socketStatus) { ... } }

可以看到由SocketProcessor寻找合适的Handler处理器做最终socket转换处理。

 

可以用下面这幅图总结一下NioEndpoint的主要流程:


Acceptor和Poller是线程数组,Worker是一个线程池(Executor)

 

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
谈谈 Tomcat 请求处理流程,更好的了解Tomcat原理
SocketChannel续2---各种注意点
nio框架中的多个Selector结构
编写一个简易的Java NIO Reactor库
netty源码解析1
简单聊天程序java socket
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服