转载

zookeeper源码分析之一客户端

ZooKeeper Client Library提供了丰富直观的API供用户程序使用,下面是一些常用的API:

  • create(path, data, flags): 创建一个ZNode, path是其路径,data是要存储在该ZNode上的数据,flags常用的有: PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL
  • delete(path, version): 删除一个ZNode,只有当该ZNode当前的版本号与version一致时才会删除; 如果version是-1的话,表示不检查版本(匹配任意版本)
  • exists(path, watch): 判断指定ZNode是否存在,并设置是否Watch这个ZNode。这里如果要设置Watcher的话,Watcher是在创建ZooKeeper实例时指定的,如果要设置特定的Watcher的话,可以调用另一个重载版本的exists(path, watcher)。以下几个带watch参数的API也都类似
  • getData(path, watch): 读取指定ZNode上的数据,并设置是否watch这个ZNode
  • setData(path, data, version): 更新指定ZNode的数据,只有当该ZNode当前的版本号与version一致时才会更新,version为-1表示不检查版本(setData本身不会设置Watch)
  • getChildren(path, watch): 获取指定ZNode的所有子ZNode的名字,并设置是否Watch这个ZNode
  • sync(path): 把所有在sync之前的更新操作都进行同步,达到每个请求都在半数以上的ZooKeeper Server上生效。path参数目前没有用
  • setAcl(path, acl): 设置指定ZNode的Acl信息
  • getAcl(path): 获取指定ZNode的Acl信息

这些API具体是如何起作用的呢?

客户端连接到服务器

启动客户端的脚本zookeeper/bin/zkCli.sh

# Launcher script for the ZooKeeper command-line client.
# Use POSIX interface; symlink is followed automatically.
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

# Source zkEnv.sh, preferring the copy under ../libexec when it exists.
# NOTE(review): zkEnv.sh is presumably what defines JAVA, CLASSPATH,
# ZOO_LOG_DIR and ZOO_LOG4J_PROP used below - confirm against zkEnv.sh.
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
  . "$ZOOBINDIR"/zkEnv.sh
fi

ZOO_LOG_FILE=zookeeper-$USER-cli-$HOSTNAME.log

# Start the client JVM; every argument given to this script is forwarded
# unchanged to org.apache.zookeeper.ZooKeeperMain.
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
     org.apache.zookeeper.ZooKeeperMain "$@"

从上述脚本可以看出:

连接到zookeeper服务器时,需要启动org.apache.zookeeper.ZooKeeperMain类,其入口main方法如下:

public static void main(String args[])    throws KeeperException, IOException, InterruptedException   {    ZooKeeperMain main = new ZooKeeperMain(args);    main.run();   } void run() throws KeeperException, IOException, InterruptedException {    if (cl.getCommand() == null) {     System.out.println("Welcome to ZooKeeper!");     boolean jlinemissing = false;     // only use jline if it's in the classpath     try {      Class<?> consoleC = Class.forName("jline.console.ConsoleReader");      Class<?> completorC =       Class.forName("org.apache.zookeeper.JLineZNodeCompleter");      System.out.println("JLine support is enabled");      Object console =       consoleC.getConstructor().newInstance();      Object completor =       completorC.getConstructor(ZooKeeper.class).newInstance(zk);      Method addCompletor = consoleC.getMethod("addCompleter",        Class.forName("jline.console.completer.Completer"));      addCompletor.invoke(console, completor);      String line;      Method readLine = consoleC.getMethod("readLine", String.class);      while ((line = (String)readLine.invoke(console, getPrompt())) != null) {       executeLine(line);      }     } catch (ClassNotFoundException e) {      LOG.debug("Unable to start jline", e);      jlinemissing = true;     } catch (NoSuchMethodException e) {      LOG.debug("Unable to start jline", e);      jlinemissing = true;     } catch (InvocationTargetException e) {      LOG.debug("Unable to start jline", e);      jlinemissing = true;     } catch (IllegalAccessException e) {      LOG.debug("Unable to start jline", e);      jlinemissing = true;     } catch (InstantiationException e) {      LOG.debug("Unable to start jline", e);      jlinemissing = true;     }     if (jlinemissing) {      System.out.println("JLine support is disabled");      BufferedReader br =       new BufferedReader(new InputStreamReader(System.in));      String line;      while ((line = br.readLine()) != null) {       executeLine(line);      }     }    } else {    
 // Command line args non-null.  Run what was passed.     processCmd(cl);    }   } 

其中,主要方法为processCmd(cl),它内部会调用下面的processZKCmd来处理具体的命令:

/**
 * Dispatches one parsed command. Only the built-in commands
 * ({@code quit}, {@code redo}, {@code history}, {@code printwatches},
 * {@code connect}) are visible in this excerpt.
 *
 * @param co the parsed command line
 * @return {@code false} when usage was printed or a {@code redo} was
 *         rejected (other return paths are outside this excerpt)
 */
protected boolean processZKCmd(MyCommandOptions co)
        throws KeeperException, IOException, InterruptedException {
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        return false;
    }

    // Unknown command: print usage.
    if (!commandMap.containsKey(cmd)) {
        usage();
        return false;
    }

    boolean watch = false;
    LOG.debug("Processing " + cmd);

    try {
    if (cmd.equals("quit")) {
        zk.close();
        System.exit(0);
    } else if (cmd.equals("redo") && args.length >= 2) {
        Integer i = Integer.decode(args[1]);
        if (commandCount <= i) { // don't allow redoing this redo
            System.out.println("Command index out of range");
            return false;
        }
        // Re-parse the historical command line and run it again.
        cl.parseCommand(history.get(i));
        if (cl.getCommand().equals("redo")) {
            System.out.println("No redoing redos");
            return false;
        }
        history.put(commandCount, history.get(i));
        processCmd(cl);
    } else if (cmd.equals("history")) {
        // Print history entries with index commandCount-10 .. commandCount,
        // skipping negative indices.
        for (int i = commandCount - 10; i <= commandCount; ++i) {
            if (i < 0) continue;
            System.out.println(i + " - " + history.get(i));
        }
    } else if (cmd.equals("printwatches")) {
        if (args.length == 1) {
            System.out.println("printwatches is " + (printWatches ? "on" : "off"));
        } else {
            printWatches = args[1].equals("on");
        }
    } else if (cmd.equals("connect")) {
        // Connect to the given host, or reconnect to the current one.
        if (args.length >= 2) {
            connectToZK(args[1]);
        } else {
            connectToZK(host);
        }
    }
    // ... (the excerpt ends here; the rest of the try block and of the
    // method is not shown)

我们以connect命令来看看连接的过程。

/**
 * Points this shell at {@code newHost}: closes the current client if it is
 * still alive, remembers the new host and creates a fresh
 * {@code ZooKeeper} client for it.
 * <p>
 * The {@code readonly}, {@code secure} and {@code timeout} options are
 * read from the parsed command line {@code cl}.
 *
 * @param newHost connect string handed to the {@code ZooKeeper} constructor
 * @throws InterruptedException propagated from closing the old client
 * @throws IOException          propagated from creating the new client
 */
protected void connectToZK(String newHost) throws InterruptedException, IOException {
    final boolean hasLiveClient = zk != null && zk.getState().isAlive();
    if (hasLiveClient) {
        zk.close();
    }

    host = newHost;

    final boolean readOnly = cl.getOption("readonly") != null;
    final boolean secure = cl.getOption("secure") != null;
    if (secure) {
        System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }

    final int sessionTimeout = Integer.parseInt(cl.getOption("timeout"));
    zk = new ZooKeeper(host, sessionTimeout, new MyWatcher(), readOnly);
}

创建客户端:

/**
 * To create a ZooKeeper client object, the application needs to pass a
 * connection string containing a comma separated list of host:port pairs,
 * each corresponding to a ZooKeeper server.
 * <p>
 * Session establishment is asynchronous. This constructor will initiate
 * connection to the server and return immediately - potentially (usually)
 * before the session is fully established. The watcher argument specifies
 * the watcher that will be notified of any changes in state. This
 * notification can come at any point before or after the constructor call
 * has returned.
 * <p>
 * The instantiated ZooKeeper client object will pick an arbitrary server
 * from the connectString and attempt to connect to it. If establishment of
 * the connection fails, another server in the connect string will be tried
 * (the order is non-deterministic, as we random shuffle the list), until a
 * connection is established. The client will continue attempts until the
 * session is explicitly closed.
 * <p>
 * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
 * connection string. This will run the client commands while interpreting
 * all paths relative to this root (similar to the unix chroot command).
 * <p>
 * This constructor delegates to the five-argument constructor, passing
 * {@code createDefaultHostProvider(connectString)} as the host provider.
 *
 * @param connectString
 *            comma separated host:port pairs, each corresponding to a zk
 *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If
 *            the optional chroot suffix is used the example would look
 *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
 *            where the client would be rooted at "/app/a" and all paths
 *            would be relative to this root - ie getting/setting/etc...
 *            "/foo/bar" would result in operations being run on
 *            "/app/a/foo/bar" (from the server perspective).
 * @param sessionTimeout
 *            session timeout in milliseconds
 * @param watcher
 *            a watcher object which will be notified of state changes, may
 *            also be notified for node events
 * @param canBeReadOnly
 *            (added in 3.4) whether the created client is allowed to go to
 *            read-only mode in case of partitioning. Read-only mode
 *            basically means that if the client can't find any majority
 *            servers but there's partitioned server it could reach, it
 *            connects to one in read-only mode, i.e. read requests are
 *            allowed while write requests are not. It continues seeking for
 *            majority in the background.
 *
 * @throws IOException
 *             in cases of network failure
 * @throws IllegalArgumentException
 *             if an invalid chroot path is specified
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    this(connectString, sessionTimeout, watcher, canBeReadOnly,
            createDefaultHostProvider(connectString));
}

调用创建客户端方法:

/**  * To create a ZooKeeper client object, the application needs to pass a  * connection string containing a comma separated list of host:port pairs,  * each corresponding to a ZooKeeper server.  * <p>  * Session establishment is asynchronous. This constructor will initiate  * connection to the server and return immediately - potentially (usually)  * before the session is fully established. The watcher argument specifies  * the watcher that will be notified of any changes in state. This  * notification can come at any point before or after the constructor call  * has returned.  * <p>  * The instantiated ZooKeeper client object will pick an arbitrary server  * from the connectString and attempt to connect to it. If establishment of  * the connection fails, another server in the connect string will be tried  * (the order is non-deterministic, as we random shuffle the list), until a  * connection is established. The client will continue attempts until the  * session is explicitly closed.  * <p>  * Added in 3.2.0: An optional "chroot" suffix may also be appended to the  * connection string. This will run the client commands while interpreting  * all paths relative to this root (similar to the unix chroot command).  * <p>  * For backward compatibility, there is another version  * {@link #ZooKeeper(String, int, Watcher, boolean)} which uses  * default {@link StaticHostProvider}  *  * @param connectString  *            comma separated host:port pairs, each corresponding to a zk  *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If  *            the optional chroot suffix is used the example would look  *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"  *            where the client would be rooted at "/app/a" and all paths  *            would be relative to this root - ie getting/setting/etc...  *            "/foo/bar" would result in operations being run on  *            "/app/a/foo/bar" (from the server perspective).  
* @param sessionTimeout  *            session timeout in milliseconds  * @param watcher  *            a watcher object which will be notified of state changes, may  *            also be notified for node events  * @param canBeReadOnly  *            (added in 3.4) whether the created client is allowed to go to  *            read-only mode in case of partitioning. Read-only mode  *            basically means that if the client can't find any majority  *            servers but there's partitioned server it could reach, it  *            connects to one in read-only mode, i.e. read requests are  *            allowed while write requests are not. It continues seeking for  *            majority in the background.  * @param aHostProvider  *            use this as HostProvider to enable custom behaviour.  *  * @throws IOException  *             in cases of network failure  * @throws IllegalArgumentException  *             if an invalid chroot path is specified  */ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,         boolean canBeReadOnly, HostProvider aHostProvider)         throws IOException {     LOG.info("Initiating client connection, connectString=" + connectString             + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);      watchManager = defaultWatchManager();     watchManager.defaultWatcher = watcher;      ConnectStringParser connectStringParser = new ConnectStringParser(             connectString);     hostProvider = aHostProvider;      cnxn = new ClientCnxn(connectStringParser.getChrootPath(),             hostProvider, sessionTimeout, this, watchManager,             getClientCnxnSocket(), canBeReadOnly);     cnxn.start(); } 

启动两个线程(sendThread和eventThread):

/**
 * Starts the two worker threads of this connection: first the send
 * thread, then the event thread.
 */
public void start() {
        sendThread.start();
        eventThread.start();
    }

eventThread的run方法:

/**
 * Main loop of the event thread: takes events from {@code waitingEvents}
 * one at a time and hands them to {@link #processEvent(Object)}.
 * <p>
 * Taking the {@code eventOfDeath} marker sets {@code wasKilled}; once that
 * flag is set the loop ends as soon as {@code waitingEvents} is found
 * empty, so events still queued at that point are processed first.
 */
@Override
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            if (wasKilled) {
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }

    LOG.info("EventThread shut down for session: 0x{}",
            Long.toHexString(getSessionId()));
}

/**
 * Delivers a single queued event. Three kinds of event are handled:
 * <ul>
 * <li>{@code WatcherSetEventPair}: every watcher in the pair is invoked
 * with the pair's event; a throwable from one watcher is logged and does
 * not stop the others.</li>
 * <li>{@code LocalCallback}: the callback is invoked with the stored
 * {@code rc}, {@code path} and {@code ctx}, and {@code null} for every
 * result argument.</li>
 * <li>anything else is cast to {@code Packet}: the callback matching the
 * response type is invoked with the reply's error code and, when that
 * code is 0, the values taken from the response.</li>
 * </ul>
 * Any throwable escaping the dispatch is logged, not rethrown.
 *
 * @param event the queued event to deliver
 */
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        } else if (event instanceof LocalCallback) {
            LocalCallback lcb = (LocalCallback) event;
            if (lcb.cb instanceof StatCallback) {
                ((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                        lcb.ctx, null);
            } else if (lcb.cb instanceof DataCallback) {
                ((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                        lcb.ctx, null, null);
            } else if (lcb.cb instanceof ACLCallback) {
                ((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                        lcb.ctx, null, null);
            } else if (lcb.cb instanceof ChildrenCallback) {
                ((ChildrenCallback) lcb.cb).processResult(lcb.rc,
                        lcb.path, lcb.ctx, null);
            } else if (lcb.cb instanceof Children2Callback) {
                ((Children2Callback) lcb.cb).processResult(lcb.rc,
                        lcb.path, lcb.ctx, null, null);
            } else if (lcb.cb instanceof StringCallback) {
                ((StringCallback) lcb.cb).processResult(lcb.rc,
                        lcb.path, lcb.ctx, null);
            } else {
                ((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path,
                        lcb.ctx);
            }
        } else {
            Packet p = (Packet) event;
            // rc stays 0 unless the reply header carries an error code.
            int rc = 0;
            String clientPath = p.clientPath;
            if (p.replyHeader.getErr() != 0) {
                rc = p.replyHeader.getErr();
            }
            if (p.cb == null) {
                LOG.warn("Somehow a null cb got to EventThread!");
            } else if (p.response instanceof ExistsResponse
                    || p.response instanceof SetDataResponse
                    || p.response instanceof SetACLResponse) {
                // These three response types all report a Stat.
                StatCallback cb = (StatCallback) p.cb;
                if (rc == 0) {
                    if (p.response instanceof ExistsResponse) {
                        cb.processResult(rc, clientPath, p.ctx,
                                ((ExistsResponse) p.response)
                                        .getStat());
                    } else if (p.response instanceof SetDataResponse) {
                        cb.processResult(rc, clientPath, p.ctx,
                                ((SetDataResponse) p.response)
                                        .getStat());
                    } else if (p.response instanceof SetACLResponse) {
                        cb.processResult(rc, clientPath, p.ctx,
                                ((SetACLResponse) p.response)
                                        .getStat());
                    }
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null);
                }
            } else if (p.response instanceof GetDataResponse) {
                DataCallback cb = (DataCallback) p.cb;
                GetDataResponse rsp = (GetDataResponse) p.response;
                if (rc == 0) {
                    cb.processResult(rc, clientPath, p.ctx, rsp
                            .getData(), rsp.getStat());
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null,
                            null);
                }
            } else if (p.response instanceof GetACLResponse) {
                ACLCallback cb = (ACLCallback) p.cb;
                GetACLResponse rsp = (GetACLResponse) p.response;
                if (rc == 0) {
                    cb.processResult(rc, clientPath, p.ctx, rsp
                            .getAcl(), rsp.getStat());
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null,
                            null);
                }
            } else if (p.response instanceof GetChildrenResponse) {
                ChildrenCallback cb = (ChildrenCallback) p.cb;
                GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                if (rc == 0) {
                    cb.processResult(rc, clientPath, p.ctx, rsp
                            .getChildren());
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null);
                }
            } else if (p.response instanceof GetChildren2Response) {
                Children2Callback cb = (Children2Callback) p.cb;
                GetChildren2Response rsp = (GetChildren2Response) p.response;
                if (rc == 0) {
                    cb.processResult(rc, clientPath, p.ctx, rsp
                            .getChildren(), rsp.getStat());
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null, null);
                }
            } else if (p.response instanceof CreateResponse) {
                StringCallback cb = (StringCallback) p.cb;
                CreateResponse rsp = (CreateResponse) p.response;
                if (rc == 0) {
                    // With a chroot set, the leading chrootPath.length()
                    // characters are cut off the returned path.
                    cb.processResult(rc, clientPath, p.ctx,
                            (chrootPath == null
                                    ? rsp.getPath()
                                    : rsp.getPath()
                                            .substring(chrootPath.length())));
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null);
                }
            } else if (p.response instanceof Create2Response) {
                Create2Callback cb = (Create2Callback) p.cb;
                Create2Response rsp = (Create2Response) p.response;
                if (rc == 0) {
                    cb.processResult(rc, clientPath, p.ctx,
                            (chrootPath == null
                                    ? rsp.getPath()
                                    : rsp.getPath()
                                            .substring(chrootPath.length())), rsp.getStat());
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null, null);
                }
            } else if (p.response instanceof MultiResponse) {
                MultiCallback cb = (MultiCallback) p.cb;
                MultiResponse rsp = (MultiResponse) p.response;
                if (rc == 0) {
                    List<OpResult> results = rsp.getResultList();
                    // Report the first non-OK error code found among the
                    // individual results, if there is one.
                    int newRc = rc;
                    for (OpResult result : results) {
                        if (result instanceof ErrorResult
                                && KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result)
                                        .getErr())) {
                            break;
                        }
                    }
                    cb.processResult(newRc, clientPath, p.ctx, results);
                } else {
                    cb.processResult(rc, clientPath, p.ctx, null);
                }
            } else if (p.cb instanceof VoidCallback) {
                VoidCallback cb = (VoidCallback) p.cb;
                cb.processResult(rc, clientPath, p.ctx);
            }
        }
    } catch (Throwable t) {
        LOG.error("Caught unexpected throwable", t);
    }
}
} // closes the enclosing class, whose header is outside this excerpt

sendThread(

/**

* This class services the outgoing request queue and generates the heart

* beats. It also spawns the ReadThread.

*/

)线程的run方法:

@Override public void run() {  clientCnxnSocket.introduce(this, sessionId, outgoingQueue);  clientCnxnSocket.updateNow();  clientCnxnSocket.updateLastSendAndHeard();  int to;  long lastPingRwServer = Time.currentElapsedTime();  final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds  while (state.isAlive()) {   try {    if (!clientCnxnSocket.isConnected()) {     // don't re-establish connection if we are closing     if (closing) {      break;     }     startConnect();     clientCnxnSocket.updateLastSendAndHeard();    }    if (state.isConnected()) {     // determine whether we need to send an AuthFailed event.     if (zooKeeperSaslClient != null) {      boolean sendAuthEvent = false;      if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {       try {        zooKeeperSaslClient.initialize(ClientCnxn.this);       } catch (SaslException e) {          LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);        state = States.AUTH_FAILED;        sendAuthEvent = true;       }      }      KeeperState authState = zooKeeperSaslClient.getKeeperState();      if (authState != null) {       if (authState == KeeperState.AuthFailed) {        // An authentication error occurred during authentication with the Zookeeper Server.        
state = States.AUTH_FAILED;        sendAuthEvent = true;       } else {        if (authState == KeeperState.SaslAuthenticated) {         sendAuthEvent = true;        }       }      }      if (sendAuthEvent == true) {       eventThread.queueEvent(new WatchedEvent(          Watcher.Event.EventType.None,          authState,null));      }     }     to = readTimeout - clientCnxnSocket.getIdleRecv();    } else {     to = connectTimeout - clientCnxnSocket.getIdleRecv();    }    if (to <= 0) {     throw new SessionTimeoutException(       "Client session timed out, have not heard from server in "         + clientCnxnSocket.getIdleRecv() + "ms"         + " for sessionid 0x"         + Long.toHexString(sessionId));    }    if (state.isConnected()) {     //1000(1 second) is to prevent race condition missing to send the second ping     //also make sure not to send too many pings when readTimeout is small      int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -        ((clientCnxnSocket.getIdleSend() > 1000) ? 
1000 : 0);     //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL     if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {      sendPing();      clientCnxnSocket.updateLastSend();     } else {      if (timeToNextPing < to) {       to = timeToNextPing;      }     }    }    // If we are in read-only mode, seek for read/write server    if (state == States.CONNECTEDREADONLY) {     long now = Time.currentElapsedTime();     int idlePingRwServer = (int) (now - lastPingRwServer);     if (idlePingRwServer >= pingRwTimeout) {      lastPingRwServer = now;      idlePingRwServer = 0;      pingRwTimeout =       Math.min(2*pingRwTimeout, maxPingRwTimeout);      pingRwServer();     }     to = Math.min(to, pingRwTimeout - idlePingRwServer);    }    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);   } catch (Throwable e) {    if (closing) {     if (LOG.isDebugEnabled()) {      // closing so this is expected      LOG.debug("An exception was thrown while closing send thread for session 0x"        + Long.toHexString(getSessionId())        + " : " + e.getMessage());     }     break;    } else {     // this is ugly, you have a better way speak up     if (e instanceof SessionExpiredException) {      LOG.info(e.getMessage() + ", closing socket connection");     } else if (e instanceof SessionTimeoutException) {      LOG.info(e.getMessage() + RETRY_CONN_MSG);     } else if (e instanceof EndOfStreamException) {      LOG.info(e.getMessage() + RETRY_CONN_MSG);     } else if (e instanceof RWServerFoundException) {      LOG.info(e.getMessage());     } else {      LOG.warn(        "Session 0x"          + Long.toHexString(getSessionId())          + " for server "          + clientCnxnSocket.getRemoteSocketAddress()          + ", unexpected error"          + RETRY_CONN_MSG, e);     }     // At this point, there might still be new packets appended to outgoingQueue.     
// they will be handled in next connection or cleared up if closed.     cleanup();     if (state.isAlive()) {      eventThread.queueEvent(new WatchedEvent(        Event.EventType.None,        Event.KeeperState.Disconnected,        null));     }     clientCnxnSocket.updateNow();     clientCnxnSocket.updateLastSendAndHeard();    }   }  }  synchronized (state) {   // When it comes to this point, it guarantees that later queued   // packet to outgoingQueue will be notified of death.   cleanup();  }  clientCnxnSocket.close();  if (state.isAlive()) {   eventThread.queueEvent(new WatchedEvent(Event.EventType.None,     Event.KeeperState.Disconnected, null));  }  ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),    "SendThread exited loop for session: 0x"        + Long.toHexString(getSessionId())); } 

Client与ZooKeeper之间的通信,需要创建一个Session,这个Session会有一个超时时间。因为ZooKeeper集群会把Client的Session信息持久化,所以在Session没超时之前,Client与ZooKeeper Server的连接可以在各个ZooKeeper Server之间透明地移动。

在实际的应用中,如果Client与Server之间的通信足够频繁,Session的维护就不需要其它额外的消息了。否则,ZooKeeper Client会每t/3 ms发一次心跳给Server,如果Client 2t/3 ms没收到来自Server的心跳回应,就会换到一个新的ZooKeeper Server上。这里t是用户配置的Session的超时时间。

@Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)   throws IOException, InterruptedException {  selector.select(waitTimeOut);  Set<SelectionKey> selected;  synchronized (this) {   selected = selector.selectedKeys();  }  // Everything below and until we get back to the select is  // non blocking, so time is effectively a constant. That is  // Why we just have to do this once, here  updateNow();  for (SelectionKey k : selected) {   SocketChannel sc = ((SocketChannel) k.channel());   if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {    if (sc.finishConnect()) {     updateLastSendAndHeard();     updateSocketAddresses();     sendThread.primeConnection();    }   } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {    doIO(pendingQueue, cnxn);   }  }  if (sendThread.getZkState().isConnected()) {   if (findSendablePacket(outgoingQueue,     sendThread.tunnelAuthInProgress()) != null) {    enableWrite();   }  }  selected.clear(); } 

ZooKeeper支持一种Watch操作,Client可以在某个ZNode上设置一个Watcher,来Watch该ZNode上的变化。如果该ZNode上有相应的变化,就会触发这个Watcher,把相应的事件通知给设置Watcher的Client。需要注意的是,ZooKeeper中的Watcher是一次性的,即触发一次就会被取消,如果想继续Watch的话,需要客户端重新设置Watcher。

/**  * @return true if a packet was received  * @throws InterruptedException  * @throws IOException  */ void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)   throws InterruptedException, IOException {  SocketChannel sock = (SocketChannel) sockKey.channel();  if (sock == null) {   throw new IOException("Socket is null!");  }  if (sockKey.isReadable()) {   int rc = sock.read(incomingBuffer);   if (rc < 0) {    throw new EndOfStreamException(      "Unable to read additional data from server sessionid 0x"        + Long.toHexString(sessionId)        + ", likely server has closed socket");   }   if (!incomingBuffer.hasRemaining()) {    incomingBuffer.flip();    if (incomingBuffer == lenBuffer) {     recvCount++;     readLength();    } else if (!initialized) {     readConnectResult();     enableRead();     if (findSendablePacket(outgoingQueue,       sendThread.tunnelAuthInProgress()) != null) {      // Since SASL authentication has completed (if client is configured to do so),      // outgoing packets waiting in the outgoingQueue can now be sent.      
enableWrite();     }     lenBuffer.clear();     incomingBuffer = lenBuffer;     updateLastHeard();     initialized = true;    } else {     sendThread.readResponse(incomingBuffer);     lenBuffer.clear();     incomingBuffer = lenBuffer;     updateLastHeard();    }   }  }  if (sockKey.isWritable()) {   Packet p = findSendablePacket(outgoingQueue,     sendThread.tunnelAuthInProgress());   if (p != null) {    updateLastSend();    // If we already started writing p, p.bb will already exist    if (p.bb == null) {     if ((p.requestHeader != null) &&       (p.requestHeader.getType() != OpCode.ping) &&       (p.requestHeader.getType() != OpCode.auth)) {      p.requestHeader.setXid(cnxn.getXid());     }     p.createBB();    }    sock.write(p.bb);    if (!p.bb.hasRemaining()) {     sentCount++;     outgoingQueue.removeFirstOccurrence(p);     if (p.requestHeader != null       && p.requestHeader.getType() != OpCode.ping       && p.requestHeader.getType() != OpCode.auth) {      synchronized (pendingQueue) {       pendingQueue.add(p);      }     }    }   }   if (outgoingQueue.isEmpty()) {    // No more packets to send: turn off write interest flag.    // Will be turned on later by a later call to enableWrite(),    // from within ZooKeeperSaslClient (if client is configured    // to attempt SASL authentication), or in either doIO() or    // in doTransport() if not.    disableWrite();   } else if (!initialized && p != null && !p.bb.hasRemaining()) {    // On initial connection, write the complete connect request    // packet, but then disable further writes until after    // receiving a successful connection response.  If the    // session is expired, then the server sends the expiration    // response and immediately closes its end of the socket.  If    // the client is simultaneously writing on its end, then the    // TCP stack may choose to abort with RST, in which case the    // client would never receive the session expired event.  
See    // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html    disableWrite();   } else {    // Just in case    enableWrite();   }  } } 

客户端命令分析

先让我们看看客户端都提供了哪些命令形式?

ZooKeeperMain中定义了cli的各种命令:

/** Every known command name mapped to its option/usage string. */
protected static final Map<String,String> commandMap = new HashMap<String,String>( );
/** Commands implemented as {@code CliCommand} objects, keyed by command name. */
protected static final Map<String,CliCommand> commandMapCli =
        new HashMap<String,CliCommand>( );

/** Parsed command line of this shell. */
protected MyCommandOptions cl = new MyCommandOptions();
/** Previously entered command lines, keyed by command number. */
protected HashMap<Integer,String> history = new HashMap<Integer,String>( );
protected int commandCount = 0;
protected boolean printWatches = true;

protected ZooKeeper zk;
protected String host = "";

/**
 * @return the current value of the {@code printWatches} flag
 */
public boolean getPrintWatches( ) {
    return printWatches;
}

static {
    // Built-in shell commands with their usage strings.
    commandMap.put("connect", "host:port");
    commandMap.put("history","");
    commandMap.put("redo","cmdno");
    commandMap.put("printwatches", "on|off");
    commandMap.put("quit", "");

    // Each CliCommand registers itself in commandMapCli.
    new CloseCommand().addToMap(commandMapCli);
    new CreateCommand().addToMap(commandMapCli);
    new DeleteCommand().addToMap(commandMapCli);
    new DeleteAllCommand().addToMap(commandMapCli);
    // Deprecated: rmr
    new DeleteAllCommand("rmr").addToMap(commandMapCli);
    new SetCommand().addToMap(commandMapCli);
    new GetCommand().addToMap(commandMapCli);
    new LsCommand().addToMap(commandMapCli);
    new Ls2Command().addToMap(commandMapCli);
    new GetAclCommand().addToMap(commandMapCli);
    new SetAclCommand().addToMap(commandMapCli);
    new StatCommand().addToMap(commandMapCli);
    new SyncCommand().addToMap(commandMapCli);
    new SetQuotaCommand().addToMap(commandMapCli);
    new ListQuotaCommand().addToMap(commandMapCli);
    new DelQuotaCommand().addToMap(commandMapCli);
    new AddAuthCommand().addToMap(commandMapCli);
    new ReconfigCommand().addToMap(commandMapCli);
    new GetConfigCommand().addToMap(commandMapCli);
    new RemoveWatchesCommand().addToMap(commandMapCli);

    // add all to commandMap
    for (Entry<String, CliCommand> entry : commandMapCli.entrySet()) {
        commandMap.put(entry.getKey(), entry.getValue().getOptionStr());
    }
}

其中,commandMap是所有命令的集合,commandMapCli是连接上服务器后可用的命令集合。

zookeeper源码分析之一客户端

我们以创建一个节点(CreateCommand)为例深入了解一下:

该命令形式为:create [-s] [-e] [-c] path [data] [acl]

其中 s,e,c分别代表:

sequential,ephemeral,container

ZNode根据其本身的特性,可以分为下面两类:

  • Regular ZNode: 常规型ZNode, 用户需要显式的创建、删除
  • Ephemeral ZNode: 临时型ZNode, 用户创建它之后,可以显式的删除,也可以在创建它的Session结束后,由ZooKeeper Server自动删除

ZNode还有一个Sequential的特性,如果创建的时候指定的话,该ZNode的名字后面会自动Append一个不断增加的SequenceNo。

执行命令的代码如下:

/**
 * Runs the {@code create} command: creates the node named by {@code args[1]},
 * optionally with data ({@code args[2]}) and an ACL ({@code args[3]}).
 * <p>
 * The create mode is derived from the parsed options: {@code -e} and
 * {@code -s} together select EPHEMERAL_SEQUENTIAL, {@code -e} alone
 * EPHEMERAL, {@code -s} alone PERSISTENT_SEQUENTIAL, {@code -c} CONTAINER,
 * and no option PERSISTENT. {@code -c} may not be combined with {@code -e}
 * or {@code -s}.
 * <p>
 * Node data is always encoded as UTF-8, so the bytes stored do not depend on
 * the default charset of the machine running the client.
 *
 * @return {@code true} if the node was created; {@code false} if the options
 *         conflict, or if the create failed with
 *         {@code EphemeralOnLocalSessionException} or
 *         {@code InvalidACLException} (a message is printed to {@code err})
 * @throws KeeperException for any other error reported by the create call
 * @throws InterruptedException if the create call is interrupted
 */
@Override
public boolean exec() throws KeeperException, InterruptedException {
    boolean hasE = cl.hasOption("e");
    boolean hasS = cl.hasOption("s");
    boolean hasC = cl.hasOption("c");
    if (hasC && (hasE || hasS)) {
        err.println("-c cannot be combined with -s or -e. Containers cannot be ephemeral or sequential.");
        return false;
    }

    CreateMode flags = CreateMode.PERSISTENT;
    if (hasE && hasS) {
        flags = CreateMode.EPHEMERAL_SEQUENTIAL;
    } else if (hasE) {
        flags = CreateMode.EPHEMERAL;
    } else if (hasS) {
        flags = CreateMode.PERSISTENT_SEQUENTIAL;
    } else if (hasC) {
        flags = CreateMode.CONTAINER;
    }

    String path = args[1];

    // Data is optional; when absent the node is created with null data.
    byte[] data = null;
    if (args.length > 2) {
        // Explicit charset: the no-arg getBytes() would use the platform
        // default and produce different bytes on different machines.
        data = args[2].getBytes(java.nio.charset.Charset.forName("UTF-8"));
    }

    // The ACL is optional; without one the node is created with OPEN_ACL_UNSAFE.
    List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
    if (args.length > 3) {
        acl = AclParser.parse(args[3]);
    }

    try {
        String newPath = zk.create(path, data, acl, flags);
        err.println("Created " + newPath);
    } catch (KeeperException.EphemeralOnLocalSessionException e) {
        err.println("Unable to create ephemeral node on a local session");
        return false;
    } catch (KeeperException.InvalidACLException ex) {
        err.println(ex.getMessage());
        return false;
    }
    return true;
}

创建节点的代码如下:

/**
 * Creates a node at the given path holding the given data and ACL, and
 * returns the path the node actually received.
 * <p>
 * {@code createMode} selects whether the node is ephemeral and/or
 * sequential. An ephemeral node is removed automatically by ZooKeeper when
 * the session that created it expires, and it cannot have children. For a
 * sequential node the actual path is the given path plus a 10-digit,
 * zero-padded sequence number that is incremented after each creation, so
 * creating sequential nodes never fails with "node exists".
 * <p>
 * Errors reported by the server surface as a {@link KeeperException}, for
 * example NodeExists when the actual path is already taken, NoNode when the
 * parent does not exist, and NoChildrenForEphemerals when the parent is
 * ephemeral.
 * <p>
 * A successful create triggers the watches left on the path by exists and
 * getData calls, and the watches left on the parent by getChildren calls.
 * <p>
 * The data array may be at most 1 MB (1,048,576 bytes); larger arrays cause
 * a KeeperException.
 *
 * @param path
 *                the path for the node
 * @param data
 *                the initial data for the node
 * @param acl
 *                the acl for the node
 * @param createMode
 *                specifying whether the node to be created is ephemeral
 *                and/or sequential
 * @return the actual path of the created node, with the chroot prefix
 *         removed when the connection has one
 * @throws KeeperException if the server returns a non-zero error code
 * @throws KeeperException.InvalidACLException thrown by this method when
 *         {@code acl} is non-null but empty; NOTE(review): a null ACL is not
 *         rejected here and is presumably refused by the server -- confirm
 * @throws InterruptedException if the transaction is interrupted
 * @throws IllegalArgumentException if an invalid path is specified
 */
public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
    throws KeeperException, InterruptedException {
    // Validate the path as the caller wrote it, then map it into the chroot.
    PathUtils.validatePath(path, createMode.isSequential());
    final String serverPath = prependChroot(path);

    // Container nodes use their own op code; every other mode uses plain create.
    RequestHeader header = new RequestHeader();
    if (createMode.isContainer()) {
        header.setType(ZooDefs.OpCode.createContainer);
    } else {
        header.setType(ZooDefs.OpCode.create);
    }

    CreateRequest createRequest = new CreateRequest();
    CreateResponse createResponse = new CreateResponse();
    createRequest.setPath(serverPath);
    createRequest.setData(data);
    createRequest.setFlags(createMode.toFlag());

    // An empty ACL list is refused before anything is sent.
    if (acl != null && acl.isEmpty()) {
        throw new KeeperException.InvalidACLException();
    }
    createRequest.setAcl(acl);

    ReplyHeader reply = cnxn.submitRequest(header, createRequest, createResponse, null);
    if (reply.getErr() != 0) {
        // Report the failure against the path the caller supplied.
        throw KeeperException.create(KeeperException.Code.get(reply.getErr()),
                path);
    }

    // Strip the chroot prefix so the caller sees a client-relative path.
    final String createdPath = createResponse.getPath();
    return cnxn.chrootPath == null
            ? createdPath
            : createdPath.substring(cnxn.chrootPath.length());
}

组装请求发送给zookeeper服务器,并返回响应报文:

/**
 * Queues a request and blocks the calling thread until the resulting packet
 * is marked finished.
 * <p>
 * The packet is queued without a callback, client path, server path or
 * context (all four are passed as {@code null}).
 *
 * @param h the request header
 * @param request the request record
 * @param response the record that receives the response
 * @param watchRegistration watch registration handed to the packet, may be null
 * @param watchDeregistration watch deregistration handed to the packet, may be null
 * @return the reply header created for this request
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration)
        throws InterruptedException {
    final ReplyHeader replyHeader = new ReplyHeader();
    final Packet pending = queuePacket(h, replyHeader, request, response,
            null, null, null, null, watchRegistration, watchDeregistration);

    // Wait on the packet's monitor; re-check the flag after every wake-up
    // because wait() may return before the packet is actually finished.
    synchronized (pending) {
        while (!pending.finished) {
            pending.wait();
        }
    }
    return replyHeader;
}

请求进入队列,等待处理:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,   Record response, AsyncCallback cb, String clientPath,   String serverPath, Object ctx, WatchRegistration watchRegistration,   WatchDeregistration watchDeregistration) {  Packet packet = null;  // Note that we do not generate the Xid for the packet yet. It is  // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),  // where the packet is actually sent.  packet = new Packet(h, r, request, response, watchRegistration);  packet.cb = cb;  packet.ctx = ctx;  packet.clientPath = clientPath;  packet.serverPath = serverPath;  packet.watchDeregistration = watchDeregistration;  // The synchronized block here is for two purpose:  // 1. synchronize with the final cleanup() in SendThread.run() to avoid race  // 2. synchronized against each packet. So if a closeSession packet is added,  // later packet will be notified.  synchronized (state) {   if (!state.isAlive() || closing) {    conLossPacket(packet);   } else {    // If the client is asking to close the session then    // mark as closing    if (h.getType() == OpCode.closeSession) {     closing = true;    }    outgoingQueue.add(packet);   }  }  sendThread.getClientCnxnSocket().packetAdded();  return packet; } 

参考文献:

【1】http://www.wuzesheng.com/?p=2609

正文到此结束
Loading...