正文
Redission分布式锁源码解析
顶
原
荐
字数 1267
阅读 185
收藏 10
点赞 0
评论 0
Redis Lua Java
【活动】决战应用运维 性能之巅 赢华为荣耀V10 >>>
Redission锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redission中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。
锁的接口定义了一下方法:
分布式锁当中加锁,我们常用的加锁接口:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
下面我们来看一下方法的具体实现:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
首先我们看到调用tryAcquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间TTL来判定的(TTL<=0:则说明该锁不存在或者已经超时,此时获取锁成功。TTL>0:则说明该锁被其他现成持有,此时获取锁失败);
下面我们接着看一下tryAcquire的实现:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }
可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。
get 是由CommandAsyncExecutor(一个线程Executor)封装的一个Executor
设置一个单线程的同步控制器CountDownLatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。
public <V> V get(RFuture<V> future) { if (!future.isDone()) { final CountDownLatch l = new CountDownLatch(1); future.addListener(new FutureListener<V>() { @Override public void operationComplete(Future<V> future) throws Exception { l.countDown(); } }); boolean interrupted = false; while (!future.isDone()) { try { l.await(); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } // commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒 // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } throw convertException(future); }
我们进一步往下看:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的
private long lockWatchdogTimeout = 30 * 1000;
下面我们在进一步往下分析真正获取锁的操作:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
我把里面的重点信息做了以下三点总结:
1:真正执行的是一段具有原子性的Lua脚本,并且最终也是由CommandAsynExecutor去执行。
2:锁真正持久化到Redis时,用的hash类型key field value
3:获取锁的三个参数:getName()是逻辑锁名称,例如:分布式锁要锁住的methodName+params;internalLockLeaseTime是毫秒单位的锁过期时间;getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId。有的同学可能会问,这样不是很缜密:不同的JVM可能会生成相同的threadId,所以Redission这里加了一个区分度很高的UUID;
Lua脚本中的执行分为以下三步:
1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;
2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间
3:key不存,直接返回key的剩余过期时间(-2)
锁获取失败、解锁过程后在后面的文章继续补充
© 著作权归作者所有
共有人打赏支持
粉丝 5
博文 34
码字总数 16788
作品 0
杭州
相关文章 最新文章
一种简单的,支持不同方案的高性能分布式锁 简介 lock4j-spring-boot-starter是一个分布式锁组件,其提供了多种不同的支持以满足不同性能和环境的需求。 立志打造一个简单但富有内涵的分布式...
小锅盖
昨天
0
0
前言:分布式环境有很多问题,比如你前一个请求访问的是服务器A,第二个请求访问到了服务器B,就会发生并发重复插入问题,这个时候需要依赖单点来实现分布锁,而redis就是。 先导入maven依赖...
王念博客
2016/05/02
2.9K
4
转载请标明出处: http://blog.csdn.net/forezp/article/details/70148833 本文出自方志朋的博客 错过了这一篇,你可能再也学不会 Spring Cloud 了!Spring Boot做为下一代 web 框架,Sprin...
forezp
2017/04/12
0
0
Gecco 1.1.3 发布了,该版本改进内容包括: 处理jsonp的时候,分号问题修复 2.支持自定义下载重试次数定义,GeccoEngine.retry(count) 3.HttpClientDownloader支持response的Set Cookie自动存...
xtuhcy
2016/05/31
2.3K
9
[TOC] 分布式锁实现汇总 很多时候我们需要保证同一时间一个方法只能被同一个线程调用,在单机环境中,Java中其实提供了很多并发处理相关的API,但是这些API在分布式场景中就无能为力了。也就...
Wang_Coder
2017/12/01
0
0
Gecco是什么 Gecco是一款用java语言开发的轻量化的易用的网络爬虫。Gecco整合了jsoup、httpclient、fastjson、spring、htmlunit、redission等优秀框架,让您只需要配置一些jquery风格的选择器...
xtuhcy
2016/02/19
12.2K
21
存储过程高级篇 讲解了一些存储过程的高级特性,包括 cursor、schema、控制语句、事务等。 数据库索引与事务管理 本篇文章为对数据库知识的查缺补漏,从索引,事务管理,存储过程,触发器,一...
掘金官方
01/04
0
0
快工作快三年了,本科学历。做个两个java项目,第一个是做产品,第二个为平台框架的。 我在项目中使用的一些技术: 前端jsp,freemarker,sitemesh都懂一些,也能做出东西。但是兴趣不是很大。...
只是会java
2013/08/03
695
10
一、锁的发展 系统结构由传统的“单应用服务--》SOA --》微服务 --》无服务器” 的演进过程中,场景越来越复杂,由单体应用的但进程中多线程并发的内存锁,随着互联网场景越来越复杂,在复杂...
贾浩v
2017/10/24
0
0
学习地址:http://www.roncoo.com/course/view/e2b459016e2e477dbd5d67c8b23fe86d 课程介绍 Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相...
小红牛
04/19
0
0
没有更多内容
加载失败,请刷新页面
加载更多下一页
#安装源yum install http://opensource.wandisco.com/centos/7/git/x86_64/wandisco-git-release-7-2.noarch.rpm#安装gityum install git#更新gityum update git...
TonyStarkSir
59分钟前
0
0
大O符号 运行时间是评判一个算法的指标,运行时间由算法的执行次数决定。大O符号就是反映出算法的执行次数,即,反应算法的运行时间。如下图所示: 常见的大O运行时间
刀锋
今天
0
0
Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子:分享Nana的单曲《Lonely》 《Lonely》- Nana 手机党少年们想听歌,请使劲儿戳(这里) @Re-Inspired :每天早上起床的我,一眼万年...
小小编辑
今天
49
7
执行定期备份并有一整套恢复计划是一个DBA的重要工作之一,这也是数据库可用性和完整性的保障。我们可以在多个机房之间部署流复制集群,以保证但节点故障,但是集群无法保证认为的意外DELET...
闻术苑
今天
0
0
mac下懒得找扫描工具,自己写了一个简单的文件路径如下: #文件结构-file文件夹-result文件夹-scan.py# -*- coding: utf-8 -*-import requestsimport timeimport osfrom threading i...
hirainn
今天
0
0
11.10/11.11/11.12 安装PHP5 11.13 安装PHP7 php中mysql,mysqli,mysqlnd,pdo到底是什么 http://blog.csdn.net/u013785951/article/details/60876816 查看编译参数 http://ask.apelearn.com/......
王鑫linux
今天
0
0
首先判断MP3文件中是否含有ID3V2的标签,关于ID3V2的格式有一堆的说法 我嘛,不怎么关心,因此只攻专辑图片,也就是判断是否包含APIC这个标识 找到这个标识其实也就是和解析普通文件一样,每...
会哭的鳄鱼
今天
0
0
最近,又临近博主所负责的一个大型项目的发版了。之前有提到过,该项目涉及到30-40台服务器的发版。且项目客户规定发版需在晚上10-11点左右开始进行,这里博主不得不说每次发布最后都是眼花缭...
em_aaron
今天
0
0
单链表的查找和删除以及显示 正确使用指针 class Link{public int iData;public double dData;public Link next;public Link(int id,double dd){iData = id;dData =...
沉迷于编程的小菜菜
今天
1
0
介绍 意图:避免请求发送者与接收者耦合在一起,让多个对象都有可能接收请求,将这些对象连接成一条链,并且沿着这条链传递请求,直到有对象处理它为止。 主要解决:职责链上的处理者负责处理...
巨輪
昨天
1
0
没有更多内容
加载失败,请刷新页面
加载更多下一页