- 浏览: 19679 次
- 性别:
- 来自: 杭州
文章分类
最新评论
tomcat的session复制大致分两种:all-to-all和backup,先看all-to-all,主要是记录下自己读源代码的心得和代码流程
tomcat集群配置暂时略过。
在tomcat的启动过程中,找到seesion复制的入口,在StandardContext的start()方法中:
Manager contextManager = null; if (manager == null) { if ( (getCluster() != null) && distributable) { try { contextManager = getCluster().createManager(getName()); } catch (Exception ex) { log.error("standardContext.clusterFail", ex); ok = false; } } else { contextManager = new StandardManager(); } } // Configure default manager if none was specified if (contextManager != null) { setManager(contextManager); } if (manager!=null && (getCluster() != null) && distributable) { //let the cluster know that there is a context that is distributable //and that it has its own manager getCluster().registerManager(manager); }
如果配置了tomcat的集群,那么 contextManager = getCluster().createManager(getName())就会被调用,返回的Manager的类型是DeltaManager,接着调用setManager(contextManager)方法:
public synchronized void setManager(Manager manager) { // Change components if necessary 管理session的manager Manager oldManager = this.manager; if (oldManager == manager) return; this.manager = manager; // Stop the old component if necessary if (started && (oldManager != null) && (oldManager instanceof Lifecycle)) { try { ((Lifecycle) oldManager).stop(); } catch (LifecycleException e) { log.error("ContainerBase.setManager: stop: ", e); } } // Start the new component if necessary if (manager != null) manager.setContainer(this); if (started && (manager != null) && (manager instanceof Lifecycle)) { try { ((Lifecycle) manager).start(); } catch (LifecycleException e) { log.error("ContainerBase.setManager: start: ", e); } } // Report this property change to interested listeners support.firePropertyChange("manager", oldManager, this.manager); }
看到红色部分,这里调用了DeltaManager的start()方法,也就是启动了DeltaManager,看代码:
public void start() throws LifecycleException {
if (!initialized) init();
// Validate and update our current component state
if (started) {
return;
}
started = true;
lifecycle.fireLifecycleEvent(START_EVENT, null);
// Force initialization of the random number generator
generateSessionId();
// Load unloaded sessions, if any
try {
//the channel is already running
Cluster cluster = getCluster() ;
// stop remove cluster binding
//wow, how many nested levels of if statements can we have ;)
if(cluster == null) {
Container context = getContainer() ;
if(context != null && context instanceof Context) {
Container host = context.getParent() ;
if(host != null && host instanceof Host) {
cluster = host.getCluster();
if(cluster != null && cluster instanceof CatalinaCluster) {
setCluster((CatalinaCluster) cluster) ;
} else {
Container engine = host.getParent() ;
if(engine != null && engine instanceof Engine) {
cluster = engine.getCluster();
if(cluster != null && cluster instanceof CatalinaCluster) {
setCluster((CatalinaCluster) cluster) ;
}
} else {
cluster = null ;
}
}
}
}
}
if (cluster == null) {
log.error(sm.getString("deltaManager.noCluster", getName()));
return;
} else {
if (log.isInfoEnabled()) {
String type = "unknown" ;
if( cluster.getContainer() instanceof Host){
type = "Host" ;
} else if( cluster.getContainer() instanceof Engine){
type = "Engine" ;
}
log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
}
}
if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
//to survice context reloads, as only a stop/start is called, not
// createManager
cluster.registerManager(this);
//随机找一个节点,然后复制所有的session过来
getAllClusterSessions();
} catch (Throwable t) {
log.error(sm.getString("deltaManager.managerLoad"), t);
}
}
首先,如果没有初始化,那么进行初始化,接着设置Cluster,然后调用Cluster的registerManager()方法,注册Manager,这里没有看明白,因为在StandardContext的start()方法中也有这个方法的调用,有重复的嫌疑?
最重要的是调用getAllClusterSessions()方法,这个方法的作用是,在集群中取到第一台tomcat的member,发送消息要求获得对方的所有session数据,对方收到消息,进行处理发送所有数据,这台tomcat收到数据,之后发送数据告知已经收到数据,双方通信结束。看代码:
public synchronized void getAllClusterSessions() { if (cluster != null && cluster.getMembers().length > 0) { long beforeSendTime = System.currentTimeMillis(); Member mbr = findSessionMasterMember(); if(mbr == null) { // No domain member found return; } SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName()); // set reference time stateTransferCreateSendTime = beforeSendTime ; // request session state counterSend_EVT_GET_ALL_SESSIONS++; stateTransfered = false ; // FIXME This send call block the deploy thread, when sender waitForAck is enabled try { synchronized(receivedMessageQueue) { receiverQueue = true ; } cluster.send(msg, mbr); if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr,getStateTransferTimeout())); // FIXME At sender ack mode this method check only the state transfer and resend is a problem! waitForSendAllSessions(beforeSendTime); } finally { synchronized(receivedMessageQueue) { for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) { SessionMessage smsg = (SessionMessage) iter.next(); if (!stateTimestampDrop) { messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null); } else { if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) { // FIXME handle EVT_GET_ALL_SESSIONS later messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null); } else { if (log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp()))); } } } } receivedMessageQueue.clear(); receiverQueue = false ; } } } else { if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName())); } }
首先,需要找到集群当中的一个tomcat,因为all-to-all复制中所有的tomcat都是对等的,所以第一个就可以了.
之后,生成一个EVT_GET_ALL_SESSIONS的消息,调用Manager的属性cluster的send()给集群中的tomcat发送要求得到所有session的消息,这个过程是异步的,所以在下面有waitForSendAllSessions()方法的调用,如果在60s内集群里面的tomcat没有给这个tomcat回应,那么这个tomcat就timeout,不能加入集群了。
我们先看SimpleTcpCluster的send()方法:
public void send(ClusterMessage msg, Member dest) {
try {
msg.setAddress(getLocalMember());
if (dest != null) {
if (!getLocalMember().equals(dest)) {
channel.send(new Member[] {dest}, msg,channelSendOptions);
} else
log.error("Unable to send message to local member " + msg);
} else {
if (channel.getMembers().length>0)
channel.send(channel.getMembers(),msg,channelSendOptions);
else if (log.isDebugEnabled())
log.debug("No members in cluster, ignoring message:"+msg);
}
} catch (Exception x) {
log.error("Unable to send message through cluster sender.", x);
}
}
实际调用的是GroupChannel的send()方法,另外发送数据的类别是异步的(SEND_OPTIONS_ASYNCHRONOUS)
接着调用的是ChannelInterceptor的sendMessage()方法,这是个责任链模式的应用,其实默认的ChannelInterceptor只有两个:MessageDispatch15Interceptor和TcpFailureDetector。MessageDispatch15Interceptor只是在开了另外一个线程后,在一定条件下在该线程中进行sendMessage()方法调用。TcpFailureDetector的作用是处理sendMessage()方法出异常后对应的集群中的Member。最后面调用的ChannelCoordinator的sendMessage()方法:
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( destination == null ) destination = membershipService.getMembers(); clusterSender.sendMessage(msg,destination); if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination)); } }
其实调用的是ParallelNioSender的sendMessage()方法,最终调用的是NioSender类进行底层的处理。到这里刚刚启动的tomcat的请求已经完成,这个时候在集群的中的一台tomcat收到消息,进行相应处理,接收消息的类是NioReceiver
让我们转换思想,转换到接收方进行考虑:
NioReceiver在前面的博文中已经详细讲过,其实他就是个后台线程,在不断的跑。在有消息进来,接收到后,每个消息产生一个Runnable任务,然后交给线程池处理。看Runnable任务的NioReplicationTask的drainChannel()方法中,有一段代码:
//process the message ReceiverBase.messageDataReceived() getCallback().messageDataReceived(msgs[i]);
调用ListenCallback的方法,如下:
public void messageDataReceived(ChannelMessage data) { if ( this.listener != null ) { if ( listener.accept(data) ) listener.messageReceived(data); } }
这里的listener的实现类是:ChannelCoordinator,看代码是调用的父类的ChannelInterceptorBase的方法:
public void messageReceived(ChannelMessage msg) { if (getPrevious() != null) getPrevious().messageReceived(msg); }
这里的ChannelInterceptor还是前面说到的那两个,无关紧要,最终调用了GroupChannel的messageReceived()方法,具体的下一篇文章再说
发表评论
-
tomcat StandardSession
2011-05-11 16:33 3212Tomcat中Session的实现还是比较清晰的,先看类图 ... -
tomcat session复制(二)
2011-05-07 18:02 1707上文http://nod0620.iteye.c ... -
tomcat NioSender
2011-05-02 01:58 1245上次说了NioReceiver,这次看看NioSe ... -
NIO基础
2011-04-29 16:48 5576tomcat集群的时候,在心跳通讯的时候,默认的接 ... -
tomcat的Http11Protocol,Http11Processor
2011-04-11 21:42 2078上文说到在JIoEndpoint类中处理请求最终是调用到内 ... -
tomcat 6的JIoEndpoint
2011-04-11 20:11 2679先上个图先,一个只有我自己能看懂的url时序图. 这个基本上 ...
相关推荐
傻瓜式的描述怎么实现在一台windows机器上怎么实现Apache2.2+Tomcat6.0的负载均衡和session复制,不用动脑子的,因为我自己在配置过程中由于某些设置有问题,死活不成功,被折磨了很久,所以把分析过程写下来,如果...
tomcat5 session 复制
而实际情况下,采取Apache 加Tomcat进行负载均衡集群的时候,是可以不用将Session复制到所有的节点里, 比如有六个Tomcat实例 Tomcat1,Tomcat2,Tomcat3,Tomcat4,Tomcat5,Tomcat6 是可以配置成 三组互相复制...
tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制...
NULL 博文链接:https://xueweiabcok.iteye.com/blog/1841448
linux 下实现apache+tomcat集群及session复制
nginx+tomcat8+memcached session共享所需jar包 直接放到tomcat/lib下即可
tomcat 做session共享所需jar包压缩包 具体如何做见我的博客。 msm开源项目所需包
Nginx Tomcat 集群的Session 复制,解决了,集群情况下的session复制问题。
Apache,tomcat负载均衡和session复制
Nginx+tomcat负载均衡集群session复制 windos
apache-tomcat-6.0.35.tar.gz、tomcat-native-1.1.20-src.tar.gz apr-1.4.6.tar.gz、apr-iconv-1.2.1.tar.gz、apr-util-1.4.1.tar.gz jdk-1_5_0_22-linux-i586.bin #mkdir –p /usr/local/soft #将以上软件上传到...
此压缩包为tomcat8利用redis实现session共享所需要的jar包,包含(commons-pool2-2.6.0.jar、jedis-2.9.0.jar、tomcat-redis-session-manager.jar)直接将三个jar包复制Tomcat目录lib下面,在修改conf下context.xml...
Tomcat5集群中的Session复制
NULL 博文链接:https://mushme.iteye.com/blog/1175228
公司花钱买的apache + tomcat 集群+session复制解决方案。 感觉对一些网站建设比较有用,但不太符合我们的产品
tomcat7整合session所需的jar包,使用redis同步session信息
此压缩包为tomcat7利用memcache实现session共享所需要的jar包打包,直接将所有jar包复制到系统相应的目录就可以了,亲测jar包齐全可以成功搭建
tomcat8基于redis的session 的相关jar包;tomcat8-redis-session-manager-2.0.0.jar
Tomcat Session共享方案使用Tomcat内置的Session复制方案 项目web.xml文件中添加节点2.使用Redis实现session共享to