微信公众号:房东的小黑黑
路途随遥远,将来更美好
学海无涯,大家一起加油!
在简单的单机系统中,当存在多个线程同时要修改某个共享变量时,为了数据的操作安全,往往需要通过加锁的方法,在同一时刻同一代码块只能有一个进程执行操作,存在很多加锁的方式,比如在java中有synchronize或Lock子类等。
但是在分布式中,会存在多个主机,即会存在多个jvm, 在jvm之间数据是不能共享的,上面的方法只能在一个jvm中执行有效,在多个jvm中同一变量可能会有不同的值。所以我们要设计一种跨jvm的共享互斥机制来控制共享变量资源的访问,这也是提出分布式锁的初衷。
为了将分布式锁实现较好的性能,我们需要解决下面几个重要的问题:
下面分别利用redis和zookeeper来实现加锁和解锁机制。
本版本通过变量sign设置锁的唯一标识,确保只有拥有该锁的客户端才能删除它,其他客户端不能删除。
利用阻塞锁的思想, 通过 while(System.currentTimeMillis() < endTime)
和 Thread.sleep()
相结合,在设置的规定时间内进行多次尝试。
但是 setnx
操作和 expire
分割开了,不具有 原子性 ,可能会出现问题。
比如说,在执行到 jedis.expire
时,可能系统发生了崩溃,导致锁没有设置过期时间,导致发生死锁。
public String addLockVersion1(String key, int blockTime, int expireTime) { if (blockTime <=0 || expireTime <= 0) return null; Jedis jedis = null; try { jedis = jedisPool.getResource(); String sign = UUID.randomUUID().toString(); String token = null; //设置阻塞尝试时间 long endTime = System.currentTimeMillis() + blockTime; while (System.currentTimeMillis() < endTime) { if (jedis.setnx(key, sign) == 1) { // 添加成功,设置锁的过期时间,防止死锁 jedis.expire(key, expireTime); // 在释放锁时用于验证 token = sign; return token; } //加锁失败,休眠一段时间,再进行尝试。 try { Thread.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return null; } 复制代码
通过设置key对应的value值为锁的过期时间,当遇到系统崩溃,致使利用 expire
设置锁过期时间失败时,通过获取value值,来判断当前锁是否过期,如果该锁已经过期了,则进行重新获取。
但是它也存在一些问题。当锁过期时,如果多个进程同时执行 jedis.getSet
方法,虽然只有一个进程可以获得该锁,但是这个进程的锁的过期时间可能被其他进程的锁所覆盖。
该锁没有设置唯一标识,也会被其他客户端锁释放,不满足只能被锁的拥有者锁释放的条件。
public boolean addLockVersion2(String key, int blockTime, int expireTime) { if (blockTime <=0 || expireTime <= 0) return false; Jedis jedis = null; try { jedis = jedisPool.getResource(); long endTime = System.currentTimeMillis() + blockTime; while (System.currentTimeMillis() < endTime) { long redisExpierTime = System.currentTimeMillis() + expireTime; if (jedis.setnx(key, redisExpierTime + "") == 1) { jedis.expire(key, expireTime); return true; } else { String oldRedisExpierTime = jedis.get(key); // 当锁设置成功,但是没有通过expire成功设置过期时间,但是根据存的值判断出它实际上已经过期了 if (oldRedisExpierTime != null && Long.parseLong(oldRedisExpierTime) < System.currentTimeMillis()) { String lastRedisExpierTime = jedis.getSet(key, System.currentTimeMillis() + blockTime + ""); //获取到该锁,没有被其他线程所修改 if (lastRedisExpierTime.equals(oldRedisExpierTime)) { jedis.expire(key, expireTime); return true; } } } //加锁失败,休眠一段时间,再进行尝试。 try { Thread.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) { jedis.close(); } } return false; } 复制代码
具体通过 set
方法来实现 setnx
和 expire
的相加功能,实现了原子操作。
如果key不存在时,就进行加锁操作,并对锁设置一个有效期,同时uniqueId表示加锁的客户端;如果key存在,不做任何操作。
public boolean addLockVersion3(String key, String uniqueId, int blockTime, int expireTime) { Jedis jedis = null; try { long endTime = System.currentTimeMillis() + blockTime; while (System.currentTimeMillis() < endTime) { jedis = jedisPool.getResource(); String result = jedis.set(key, uniqueId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_STATE.equals(result)) return true; try { Thread.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } return false; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; } 复制代码
为了使对同一个对象添加多次锁,并且不发生阻塞,即实现类似可重入锁,我们借鉴了 ReetrantLock
的思想,添加了变量 states
来控制。
public boolean addLockVersion4(String key, String uniqueId, int expireTime) { int state = states.get(); if (state > 1) { states.set(state+1); return true; } return doLock(key, uniqueId, expireTime); } private boolean doLock(String key, String uniqueId, int expireTime) { Jedis jedis = null; if (expireTime <= 0) return false; try { jedis = jedisPool.getResource(); String result = jedis.set(key, uniqueId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_STATE.equals(result)) states.set(states.get() + 1); return true; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; } 复制代码
从上面可知,利用 setnx
和 expire
实现加锁机制时因为不是原子操作,会产生一些问题,我们可用lua脚本来实现。
public boolean addLockVersion5(String key, String uniqueId, int expireTime) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String luaScript = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(uniqueId); values.add(String.valueOf(expireTime)); Object result = jedis.eval(luaScript, keys, values); if ((Long)result == 1L) return true; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) { jedis.close(); } } return false; } 复制代码
在解锁时首先判断加速与解锁是否是同一个客户端,然后利用 del
方法进行删除。
但是会出现一些问题。
当方法执行到判断内部时,即将要执行 del
方法时,该锁已经过期了,并被其他的客户端所请求应有,此时执行 del
会造成锁的误删。
public boolean releaseLockVersion1(String key, String uniqueId) { Jedis jedis = null; try { jedis = jedisPool.getResource(); //加锁与解锁是否是同一个客户端 String lockId = jedis.get(key); if (lockId != null && lockId.equals(uniqueId)) { jedis.del(key); return true; } } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; } 复制代码
从上面的分析来看,我们要确保删除的原子性,利用lua脚本可以保证一点。
在脚本语言里,KEYS[1]和ARGV[1]分别表示传入的key名和唯一标识符。
public boolean releaseLockVersion2(String key, String uniqueId) { String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Jedis jedis = null; Object result = null; try{ jedis = jedisPool.getResource(); result = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(uniqueId)); if ((Long)result == 1) return true; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; } 复制代码
在利用可重入锁思想时,只有当 states=1
时才能被释放,大于0时,只能进行减1操作。
public boolean releaseLockVersion3(String key, String uniqueId) { int state = states.get(); if (state > 1) { states.set(states.get() - 1); return false; } return this.doRelease(key, uniqueId); } private boolean doRelease(String key, String uniqueId) { String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Jedis jedis = null; Object result = null; try{ jedis = jedisPool.getResource(); result = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(uniqueId)); if ((Long)result == 1) return true; } catch (JedisException e) { e.printStackTrace(); } finally { states.set(0); if (jedis != null) jedis.close(); } return false; } 复制代码
Zookeeper提供一个多层次的节点命名空间,每个节点都用一个以斜杠(/)分割的路径表示,
而且每个节点都有父节点(根节点除外),非常类似于文件系统。
public class zklock { private ZkClient zkClient; private String name; private String currentLockPath; private CountDownLatch countDownLatch; private static final String PATENT_LOCK_PATH = "distribute_lock"; private static final int MAX_RETEY_TIMES = 3; private static final int DEFAULT_WAIT_TIME = 3; public zklock(ZkClient zkClient, String name) { this.zkClient = zkClient; this.name = name; } public void addLock() { if (!zkClient.exists(PATENT_LOCK_PATH)) { zkClient.createPersistent(PATENT_LOCK_PATH); } int count = 0; boolean iscompleted = false; while (!iscompleted) { iscompleted = true; try { //创建当前目录下的临时有序节点 currentLockPath = zkClient.createEphemeralSequential(PATENT_LOCK_PATH + "/", System.currentTimeMillis()); } catch (Exception e) { if (count++ < MAX_RETEY_TIMES) { iscompleted = false; } else throw e; } } } public void releaseLock() { zkClient.delete(currentLockPath); } //检查是否是最小的节点 private boolean checkMinNode(String localPath) { List<String> children = zkClient.getChildren(PATENT_LOCK_PATH); Collections.sort(children); int index = children.indexOf(localPath.substring(PATENT_LOCK_PATH.length()+1)); if (index == 0) { if (countDownLatch != null) { countDownLatch.countDown(); } return true; } else { String waitPath = PATENT_LOCK_PATH + "/" + children.get(index-1); waitForLock(waitPath, false); return false; } } //监听有序序列中的前一个节点 private void waitForLock(String waitPath, boolean useTime) { countDownLatch = new CountDownLatch(1); zkClient.subscribeDataChanges(waitPath, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { checkMinNode(currentLockPath); } }); if (!zkClient.exists(waitPath)) { return; } try { if (useTime == true) countDownLatch.await(DEFAULT_WAIT_TIME, TimeUnit.SECONDS); else countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch = null; } } 复制代码
set
方法创建key, 因为Redis的key是唯一的,谁先创建成功,谁能够先获得锁。 redis分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
zk分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。
另外一点就是,如果是redis获取锁的那个客户端bug了或者挂了,那么只能等待超时时间之后才能释放锁;而zk的话,因为创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁。