转载

分布式锁解决方案

在多线程的软件世界里,对共享资源的争抢过程(Data Race)就是并发,而对共享资源数据进行访问保护的最直接办法就是引入锁。

POSIX threads(简称Pthreads)是在多核平台上进行并行编程的一套常用的API。线程同步(Thread Synchronization)是并行编程中非常重要的通讯手段,其中最典型的应用就是用Pthreads提供的锁机制(lock)来对多个线程之间共 享的临界区(Critical Section)进行保护(另一种常用的同步机制是barrier)。

无锁编程也是一种办法,但它不在本文的讨论范围,并发多线程转为单线程(Disruptor),函数式编程,锁粒度控制(ConcurrentHashMap桶),信号量(Semaphore)等手段都可以实现无锁或锁优化。

技术上来说,锁也可以理解成将大量并发请求串行化,但请注意串行化不能简单等同为** 排队 ,因为这里和现实世界没什么不同,排队意味着大家是公平Fair的领到资源,先到先得,然而很多情况下为了性能考量多线程之间还是会不公平Unfair**的去抢。Java中ReentrantLock可重入锁,提供了公平锁和非公平锁两种实现。

再注意一点,串行也不是意味着只有一个排队的队伍,每次只能进一个。当然可以好多个队伍,每次进入多个。比如餐馆一共10个餐桌,服务员可能一次放行最多10个人进去,有人出来再放行同数量的人进去。Java中Semaphore信号量,相当于同时管理一批锁。

锁的类型

自旋锁(Spin Lock)

自旋锁是一种非阻塞锁,也就是说,如果某线程需要获取自旋锁,但该锁已经被其他线程占用时,该线程不会被挂起,而是在不断的消耗CPU的时间,不停的试图获取自旋锁。

互斥锁 (Mutex Lock)

互斥锁是阻塞锁,当某线程无法获取互斥锁时,该线程会被直接挂起,不再消耗CPU时间,当其他线程释放互斥锁后,操作系统会唤醒那个被挂起的线程。

可重入锁 (Reentrant Lock)

可重入锁是一种特殊的互斥锁,它可以被同一个线程多次获取,而不会产生死锁。

锁举例

本地锁

java环境下可以通过synchronized和lock开实现本地锁。

//synchronized

    public synchronized void demoMethod(){}
    
    public void demoMethod(){
        synchronized (this)
        {
            //other thread safe code
        }
    }

    private final Object lock = new Object();
    public void demoMethod(){
        synchronized (lock)
        {
            //other thread safe code
        }
    }

    public synchronized static void demoMethod(){}

//lock

   private final Lock queueLock = new ReentrantLock();
 
   public void printJob(Object document)
   {
      queueLock.lock();
      try
      {
         Long duration = (long) (Math.random() * 10000);
         System.out.println(Thread.currentThread().getName() + ": PrintQueue: Printing a Job during " + (duration / 1000) + " seconds :: Time - " + new Date());
         Thread.sleep(duration);
      } catch (InterruptedException e)
      {
         e.printStackTrace();
      } finally
      {
         System.out.printf("%s: The document has been printed/n", Thread.currentThread().getName());
         queueLock.unlock();
      }
   }
复制代码

锁非静态是锁了对象的实例;锁静态是锁了对象的类型。

一些特性

  • 可重入。如下可以直接进入testWrite方法不用重新申请锁。synchronized和lock都是可重入锁。
synchronized void testRead(){
        this.testWrite();
    }
    synchronized void testWrite(){}
复制代码
  • 可中断锁。例如A正在执行锁中的代码,另一线程B正在等待获取该锁如果B可以中断则该锁为可中断锁。synchronized就不是可中断锁,而Lock是可中断锁。
  • 公平锁和非公平锁。以请求锁的顺序来获取锁是公平锁。synchronized是非公平锁,lock默认是非公平锁,但是可以设置为公平锁。

对比

名称 优点 缺点
synchronized 实现简单,语义清晰,便于JVM堆栈跟踪,加锁解锁过程由JVM自动控制,提供了多种优化方案,使用更广泛 悲观的排他锁,不能进行高级功能
lock 可定时的、可轮询的与可中断的锁获取操作,提供了读写锁、公平锁和非公平锁 需手动释放锁unlock,不适合JVM进行堆栈跟踪

分布式锁

使用分布式锁的目的有两个,一个是避免多次执行幂等操作提升效率;一个是避免多个节点同时执行非幂等操作导致数据不一致。 接下来我们来看如何实现分布式锁,在java环境下有三种也即通过数据库,通过redis及通过Zk来实现。

通过数据库实现

通过主键及其他约束使用抛异常来实现分布式锁不在本文讨论范围。一下为基于数据库排他锁来实现分布式锁

/**
     * 超时获取锁
     * @param lockID
     * @param timeOuts
     * @return
     * @throws InterruptedException
     */
    public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException {

        String sql = "SELECT id from test_lock where id = ? for UPDATE ";
        long futureTime = System.currentTimeMillis() + timeOuts;
        long ranmain = timeOuts;
        long timerange = 500;
        connection.setAutoCommit(false);
        while (true) {
            CountDownLatch latch = new CountDownLatch(1);
            try {
                PreparedStatement statement = connection.prepareStatement(sql);
                statement.setString(1, lockID);
                statement.setInt(2, 1);
                statement.setLong(1, System.currentTimeMillis());
                boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
                if (ifsucess)
                    return true;
            } catch (SQLException e) {
                e.printStackTrace();
            }
            latch.await(timerange, TimeUnit.MILLISECONDS);
            ranmain = futureTime - System.currentTimeMillis();
            if (ranmain <= 0)
                break;
            if (ranmain < timerange) {
                timerange = ranmain;
            }
            continue;
        }
        return false;

    }


    /**
     * 释放锁
     * @param lockID
     * @return
     * @throws SQLException
     */
    public void unlockforUpdtate(String lockID) throws SQLException {
        connection.commit();

    }
复制代码

通过缓存系统实现

加锁

public class RedisTool {
 
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
 
    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
 
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
 
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
 
    }
 
}
复制代码

第一个为key,我们使用key来当锁,因为key是唯一的。

第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。

第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

第五个为time,与第四个参数相呼应,代表key的过期时间。

解锁

public class RedisTool {
 
    private static final Long RELEASE_SUCCESS = 1L;
 
    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
 
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
 
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
 
    }
 
}
复制代码

第一行代码,我们写了一个简单的Lua脚本代码

第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。

基于Redlock实现分布式锁的争论见

Redlock

how-to-do-distributed-locking

通过ZK实现

使用curator来实现分布式锁。

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    try {
        return interProcessMutex.acquire(timeout, unit);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return true;
}
public boolean unlock() {
    try {
        interProcessMutex.release();
    } catch (Throwable e) {
        log.error(e.getMessage(), e);
    } finally {
        executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
    }
    return true;
}
复制代码
原文  https://juejin.im/post/5bf68ea4f265da612239fc65
正文到此结束
Loading...