在多线程情况下访问资源,我们需要加锁来保证业务的正常进行,JDK中提供了很多并发控制相关的工具包,来保证多线程下可以高效工作,同样在分布式环境下,有些互斥操作我们可以借助分布式锁来实现两个操作不能同时运行,必须等到另外一个任务结束了把锁释放了才能获取锁然后执行,因为跨JVM我们需要一个第三方系统来协助实现分布式锁,一般我们可以用 数据库,redis,zookeeper,etcd等来实现.
要实现一把分布式锁,我们需要先分析下这把锁有哪些特性
1.在分布式集群中,也就是不同的JVM中,相互有冲突的方法,可以是不同JVM相同实例内的同一个方法,也可以是不同方法,也就是不同业务间的隔离和同一个业务操作不能并行运行,而分布式锁需要保证这两个方法在同一时间只能有一个运行.
2.这把锁最好是可重入的,因为不可重入的锁很容易出现死锁
3.获取锁和释放锁的性能要很高
4.支持获取锁的时候可以阻塞等待,以及等待时间
5.获取锁后支持设置一个期限,超过这个期限可以自动释放,防止程序没有自己释放的情况
6.这是一把轻量锁,对业务侵入小
7.易用
由于数据库的锁无能是在性能高可用上都不及其他方式,这里我们简单介绍下可能的方案
SET resource_name my_random_value NX PX 30000 复制代码
我们可以看看下面获取分布式锁的使用场景,假设我们释放锁,直接del这个key
if (!redisComponent.acquireLock(lockKey) { LOGGER.warn(">>分布式并发锁获取失败"); return ; } try { // do business ... } catch (BusinessException e) { // exception handler ... } finally { redisComponent.releaseLock(lockKey); } 复制代码
为了解决以上问题,我们可以在释放锁的时候,判断下锁是否存在,这样进程A在释放锁的时候就不会将进程B加的锁释放了, 或者通过以下方式,将过期时间做为value存储在对应的key中,释放锁的时候,判断当前时间是否小于过期时间,只有小于当前时间才处理,我们也可以在进行del操作的时候判断下对应的value是否相等,这个时候就需要在del操作的时候传人 my_random_value
下面我们看下redis实现分布式锁java代码实现,我们采用在del的时候判断下当前时间是否小于过期时间
public boolean acquireLock(String lockKey, long expired) { ShardedJedis jedis = null; try { jedis = pool.getResource(); String value = String.valueOf(System.currentTimeMillis() + expired + 1); int tryTimes = 0; while (tryTimes++ < 3) { /* * 1. 尝试锁 * setnx : set if not exist */ if (jedis.setnx(lockKey, value).equals(1L)) { return true; } /* * 2. 已经被别的线程锁住,判断是否失效 */ String oldValue = jedis.get(lockKey); if (StringUtils.isBlank(oldValue)) { /* * 2.1 value存的是超时时间,如果为空有2种情况 * 1. 异常数据,没有value 或者 value为空字符 * 2. 锁恰好被别的线程释放了 * 此时需要尝试重新尝试,为了避免出现情况1时导致死循环,只重试3次 */ continue; } Long oldValueL = Long.valueOf(oldValue); if (oldValueL < System.currentTimeMillis()) { /* * 已超时,重新尝试锁 * * Redis:getSet 操作步骤: * 1.获取 Key 对应的 Value 作为返回值,不存在时返回null * 2.设置 Key 对应的 Value 为传入的值 * 这里如果返回的 getValue != oldValue 表示已经被其它线程重新修改了 */ String getValue = jedis.getSet(lockKey, value); return oldValue.equals(getValue); } else { // 未超时,则直接返回失败 return false; } } return false; } catch (Throwable e) { logger.error("acquireLock error", e); return false; } finally { returnResource(jedis); } } /** * 释放锁 * * @param lockKey * key */ public void releaseLock(String lockKey) { ShardedJedis jedis = null; try { jedis = pool.getResource(); long current = System.currentTimeMillis(); // 避免删除非自己获取到的锁 String value = jedis.get(lockKey); if (StringUtils.isNotBlank(value) && current < Long.valueOf(value)) { jedis.del(lockKey); } } catch (Throwable e) { logger.error("releaseLock error", e); } finally { returnResource(jedis); } } 复制代码
这种方式没有用到刚刚说的my_random_value,我们看下如果我们按以下代码获取锁会有什么问题
if (!redisComponent.acquireLock(lockKey) { LOGGER.warn(">>分布式并发锁获取失败"); return ; } try { boolean locked = redisComponent.acquireLock(lockKey); if(locked) // do business ... } catch (BusinessException e) { // exception handler ... } finally { redisComponent.releaseLock(lockKey); } 复制代码
同样这种方式当进程A没有获取到锁,之后进程B获取到锁,进程A会释放进程B的锁,这个时候我们可以借助my_random_value来实现
/** * 释放锁 * * @param lockKey ,value */ public void releaseLock(String lockKey, long oldvalue) { ShardedJedis jedis = null; try { jedis = pool.getResource(); String value = jedis.get(lockKey); if (StringUtils.isNotBlank(value) && oldvalue == Long.valueOf(value)) { jedis.del(lockKey); } } catch (Throwable e) { logger.error("releaseLock error", e); } finally { returnResource(jedis); } } 复制代码
这种方式需要保存之前获取锁时候的value值,并在释放锁的带上value值,不过这种实现方式,value的值为过期时间也不唯一
由于我们用了redis得超时机制来释放锁,那么当进程在锁租约到期后还没有执行结束,那么其他进程获取到锁后则会产生并发写的情况,这种如果业务上需要精确控制,只能用乐观锁来控制了,每次写入数据都带一个锁的版本,如果下次获取锁的时候版本加1,这样上面那种情况,锁到期释放了新的进程获取到锁后会使用新的版本号,之前的进程锁已经释放了如果继续使用该锁则会发现版本已经不对了
可以借助zookeeper的顺序节点,在一个父节点下,所有需要争抢锁的资源都去这个目录下创建一个顺序节点,然后判断这个临时顺序节点是否是兄弟节点中顺序最小的,如果是最小的则获取到锁,如果不是则监听这个顺序最小的节点的删除事件,然后在继续根据这个流程获取最小节点
public void lock() { try { // 创建临时子节点 String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(j.join(Thread.currentThread().getName() + myNode, "created")); // 取出所有子节点 List<String> subNodes = zk.getChildren(root, false); TreeSet<String> sortedNodes = new TreeSet<>(); for(String node :subNodes) { sortedNodes.add(root +"/" +node); } String smallNode = sortedNodes.first(); String preNode = sortedNodes.lower(myNode); if (myNode.equals( smallNode)) { // 如果是最小的节点,则表示取得锁 System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock")); this.nodeId.set(myNode); return; } CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。 // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if (stat != null) { System.out.println(j.join(Thread.currentThread().getName(), myNode, " waiting for " + root + "/" + preNode + " released lock")); latch.await();// 等待,这里应该一直等待其他线程释放锁 nodeId.set(myNode); latch = null; } } catch (Exception e) { throw new RuntimeException(e); } } public void unlock() { try { System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock ")); if (null != nodeId) { zk.delete(nodeId.get(), -1); } nodeId.remove(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } 复制代码
当然如果我们开发环境使用的是etcs也可以用etcd来实现分布式锁,原理和zookeeper类似