- 浏览: 19707 次
- 性别:
- 来自: 杭州
文章分类
最新评论
上文http://nod0620.iteye.com/admin/blogs/1030398 写了session复制的发送部分,继续接收部分:
当接收方tomcat接收到需要session的消息时,最终调用了GroupChannel的messageReceived()方法
public void messageReceived(ChannelMessage msg) { if ( msg == null ) return; try { if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName()); } Serializable fwd = null; if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) { fwd = new ByteMessage(msg.getMessage().getBytes()); } else { try { fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength()); }catch (Exception sx) { log.error("Unable to deserialize message:"+msg,sx); return; } } if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd); } //get the actual member with the correct alive time Member source = msg.getAddress(); boolean rx = false; boolean delivered = false; for ( int i=0; i<channelListeners.size(); i++ ) { ChannelListener channelListener = (ChannelListener)channelListeners.get(i); if (channelListener != null && channelListener.accept(fwd, source)) { channelListener.messageReceived(fwd, source); delivered = true; //if the message was accepted by an RPC channel, that channel //is responsible for returning the reply, otherwise we send an absence reply if ( channelListener instanceof RpcChannel ) rx = true; } }//for if ((!rx) && (fwd instanceof RpcMessage)) { //if we have a message that requires a response, //but none was given, send back an immediate one sendNoRpcChannelReply((RpcMessage)fwd,source); } if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId())); } } catch ( Exception x ) { //this could be the channel listener throwing an exception, we should log it //as a warning. if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x); throw new RemoteProcessException("Exception:"+x.getMessage(),x); } }
可以看到,这里主要是调用ChannelListener的messageReceived方法,显然这个实现类就是SimpleTcpCluster:
public void messageReceived(Serializable message, Member sender) { ClusterMessage fwd = (ClusterMessage)message; fwd.setAddress(sender); messageReceived(fwd); } public void messageReceived(ClusterMessage message) { long start = 0; if (log.isDebugEnabled() && message != null) log.debug("Assuming clocks are synched: Replication for " + message.getUniqueId() + " took=" + (System.currentTimeMillis() - (message).getTimestamp()) + " ms."); //invoke all the listeners boolean accepted = false; if (message != null) { for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) { ClusterListener listener = (ClusterListener) iter.next(); if (listener.accept(message)) { accepted = true; listener.messageReceived(message); } } } if (!accepted && log.isDebugEnabled()) { if (notifyLifecycleListenerOnFailure) { Member dest = message.getAddress(); // Notify our interested LifecycleListeners lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT, new SendMessageData(message, dest, null)); } log.debug("Message " + message.toString() + " from type " + message.getClass().getName() + " transfered but no listener registered"); } return; }
其实这里调用的是ClusterListener的messageReceived()方法,默认的两个ClusterListener是ClusterSessionListener和JvmRouteSessionIDBinderListener,JvmRouteSessionIDBinderListener只有在发送的消息是关于session id改变的消息时才起作用,重点看ClusterSessionListener.messageReceived()方法,其实是调用到了DeltaManager的messageDataReceived()方法:
public void messageDataReceived(ClusterMessage cmsg) { if (cmsg != null && cmsg instanceof SessionMessage) { SessionMessage msg = (SessionMessage) cmsg; switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: case SessionMessage.EVT_SESSION_CREATED: case SessionMessage.EVT_SESSION_EXPIRED: case SessionMessage.EVT_SESSION_ACCESSED: case SessionMessage.EVT_SESSION_DELTA: case SessionMessage.EVT_CHANGE_SESSION_ID: { synchronized(receivedMessageQueue) { if(receiverQueue) { receivedMessageQueue.add(msg); return ; } } break; } default: { //we didn't queue, do nothing break; } } //switch messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null); } }
这里的事件是EVT_GET_ALL_SESSIONS,在messageReceived()方法中有这个分支的处理代码,最后面调用
handleGET_ALL_SESSIONS()方法:
protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException { counterReceive_EVT_GET_ALL_SESSIONS++; //get a list of all the session from this manager if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName())); // Write the number of active sessions, followed by the details // get all sessions and serialize without sync Session[] currentSessions = findSessions(); long findSessionTimestamp = System.currentTimeMillis() ; if (isSendAllSessions()) { sendSessions(sender, currentSessions, findSessionTimestamp); } else { // send session at blocks for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) { int len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize(); Session[] sendSessions = new Session[len]; System.arraycopy(currentSessions, i, sendSessions, 0, len); sendSessions(sender, sendSessions,findSessionTimestamp); if (getSendAllSessionsWaitTime() > 0) { try { Thread.sleep(getSendAllSessionsWaitTime()); } catch (Exception sleep) { } }//end if }//for }//end if SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName()); newmsg.setTimestamp(findSessionTimestamp); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName())); counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++; cluster.send(newmsg, sender); }
主要是先找到所有的session,然后发送所有的session,然后发送一个标志结束的消息表示发送完成。这样第一次tomcat的启动加入集群是拿所用的session就完成了,主要的流程是这样的:
发送方:
StandardContext.start()--->SimpleTcpCluster.createManager()--->StandardContext.setManager()--->
DeltaManager.start()--->DeltaManager.getAllClusterSessions()--->SimpleTcpCluster.send()
---->GroupChannel.send()---->ParallelNioSender.sendMessage()--->NioSender.process()
接收方:
NioReceiver.listen()--->NioReceiver.readDataFromSocket()---->NioReplicationTask.drainChannel()--->
ListenCallback.messageDataReceived()---->ChannelCoordinator.messageReceived()--->
GroupChannel.messageReceived()--->SimpleTcpCluster.messageReceived()--->
DeltaManager.messageDataReceived()---->DeltaManager.handleGET_ALL_SESSIONS()
接收方准备好数据又要发送给发送方,简单来说通信是这样:
发送方发送请求--->接收方接受请求,发送数据,一个是session的数据,一个是session发送完毕的数据--->
发送方收到接收方过来的两种数据
发表评论
-
tomcat StandardSession
2011-05-11 16:33 3218Tomcat中Session的实现还是比较清晰的,先看类图 ... -
tomcat session复制(一)
2011-05-05 16:32 2690tomcat的session复制大致分两种:all-to-a ... -
tomcat NioSender
2011-05-02 01:58 1250上次说了NioReceiver,这次看看NioSe ... -
NIO基础
2011-04-29 16:48 5579tomcat集群的时候,在心跳通讯的时候,默认的接 ... -
tomcat的Http11Protocol,Http11Processor
2011-04-11 21:42 2085上文说到在JIoEndpoint类中处理请求最终是调用到内 ... -
tomcat 6的JIoEndpoint
2011-04-11 20:11 2680先上个图先,一个只有我自己能看懂的url时序图. 这个基本上 ...
相关推荐
tomcat5 session 复制
傻瓜式的描述怎么实现在一台windows机器上怎么实现Apache2.2+Tomcat6.0的负载均衡和session复制,不用动脑子的,因为我自己在配置过程中由于某些设置有问题,死活不成功,被折磨了很久,所以把分析过程写下来,如果...
而实际情况下,采取Apache 加Tomcat进行负载均衡集群的时候,是可以不用将Session复制到所有的节点里, 比如有六个Tomcat实例 Tomcat1,Tomcat2,Tomcat3,Tomcat4,Tomcat5,Tomcat6 是可以配置成 三组互相复制...
tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制tomcat集群实现session复制...
NULL 博文链接:https://xueweiabcok.iteye.com/blog/1841448
linux 下实现apache+tomcat集群及session复制
nginx+tomcat8+memcached session共享所需jar包 直接放到tomcat/lib下即可
tomcat 做session共享所需jar包压缩包 具体如何做见我的博客。 msm开源项目所需包
Nginx Tomcat 集群的Session 复制,解决了,集群情况下的session复制问题。
Apache,tomcat负载均衡和session复制
Nginx+tomcat负载均衡集群session复制 windos
apache-tomcat-6.0.35.tar.gz、tomcat-native-1.1.20-src.tar.gz apr-1.4.6.tar.gz、apr-iconv-1.2.1.tar.gz、apr-util-1.4.1.tar.gz jdk-1_5_0_22-linux-i586.bin #mkdir –p /usr/local/soft #将以上软件上传到...
此压缩包为tomcat8利用redis实现session共享所需要的jar包,包含(commons-pool2-2.6.0.jar、jedis-2.9.0.jar、tomcat-redis-session-manager.jar)直接将三个jar包复制Tomcat目录lib下面,在修改conf下context.xml...
Tomcat5集群中的Session复制
NULL 博文链接:https://mushme.iteye.com/blog/1175228
公司花钱买的apache + tomcat 集群+session复制解决方案。 感觉对一些网站建设比较有用,但不太符合我们的产品
tomcat7整合session所需的jar包,使用redis同步session信息
此压缩包为tomcat7利用memcache实现session共享所需要的jar包打包,直接将所有jar包复制到系统相应的目录就可以了,亲测jar包齐全可以成功搭建
tomcat8基于redis的session 的相关jar包;tomcat8-redis-session-manager-2.0.0.jar
Tomcat Session共享方案使用Tomcat内置的Session复制方案 项目web.xml文件中添加节点2.使用Redis实现session共享to