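The leader is implemented by LeaderZooKeeperServer, which extends the standard ZooKeeperServer indirectly (through QuorumZooKeeperServer). It defines the path a request follows once it reaches the leader:
PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor
The details are in the code below (note that a LeaderRequestProcessor is also installed as firstProcessor, in front of PrepRequestProcessor):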
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false,
            getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}
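The chain is assembled back to front: the last processor is created first so that each earlier one can receive a reference to its successor. The sketch below reproduces only that wiring pattern. `Stage`, `NamedStage` and `PipelineDemo` are hypothetical names, not ZooKeeper classes, and the sketch is synchronous, whereas the real CommitProcessor and PrepRequestProcessor run on their own threads with queues between them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a RequestProcessor: handles a request, then hands it on.
interface Stage {
    void process(String request, List<String> trace);
}

final class NamedStage implements Stage {
    private final String name;
    private final Stage next; // null for the last stage in the chain

    NamedStage(String name, Stage next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void process(String request, List<String> trace) {
        trace.add(name); // record that this stage saw the request
        if (next != null) {
            next.process(request, trace); // pass it downstream
        }
    }
}

final class PipelineDemo {
    // Built back to front, mirroring the order of construction in setupRequestProcessors().
    static Stage buildLeaderPipeline() {
        Stage fin = new NamedStage("FinalRequestProcessor", null);
        Stage toBeApplied = new NamedStage("ToBeAppliedRequestProcessor", fin);
        Stage commit = new NamedStage("CommitProcessor", toBeApplied);
        Stage proposal = new NamedStage("ProposalRequestProcessor", commit);
        Stage prep = new NamedStage("PrepRequestProcessor", proposal);
        return new NamedStage("LeaderRequestProcessor", prep);
    }

    // Returns the names of the stages a request visited, in order.
    static List<String> trace(String request) {
        List<String> trace = new ArrayList<>();
        buildLeaderPipeline().process(request, trace);
        return trace;
    }
}
```

Calling `PipelineDemo.trace("create /a")` lists the six stages from LeaderRequestProcessor to FinalRequestProcessor in the order a request visits them.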
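Let's go through what each of these RPs (request processors) does, one step at a time. PrepRequestProcessor and FinalRequestProcessor were already analyzed in the previous article,
so we will start with the remaining RPs.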
1. ProposalRequestProcessor
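This RP passes every request on to the next processor (CommitProcessor). Sync requests from followers (LearnerSyncRequest) are handed to the leader instead. For requests that carry a transaction header, that is, writes, it additionally has the leader send a proposal to the followers and hands the request to SyncRequestProcessor, which logs it locally and then passes it to AckRequestProcessor. The code: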
public void processRequest(Request request) throws RequestProcessorException {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");


    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */

    if (request instanceof LearnerSyncRequest){
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        nextProcessor.processRequest(request);
        if (request.getHdr() != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            syncProcessor.processRequest(request);
        }
    }
}
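The branching above can be reduced to three cases: a follower's sync, a read, and a write. The sketch below records which actions each case triggers. `Req` and `ProposalRouter` are hypothetical; the two booleans stand in for `request instanceof LearnerSyncRequest` and `request.getHdr() != null`, and the action strings simply name the calls made in the real method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified request: only the two facts the branching depends on.
final class Req {
    final boolean learnerSync;  // stands in for "request instanceof LearnerSyncRequest"
    final boolean hasTxnHeader; // stands in for "request.getHdr() != null"

    Req(boolean learnerSync, boolean hasTxnHeader) {
        this.learnerSync = learnerSync;
        this.hasTxnHeader = hasTxnHeader;
    }
}

final class ProposalRouter {
    final List<String> actions = new ArrayList<>();

    void processRequest(Req r) {
        if (r.learnerSync) {
            actions.add("leader.processSync");      // sync forwarded by a follower
        } else {
            actions.add("next.processRequest");     // every request is queued at CommitProcessor
            if (r.hasTxnHeader) {                   // a write needs quorum agreement
                actions.add("leader.propose");      // PROPOSAL sent to the followers
                actions.add("sync.processRequest"); // logged locally, then acked
            }
        }
    }
}
```

Note the order: the write reaches CommitProcessor before it is proposed, so it is already waiting there when the commit arrives.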
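SyncRequestProcessor was analyzed in the previous article, so we won't repeat it here. Instead, let's look at what AckRequestProcessor does.
AckRequestProcessor sits behind SyncRequestProcessor. Once the leader has logged the transaction locally, it simply forwards the request to the leader as an ACK, using the leader's own server id. The code: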
/**
 * Forward the request as an ACK to the leader
 */
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if(self != null)
        leader.processAck(self.getId(), request.zxid, null);
    else
        LOG.error("Null QuorumPeer");
}
The leader processes the ACK as follows:
/**
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 *
 * @param zxid, the zxid of the proposal sent out
 * @param sid, the id of the server that sent the ack
 * @param followerAddr
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    if (!allowedToCommit) return; // last op committed was a leader change - from now on
                                  // the new leader should commit
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    if ((zxid & 0xffffffffL) == 0) {
        /*
         * We no longer process NEWLEADER ack with this method. However,
         * the learner sends an ack back to the leader after it gets
         * UPTODATE, so we just ignore the message.
         */
        return;
    }

    if (outstandingProposals.size() == 0) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }
    if (lastCommitted >= zxid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }

    p.addAck(sid);
    /*if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }*/

    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

    // If p is a reconfiguration, multiple other operations may be ready to be committed,
    // since operations wait for different sets of acks.
    // Currently we only permit one outstanding reconfiguration at a time
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
    // ops may already have enough acks and can be committed, which is what this code does.

    if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
        long curZxid = zxid;
        while (allowedToCommit && hasCommitted && p!=null){
            curZxid++;
            p = outstandingProposals.get(curZxid);
            if (p !=null) hasCommitted = tryToCommit(p, curZxid, null);
        }
    }
}
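Two ideas in processAck can be isolated. First, a zxid is a 64-bit number whose high 32 bits hold the epoch and whose low 32 bits hold a counter, which is why `(zxid & 0xffffffffL) == 0` identifies the first zxid of a new epoch (the NEWLEADER ack that is ignored). Second, acks are collected per proposal until a quorum is reached. The sketch assumes a plain majority rule (more than half of the voters); the real code delegates the decision to a QuorumVerifier through `p.hasAllQuorums()`, and `AckTracker` and `ZxidUtil` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Splits a zxid into its epoch (high 32 bits) and counter (low 32 bits).
final class ZxidUtil {
    static long epoch(long zxid) { return zxid >>> 32; }
    static long counter(long zxid) { return zxid & 0xffffffffL; }
}

// Hypothetical per-proposal ack collector using a simple-majority rule.
final class AckTracker {
    private final Set<Long> voters;
    private final Set<Long> acks = new HashSet<>();

    AckTracker(Set<Long> voters) { this.voters = voters; }

    // Records an ack and reports whether a strict majority of voters has now acked.
    boolean addAck(long sid) {
        if (voters.contains(sid)) {
            acks.add(sid); // a repeated ack from the same server is counted once
        }
        return hasQuorum();
    }

    boolean hasQuorum() {
        return acks.size() > voters.size() / 2;
    }
}
```

With five voters, the leader's own ack (from AckRequestProcessor) plus two follower acks are enough: the third distinct ack makes `addAck` return true.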
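processAck() calls tryToCommit(). Once the proposal has a quorum of acks, tryToCommit() commits it and finally hands the request over to CommitProcessor: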
/**
 * @return True if committed, otherwise false.
 * @param a proposal p
 **/
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
    // make sure that ops are committed in order. With reconfigurations it is now possible
    // that different operations wait for different sets of acks, and we still want to enforce
    // that they are committed in order. Currently we only permit one outstanding reconfiguration
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen.
    if (outstandingProposals.containsKey(zxid - 1)) return false;

    // getting a quorum from all necessary configurations
    if (!p.hasAllQuorums()) {
        return false;
    }

    // commit proposals in order
    if (zxid != lastCommitted+1) {
        LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                + " from " + followerAddr + " not first!");
        LOG.warn("First is "
                + (lastCommitted+1));
    }

    // in order to be committed, a proposal must be accepted by a quorum
    outstandingProposals.remove(zxid);

    if (p.request != null) {
        toBeApplied.add(p);
    }

    if (p.request == null) {
        LOG.warn("Going to commmit null: " + p);
    } else if (p.request.getHdr().getType() == OpCode.reconfig) {
        LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());

        //if this server is voter in new config with the same quorum address,
        //then it will remain the leader
        //otherwise an up-to-date follower will be designated as leader. This saves
        //leader election time, unless the designated leader fails
        Long designatedLeader = getDesignatedLeader(p, zxid);
        //LOG.warn("designated leader is: " + designatedLeader);

        QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();

        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);

        if (designatedLeader != self.getId()) {
            allowedToCommit = false;
        }

        // we're sending the designated leader, and if the leader is changing the followers are
        // responsible for closing the connection - this way we are sure that at least a majority of them
        // receive the commit message.
        commitAndActivate(zxid, designatedLeader);
        informAndActivate(p, designatedLeader);
        //turnOffFollowers();
    } else {
        commit(zxid);
        inform(p);
    }
    zk.commitProcessor.commit(p.request);
    if(pendingSyncs.containsKey(zxid)){
        for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
            sendSync(r);
        }
    }

    return true;
}
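The core ordering rule in tryToCommit() is that a proposal may be committed only when its predecessor (zxid - 1) is no longer outstanding and it has its quorum. The sketch below models just that rule with a sorted map. It is a simplification under stated assumptions: `InOrderCommitter` is a hypothetical class, "has quorum" is a boolean flag, and it always retries the successors after a commit, whereas the real processAck() only does so after a reconfig, because (per the source comments) only concurrent reconfigs could let a later proposal reach quorum first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the in-order commit rule from tryToCommit().
final class InOrderCommitter {
    private final TreeMap<Long, Boolean> outstanding = new TreeMap<>(); // zxid -> quorum reached?
    final List<Long> committed = new ArrayList<>();

    void propose(long zxid) { outstanding.put(zxid, false); }

    void quorumReached(long zxid) {
        if (!outstanding.containsKey(zxid)) return;
        outstanding.put(zxid, true);
        // Commit this zxid, then any successors that were only waiting on it.
        long cur = zxid;
        while (tryToCommit(cur)) {
            cur++;
        }
    }

    private boolean tryToCommit(long zxid) {
        Boolean acked = outstanding.get(zxid);
        if (acked == null || !acked) return false;           // unknown, or no quorum yet
        if (outstanding.containsKey(zxid - 1)) return false; // predecessor still pending
        outstanding.remove(zxid);
        committed.add(zxid);
        return true;
    }
}
```

If zxid 2 reaches its quorum before zxid 1, nothing is committed until zxid 1 does; then 1 and 2 are committed together, in that order.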
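For an ordinary (non-reconfig) request, the first step is to send a COMMIT packet to all members of the quorum, that is, the followers: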
/**
 * Create a commit packet and send it to all the members of the quorum
 *
 * @param zxid
 */
public void commit(long zxid) {
    synchronized(this){
        lastCommitted = zxid;
    }
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
}
The packet is sent as follows:
/**
 * send a packet to all the followers ready to follow
 *
 * @param qp
 *                the packet to be sent
 */
void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}
The second step is to notify the observers:
/**
 * Create an inform packet and send it to all observers.
 * @param zxid
 * @param proposal
 */
public void inform(Proposal proposal) {
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
                                        proposal.packet.getData(), null);
    sendObserverPacket(qp);
}
Packets are sent to the observers like this:
/**
 * send a packet to all observers
 */
void sendObserverPacket(QuorumPacket qp) {
    for (LearnerHandler f : getObservingLearners()) {
        f.queuePacket(qp);
    }
}
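The difference between the two steps shows in the packets themselves. COMMIT is built with a null payload because followers already received the transaction in the PROPOSAL phase (Leader.propose() also uses sendPacket(), which only reaches forwardingFollowers). INFORM carries `proposal.packet.getData()` because observers never saw the proposal. The sketch uses hypothetical `Packet`, `Learner` and `Broadcaster` classes in place of QuorumPacket and LearnerHandler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for QuorumPacket.
final class Packet {
    final String type;
    final long zxid;
    final byte[] data; // null for COMMIT

    Packet(String type, long zxid, byte[] data) {
        this.type = type;
        this.zxid = zxid;
        this.data = data;
    }
}

// Hypothetical stand-in for LearnerHandler: just collects queued packets.
final class Learner {
    final List<Packet> queue = new ArrayList<>();
    void queuePacket(Packet p) { queue.add(p); }
}

final class Broadcaster {
    final List<Learner> followers = new ArrayList<>();
    final List<Learner> observers = new ArrayList<>();

    // Followers already hold the proposal body, so COMMIT carries only the zxid.
    void commit(long zxid) {
        Packet p = new Packet("COMMIT", zxid, null);
        for (Learner f : followers) f.queuePacket(p);
    }

    // Observers never received the PROPOSAL, so INFORM must carry the transaction data.
    void inform(long zxid, byte[] txnData) {
        Packet p = new Packet("INFORM", zxid, txnData);
        for (Learner o : observers) o.queuePacket(p);
    }
}
```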
The third step hands the committed request to CommitProcessor:
zk.commitProcessor.commit(p.request);
2. CommitProcessor
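CommitProcessor is multi-threaded. Its threads communicate through queues, atomics, and wait/notifyAll synchronization. CommitProcessor acts as a gateway that decides when requests may continue down the rest of the pipeline: at any given moment it allows many read requests but only one write request to be in flight, which guarantees that write requests are processed in transaction order.
One main commit-processor thread watches the request queues and assigns requests to worker threads based on their sessionId, so that the read and write requests of a given session always go to the same thread and are therefore guaranteed to run in order.
Zero to N worker threads run the rest of the request-processing pipeline on those requests. If zero worker threads are configured, the main commit thread runs the pipeline directly.
Typical (default) thread counts: on a 32-core machine, one commit-processor thread and 32 worker threads.
Constraints imposed by multi-threading:
Each session's requests must be processed in order.
Write requests must be processed in zxid order.
There must be no race between a write in one session and a read in another session that sets a watch which that write would trigger.
The current implementation satisfies the third constraint simply by never processing a read request in parallel with a write request.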
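The gateway rule can be modeled single-threaded: reads pass straight through until a write is seen, the write is parked as "next pending", and everything queued behind it waits until the matching commit arrives. The sketch below is a hypothetical `CommitGate`, not ZooKeeper code. It classifies requests by a `"write"` name prefix in place of `needCommit()`, and it leaves out the worker pool and the `isProcessingCommit()` state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of CommitProcessor's read/write gating.
final class CommitGate {
    private final Deque<String> queued = new ArrayDeque<>();
    private String nextPending; // the local write waiting for its commit, if any
    final List<String> sentDownstream = new ArrayList<>();

    void submit(String req) {
        queued.add(req);
        drain();
    }

    // A committed write always goes downstream; if it is our pending write, unblock the queue.
    void commit(String write) {
        sentDownstream.add(write);
        if (write.equals(nextPending)) {
            nextPending = null;
        }
        drain();
    }

    private void drain() {
        while (nextPending == null && !queued.isEmpty()) {
            String req = queued.poll();
            if (req.startsWith("write")) { // stands in for needCommit(request)
                nextPending = req;         // park it; nothing behind it may run yet
            } else {
                sentDownstream.add(req);   // reads need no commit
            }
        }
    }
}
```

Submitting `read1`, `write1`, `read2` lets only `read1` through; after `commit("write1")` the downstream order is `read1`, `write1`, `read2`, so `read2` cannot observe state from before `write1`.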
@Override
public void run() {
    Request request;
    try {
        while (!stopped) {
            synchronized(this) {
                while (
                    !stopped &&
                    ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
                     (committedRequests.isEmpty() || isProcessingRequest()))) {
                    wait();
                }
            }

            /*
             * Processing queuedRequests: Process the next requests until we
             * find one for which we need to wait for a commit. We cannot
             * process a read request while we are processing write request.
             */
            while (!stopped && !isWaitingForCommit() &&
                   !isProcessingCommit() &&
                   (request = queuedRequests.poll()) != null) {
                if (needCommit(request)) {
                    nextPending.set(request);
                } else {
                    sendToNextProcessor(request);
                }
            }

            /*
             * Processing committedRequests: check and see if the commit
             * came in for the pending request. We can only commit a
             * request when there is no other request being processed.
             */
            processCommitted();
        }
    } catch (Throwable e) {
        handleException(this.getName(), e);
    }
    LOG.info("CommitProcessor exited loop!");
}
The main commit-handling logic, processCommitted(), is as follows:
/*
 * Separated this method from the main run loop
 * for test purposes (ZOOKEEPER-1863)
 */
protected void processCommitted() {
    Request request;

    if (!stopped && !isProcessingRequest() &&
            (committedRequests.peek() != null)) {

        /*
         * ZOOKEEPER-1863: continue only if there is no new request
         * waiting in queuedRequests or it is waiting for a
         * commit.
         */
        if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
            return;
        }
        request = committedRequests.poll();

        /*
         * We match with nextPending so that we can move to the
         * next request when it is committed. We also want to
         * use nextPending because it has the cnxn member set
         * properly.
         */
        Request pending = nextPending.get();
        if (pending != null &&
            pending.sessionId == request.sessionId &&
            pending.cxid == request.cxid) {
            // we want to send our version of the request.
            // the pointer to the connection in the request
            pending.setHdr(request.getHdr());
            pending.setTxn(request.getTxn());
            pending.zxid = request.zxid;
            // Set currentlyCommitting so we will block until this
            // completes. Cleared by CommitWorkRequest after
            // nextProcessor returns.
            currentlyCommitting.set(pending);
            nextPending.set(null);
            sendToNextProcessor(pending);
        } else {
            // this request came from someone else so just
            // send the commit packet
            currentlyCommitting.set(request);
            sendToNextProcessor(request);
        }
    }
}
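The matching step above identifies a commit as "ours" when sessionId and cxid both equal those of nextPending; it then copies the header, txn and zxid from the committed copy onto the local copy, because only the local copy holds the client connection (cnxn) needed to send the reply. `SimpleRequest` and `CommitMatcher` below are hypothetical, with a `String` standing in for the transaction header.

```java
// Hypothetical request carrying only the fields processCommitted() compares or copies.
final class SimpleRequest {
    final long sessionId;
    final int cxid;
    final Object cnxn; // client connection; only the locally received copy has one
    String hdr;        // stands in for the transaction header
    long zxid;

    SimpleRequest(long sessionId, int cxid, Object cnxn, String hdr, long zxid) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.cnxn = cnxn;
        this.hdr = hdr;
        this.zxid = zxid;
    }
}

final class CommitMatcher {
    // Returns the request that should travel down the pipeline for this commit.
    static SimpleRequest pick(SimpleRequest pending, SimpleRequest committed) {
        if (pending != null
                && pending.sessionId == committed.sessionId
                && pending.cxid == committed.cxid) {
            pending.hdr = committed.hdr;   // take the agreed-upon transaction...
            pending.zxid = committed.zxid; // ...and its zxid
            return pending;                // but keep our connection for the reply
        }
        return committed;                  // a commit for someone else's request
    }
}
```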
Requests are dispatched to the worker threads by:
/**
 * Schedule final request processing; if a worker thread pool is not being
 * used, processing is done directly by this thread.
 */
private void sendToNextProcessor(Request request) {
    numRequestsProcessing.incrementAndGet();
    workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
}
The actual scheduling is done by WorkerService.schedule():
/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads.  If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest =
        new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.start();
        try {
            scheduledWorkRequest.join();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
            Thread.currentThread().interrupt();
        }
    }
}
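Per-session ordering comes from two things working together: the mod mapping always sends the same id to the same worker, and each worker runs one task at a time. The sketch assumes each worker is a single-thread executor (`Executors.newSingleThreadExecutor()`); `SessionPool` is a hypothetical class that reuses the exact index arithmetic from schedule(). The `+ size` correction is needed because Java's `%` keeps the sign of the dividend, so a negative session id would otherwise produce a negative index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical pool: one single-thread executor per worker, chosen by id mod size.
final class SessionPool {
    private final List<ExecutorService> workers = new ArrayList<>();

    SessionPool(int n) {
        for (int i = 0; i < n; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    // Same arithmetic as schedule(): maps any long id, including negative ones, into [0, size-1].
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    void schedule(Runnable work, long id) {
        if (workers.isEmpty()) {
            work.run(); // no pool configured: run on the caller's thread
            return;
        }
        workers.get(workerIndex(id, workers.size())).execute(work);
    }

    // Stops accepting work and waits briefly for queued tasks to finish.
    void shutdown() {
        for (ExecutorService w : workers) {
            w.shutdown();
            try {
                w.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

For example, `workerIndex(-7, 4)` is 1 rather than -3, and 100 tasks scheduled with the same id run in submission order on one worker.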
The run() method of the task executed on the worker thread:
@Override
public void run() {
    try {
        // Check if stopped while request was on queue
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        workRequest.doWork();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        workRequest.cleanup();
    }
}
It calls doWork() of CommitProcessor's CommitWorkRequest:
public void doWork() throws RequestProcessorException {
    try {
        nextProcessor.processRequest(request);
    } finally {
        // If this request is the commit request that was blocking
        // the processor, clear.
        currentlyCommitting.compareAndSet(request, null);

        /*
         * Decrement outstanding request count. The processor may be
         * blocked at the moment because it is waiting for the pipeline
         * to drain. In that case, wake it up if there are pending
         * requests.
         */
        if (numRequestsProcessing.decrementAndGet() == 0) {
            if (!queuedRequests.isEmpty() ||
                !committedRequests.isEmpty()) {
                wakeup();
            }
        }
    }
}
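The bookkeeping around doWork() follows a small pattern: the main thread increments an in-flight counter before scheduling, each worker decrements it when done, `compareAndSet(request, null)` clears the "currently committing" marker only if this request is the one that set it, and the last worker to finish wakes the main thread if more work is waiting. `InFlightTracker` is a hypothetical class isolating that pattern with `AtomicInteger` and `AtomicReference`; the `moreQueued` flag stands in for the two queue-emptiness checks.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical isolation of the counter / marker / wakeup logic around doWork().
final class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicReference<Object> currentlyCommitting = new AtomicReference<>();
    private int wakeupCount = 0; // guarded by this

    void beforeSchedule() { inFlight.incrementAndGet(); }

    void markCommitting(Object req) { currentlyCommitting.set(req); }

    // Called when a worker finishes; moreQueued stands in for "queuedRequests or committedRequests non-empty".
    void afterWork(Object req, boolean moreQueued) {
        currentlyCommitting.compareAndSet(req, null); // no-op unless req is the blocking commit
        if (inFlight.decrementAndGet() == 0 && moreQueued) {
            wakeup(); // pipeline drained: let the main thread continue
        }
    }

    synchronized void wakeup() {
        wakeupCount++;
        notifyAll();
    }

    synchronized int wakeupCount() { return wakeupCount; }

    boolean isProcessingCommit() { return currentlyCommitting.get() != null; }

    int inFlight() { return inFlight.get(); }
}
```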
This passes the request on to the next RP, Leader.ToBeAppliedRequestProcessor.
3. Leader.ToBeAppliedRequestProcessor
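Leader.ToBeAppliedRequestProcessor simply maintains the leader's toBeApplied list: after FinalRequestProcessor has applied a committed write, it removes the matching proposal from that list.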
/**
 * This request processor simply maintains the toBeApplied list. For
 * this to work next must be a FinalRequestProcessor and
 * FinalRequestProcessor.processRequest MUST process the request
 * synchronously!
 *
 * @param next
 *                a reference to the FinalRequestProcessor
 */
ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
    if (!(next instanceof FinalRequestProcessor)) {
        throw new RuntimeException(ToBeAppliedRequestProcessor.class
                .getName()
                + " must be connected to "
                + FinalRequestProcessor.class.getName()
                + " not "
                + next.getClass().getName());
    }
    this.leader = leader;
    this.next = next;
}

/*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request);

    // The only requests that should be on toBeApplied are write
    // requests, for which we will have a hdr. We can't simply use
    // request.zxid here because that is set on read requests to equal
    // the zxid of the last write op.
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
        LOG.error("Committed request not found on toBeApplied: "
                  + request);
    }
}
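Checking only the head of toBeApplied is enough because proposals are appended in commit order (zxid order) and, as the constructor insists, FinalRequestProcessor processes each request synchronously, so the write just applied should always be the oldest entry. The sketch assumes a `ConcurrentLinkedQueue` of zxids in place of the leader's queue of Proposal objects; `ToBeAppliedList` is a hypothetical class.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the toBeApplied bookkeeping, tracking bare zxids.
final class ToBeAppliedList {
    final ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();

    // tryToCommit() side: a committed proposal is appended at the tail.
    void committed(long zxid) {
        toBeApplied.add(zxid);
    }

    // ToBeAppliedRequestProcessor side: after the write is applied, drop it if it is the head.
    // Returns false in the case the real code logs as "Committed request not found".
    boolean applied(long zxid) {
        Iterator<Long> iter = toBeApplied.iterator();
        if (iter.hasNext() && iter.next() == zxid) { // Long is unboxed for the comparison
            iter.remove();
            return true;
        }
        return false;
    }
}
```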
4. FinalRequestProcessor was covered in the previous article, so we won't repeat it here.
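Summary: as the analysis above shows, a request on the leader passes through PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor, in that order.
PrepRequestProcessor receives the request, wraps it, and sets up the shared transaction data according to the request type. ProposalRequestProcessor drives the proposal, local logging, and ACK of each write, which leads the leader to notify all followers (COMMIT) and observers (INFORM). CommitProcessor processes requests on multiple threads while keeping writes in order. Leader.ToBeAppliedRequestProcessor only maintains the toBeApplied list.
FinalRequestProcessor terminates the chain: it sends the response message and triggers the watcher processing.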