在多线程的软件世界里,对共享资源的争抢过程(Data Race)就是并发,而对共享资源数据进行访问保护的最直接办法就是引入锁。
POSIX threads(简称Pthreads)是在多核平台上进行并行编程的一套常用的API。线程同步(Thread Synchronization)是并行编程中非常重要的通讯手段,其中最典型的应用就是用Pthreads提供的锁机制(lock)来对多个线程之间共 享的临界区(Critical Section)进行保护(另一种常用的同步机制是barrier)。
无锁编程也是一种办法,但它不在本文的讨论范围,并发多线程转为单线程(Disruptor),函数式编程,锁粒度控制(ConcurrentHashMap桶),信号量(Semaphore)等手段都可以实现无锁或锁优化。
技术上来说,锁也可以理解成将大量并发请求串行化,但请注意串行化不能简单等同为** 排队 ,因为这里和现实世界没什么不同,排队意味着大家是公平Fair的领到资源,先到先得,然而很多情况下为了性能考量多线程之间还是会不公平Unfair**的去抢。Java中ReentrantLock可重入锁,提供了公平锁和非公平锁两种实现。
再注意一点,串行也不是意味着只有一个排队的队伍,每次只能进一个。当然可以好多个队伍,每次进入多个。比如餐馆一共10个餐桌,服务员可能一次放行最多10个人进去,有人出来再放行同数量的人进去。Java中Semaphore信号量,相当于同时管理一批锁。
自旋锁是一种非阻塞锁,也就是说,如果某线程需要获取自旋锁,但该锁已经被其他线程占用时,该线程不会被挂起,而是在不断的消耗CPU的时间,不停的试图获取自旋锁。
互斥锁是阻塞锁,当某线程无法获取互斥锁时,该线程会被直接挂起,不再消耗CPU时间,当其他线程释放互斥锁后,操作系统会唤醒那个被挂起的线程。
可重入锁是一种特殊的互斥锁,它可以被同一个线程多次获取,而不会产生死锁。
java环境下可以通过synchronized和lock开实现本地锁。
//synchronized public synchronized void demoMethod(){} public void demoMethod(){ synchronized (this) { //other thread safe code } } private final Object lock = new Object(); public void demoMethod(){ synchronized (lock) { //other thread safe code } } public synchronized static void demoMethod(){} //lock private final Lock queueLock = new ReentrantLock(); public void printJob(Object document) { queueLock.lock(); try { Long duration = (long) (Math.random() * 10000); System.out.println(Thread.currentThread().getName() + ": PrintQueue: Printing a Job during " + (duration / 1000) + " seconds :: Time - " + new Date()); Thread.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.printf("%s: The document has been printed/n", Thread.currentThread().getName()); queueLock.unlock(); } } 复制代码
锁非静态是锁了对象的实例;锁静态是锁了对象的类型。
synchronized void testRead(){ this.testWrite(); } synchronized void testWrite(){} 复制代码
名称 | 优点 | 缺点 |
---|---|---|
synchronized | 实现简单,语义清晰,便于JVM堆栈跟踪,加锁解锁过程由JVM自动控制,提供了多种优化方案,使用更广泛 | 悲观的排他锁,不能进行高级功能 |
lock | 可定时的、可轮询的与可中断的锁获取操作,提供了读写锁、公平锁和非公平锁 | 需手动释放锁unlock,不适合JVM进行堆栈跟踪 |
使用分布式锁的目的有两个,一个是避免多次执行幂等操作提升效率;一个是避免多个节点同时执行非幂等操作导致数据不一致。 接下来我们来看如何实现分布式锁,在java环境下有三种也即通过数据库,通过redis及通过Zk来实现。
通过主键及其他约束使用抛异常来实现分布式锁不在本文讨论范围。一下为基于数据库排他锁来实现分布式锁
/** * 超时获取锁 * @param lockID * @param timeOuts * @return * @throws InterruptedException */ public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException { String sql = "SELECT id from test_lock where id = ? for UPDATE "; long futureTime = System.currentTimeMillis() + timeOuts; long ranmain = timeOuts; long timerange = 500; connection.setAutoCommit(false); while (true) { CountDownLatch latch = new CountDownLatch(1); try { PreparedStatement statement = connection.prepareStatement(sql); statement.setString(1, lockID); statement.setInt(2, 1); statement.setLong(1, System.currentTimeMillis()); boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁 if (ifsucess) return true; } catch (SQLException e) { e.printStackTrace(); } latch.await(timerange, TimeUnit.MILLISECONDS); ranmain = futureTime - System.currentTimeMillis(); if (ranmain <= 0) break; if (ranmain < timerange) { timerange = ranmain; } continue; } return false; } /** * 释放锁 * @param lockID * @return * @throws SQLException */ public void unlockforUpdtate(String lockID) throws SQLException { connection.commit(); } 复制代码
加锁
public class RedisTool { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; /** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } } 复制代码
第一个为key,我们使用key来当锁,因为key是唯一的。
第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
第五个为time,与第四个参数相呼应,代表key的过期时间。
解锁
public class RedisTool { private static final Long RELEASE_SUCCESS = 1L; /** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } } 复制代码
第一行代码,我们写了一个简单的Lua脚本代码
第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。
基于Redlock实现分布式锁的争论见
Redlock
how-to-do-distributed-locking
使用curator来实现分布式锁。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { try { return interProcessMutex.acquire(timeout, unit); } catch (Exception e) { e.printStackTrace(); } return true; } public boolean unlock() { try { interProcessMutex.release(); } catch (Throwable e) { log.error(e.getMessage(), e); } finally { executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS); } return true; } 复制代码