转载

基于Zookeeper的分布式共享锁

首先,说说我们的场景:订单服务是做成集群的,当两个以上结点同时收到同一个订单的创建指令时,并发就产生了,系统就会重复创建订单。类似的场景还有很多。这时,分布式共享锁就闪亮登场了。

共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了,而借助Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不再赘述了。

官网资料: http://zookeeper.apache.org/doc/r3.4.5/recipes.html

中文资料: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper

详见Locks章节。

原理都知道了,网上一搜索,发现Apache已经提供了现成的实现,既然已经有轮子了,那我们也没必要重复造轮子了吧!直接使用 Curator 。但是,我们在测试中发现,用于共享锁的结点无法自动回收:除了最末一级的临时结点会在锁释放和session超时的时候自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:

首先,创建一个Maven工程,在pom文件里导入下面的包:

<dependencies>
  <!-- ZooKeeper client -->
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
  </dependency>

  <!-- Apache Curator modules -->
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.8.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.8.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.8.0</version>
  </dependency>

  <!-- Apache Commons utilities -->
  <dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.9.2</version>
  </dependency>
  <dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.2</version>
  </dependency>
  <dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
  </dependency>
</dependencies>

LockZookeeperClient接口:

package com.XXX.framework.lock;

import org.apache.curator.framework.CuratorFramework;

/**
 * Access point that lock implementations use to reach ZooKeeper: it exposes the
 * Curator client, the base path under which lock nodes live, and a hook for
 * handing lock paths over to garbage collection.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public interface LockZookeeperClient {

    /**
     * Returns the Curator client used to talk to ZooKeeper.
     *
     * @return the Curator framework instance
     */
    CuratorFramework getCuratorFramework();

    /**
     * Returns the base path under which lock nodes are placed.
     *
     * @return the base path
     */
    String getBasePath();

    /**
     * Hands a path over to the garbage collector.
     *
     * @param gcPath the path to be garbage-collected
     */
    void gc(String gcPath);
}

LockZookeeperClient接口的实现LockZookeeperClientFactory:

package com.XXX.framework.lock;  import java.util.Date; import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentSkipListSet;  import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry;  /**  *   * description  *    * @author Roadrunners  * @version 1.0, 2015年7月9日  */ public class LockZookeeperClientFactory implements LockZookeeperClient {  private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);   private boolean hasGc = true;  private Timer gcTimer;  private TimerTask gcTimerTask;  private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();  private int gcIntervalSecond = 60;   private CuratorFramework curatorFramework;  private String zookeeperIpPort = "localhost:2181";  private int sessionTimeoutMs = 10000;  private int connectionTimeoutMs = 10000;  private String basePath = "/locks";   public void setHasGc(boolean hasGc) {   this.hasGc = hasGc;  }   public void setGcIntervalSecond(int gcIntervalSecond) {   this.gcIntervalSecond = gcIntervalSecond;  }   public void setZookeeperIpPort(String zookeeperIpPort) {   this.zookeeperIpPort = zookeeperIpPort;  }   public void setSessionTimeoutMs(int sessionTimeoutMs) {   this.sessionTimeoutMs = sessionTimeoutMs;  }   public void setConnectionTimeoutMs(int connectionTimeoutMs) {   this.connectionTimeoutMs = connectionTimeoutMs;  }   public void setBasePath(String basePath) {   basePath = basePath.trim();   if (basePath.endsWith("/")) {    basePath = basePath.substring(0, basePath.length() - 1);   }    this.basePath = basePath;  }   public void init() {   if(StringUtils.isBlank(zookeeperIpPort)){    throw new 
NullPointerException("zookeeperIpPort");   }    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);   curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);   curatorFramework.start();   LOG.info("CuratorFramework initialise succeed.");    if (hasGc) {    gc();   }  }   public void destroy() {   gcPaths.clear();   gcPaths = null;   gcStop();   curatorFramework.close();   curatorFramework = null;  }   @Override  public void gc(String gcPath) {   if (hasGc && StringUtils.isNotBlank(gcPath)) {    gcPaths.add(gcPath.trim());   }  }   @Override  public CuratorFramework getCuratorFramework() {   return this.curatorFramework;  }   @Override  public String getBasePath() {   return this.basePath;  }   private synchronized void gc() {   gcStop();    try {    scanningGCNodes();   } catch (Throwable e) {    LOG.warn(e);   }    gcTimerTask = new TimerTask() {    @Override    public void run() {     doingGc();    }   };    Date begin = new Date();   begin.setTime(begin.getTime() + (10 * 1000L));   gcTimer = new Timer("lock-gc", true);   gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);  }   private synchronized void gcStop() {   if (null != gcTimer) {    gcTimer.cancel();    gcTimer = null;   }    if (null != gcTimerTask) {    gcTimerTask.cancel();    gcTimerTask = null;   }  }   private synchronized void scanningGCNodes() throws Exception {   if (null == curatorFramework.checkExists().forPath(basePath)) {    return;   }    List<String> paths = curatorFramework.getChildren().forPath(basePath);   if (CollectionUtils.isEmpty(paths)) {    gcPaths.add(basePath);    return;   }    for (String path : paths) {    try{     String tmpPath = basePath + "/" + path;     if (null == curatorFramework.checkExists().forPath(tmpPath)) {      continue;     }          gcPaths.add(tmpPath);    } catch(Throwable e){     LOG.warn("scanning gc nodes error.", e);    }   }  }    private 
synchronized void doingGc() {   LOG.debug("GC beginning.");    if (CollectionUtils.isNotEmpty(gcPaths)) {    for (String path : gcPaths) {     try {      if (null != curatorFramework.checkExists().forPath(path)) {       if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {        curatorFramework.delete().forPath(path);        gcPaths.remove(path);        LOG.debug("GC " + path);       }      } else {       gcPaths.remove(path);      }     } catch (Throwable e) {      gcPaths.remove(path);      LOG.warn(e);     }    }   }    LOG.debug("GC ended.");  }  }

SharedLock共享锁:

package com.XXX.framework.lock.shared;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import com.XXX.framework.lock.LockZookeeperClient;

/**
 * Distributed lock for a single resource, backed by a Curator
 * {@link InterProcessMutex} created under the client's base path.
 *
 * @author Roadrunners
 * @version 1.0, 2015-07-09
 */
public class SharedLock {
    private final InterProcessLock interProcessLock;

    /**
     * Builds the lock for {@code resourceId} at {@code <basePath>/<resourceId>}
     * and registers that path with the client's garbage collector.
     *
     * @param lockZookeeperClient supplies the Curator client and the base path
     * @param resourceId identifier of the resource to lock; surrounding whitespace is trimmed
     * @throws NullPointerException if {@code resourceId} is blank
     */
    public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
        if (StringUtils.isBlank(resourceId)) {
            throw new NullPointerException("resourceId");
        }

        final String lockPath = lockZookeeperClient.getBasePath() + "/" + resourceId.trim();
        this.interProcessLock =
                new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), lockPath);
        lockZookeeperClient.gc(lockPath);
    }

    /**
     * Acquires the lock, blocking until it becomes available. Every successful
     * call must be paired with a call to {@link #release()}.
     *
     * @throws Exception ZK errors, connection interruptions
     */
    public void acquire() throws Exception {
        interProcessLock.acquire();
    }

    /**
     * Acquires the lock, blocking until it becomes available or the given time
     * expires. Every call that returns {@code true} must be paired with a call
     * to {@link #release()}.
     *
     * @param time time to wait
     * @param unit unit of {@code time}
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws Exception ZK errors, connection interruptions
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return interProcessLock.acquire(time, unit);
    }

    /**
     * Performs one release of the lock.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    public void release() throws Exception {
        interProcessLock.release();
    }

    /**
     * Tells whether the lock is currently held by a thread in this JVM.
     *
     * @return {@code true} if held in this process, {@code false} otherwise
     */
    public boolean isAcquiredInThisProcess() {
        return interProcessLock.isAcquiredInThisProcess();
    }
}

到此代码已经完成。下面写一个简单的Demo:

//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写   LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();   lzc.setZookeeperIpPort("10.100.15.1:8900");   lzc.setBasePath("/locks/sharedLock/");   lzc.init();    SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");   try {    if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) {     System.out.println("sharedLock1 get");    }   } catch (Exception e) {    e.printStackTrace();   } finally {    try {     sharedLock.release();    } catch (Exception e) {     e.printStackTrace();    }   }    lzc.destroy();

就这样,系统就会每隔一分钟(即 gcIntervalSecond 的默认值60秒,可通过 setGcIntervalSecond 调整)去回收一次没有使用的结点。

正文到此结束
Loading...