转载

MCS Lock

回顾

上回 我们介绍了Ticket Lock算法,和传统的简单的CAS操作来实现spin lock相比,它提供了不少很好的性质:比如FIFO、没有饥饿等等。

但是根据 论文 ,ticket lock的scalability很差。我们不妨简单回顾一下(代码片段也摘自该论文):

struct spinlock_t
{
  int current_ticket ;
  int next_ticket ;
}
void spin_lock (spinlock_t *lock)
{
  int t = atomic_fetch_and_inc(&lock -> next_ticket );
  while (t != lock->current_ticket )
  ; /* spin */
}
void spin_unlock (spinlock_t *lock)
{
  lock->current_ticket++;
}

正如论文中指出的那样,当某个core上持有锁的线程释放锁时,它将把其他core上的 current_ticket 对应的cacheline都invalid掉,之前所有等锁的线程都会通过bus来读取最新的cacheline,这将造成“拥堵”(也就是所谓的bus traffic)。由于在绝大多数体系结构下,这些读取请求都会被串行化,one by one的处理,因此 spin_lock 所需时间将正比于等待锁的线程数目。

本质上,这里的问题还是在于资源共享:所有等锁线程都spin在同一个全局变量上。如果每个线程仅仅spin在本地变量(该线程私有的变量),那么将有效的提高scalability。本文要介绍的MCS Lock就是这样的思路。

实现

这次,我们先给出实现(环境为:GCC 4.8 + X86体系结构 + Ubuntu 14.04 32bit系统),然后再讲解算法。

#include<pthread.h> //just for test demo.
#define CPU_BARRIER() __sync_synchronize()
#define CPU_RELAX() __asm__ __volatile__("pause/n": : :"memory")
#define CAS(address, exp, target) __sync_bool_compare_and_swap(address, exp, target)
#define ATOMIC_EXCHANGE(address, val) __atomic_exchange_n(address, val, __ATOMIC_SEQ_CST)
//=========================================
struct MCSLockNode
{
  volatile MCSLockNode *volatile next;
  volatile int spin;
};

typedef MCSLockNode MCSLock;
static void lock(MCSLock **m, MCSLockNode *me)
{
  me->next = NULL;
  me->spin = 0;
  MCSLockNode *tail = ATOMIC_EXCHANGE(m, me);
  if (!tail) {
    return;
  }
  tail->next = me;
  CPU_BARRIER();
  while (!me->spin) {
    CPU_RELAX();
  }
  return;
}

static void unlock(MCSLock **m, MCSLockNode *me)
{
  if (!me->next) {
    if (CAS(m, me, NULL)) {
      return;
    }
    while (!me->next) {
      CPU_RELAX();
    }
  }
  /*It holds that me->next != NULL here */
  me->next->spin = 1;
}

算法

MCS Lock算法维护一个队列(队尾指针tail)。

lock:尝试加锁的线程创建一个节点,并将其中的 spin 域设置为0。为0表示不是锁的持有者(lock holder),接着通过原子操作 ATOMIC_EXCHANGE ,将自己插入队列中,并获得插入前队尾指针tail。如果tail指针为空,说明当前锁是空闲的,没有被任何线程占用,因此获得锁成功;如果tail指针不空,那么设置自己的前驱,然后自旋,等待它的前驱(线程)将自己的 spin 域置为1。 spin 域为1,那么该线程就变成锁的持有者了。

注意到23行的cpu级别的memory barrier。因为这里我们需要写 tail->next ,然后读 me->spin ,这两个是不同的变量,而在X86下,写读不同变量可能会被CPU乱序,而一旦乱序,我们的实现就不正确了。

unlock:释放锁的线程需要检查是否存在后继,如果存在后继,那么将它的后继的 spin 域设置为1即可,后继将结束spin,成功获得锁。一切搞定,如41行代码所示。

如果不存在后继,那么这里需要特别特别注意。不存在后继有两种可能:

它是最后一个线程。33-35行处理的就是这种情况。那么这时候只需要把队列置为空即可。

它不是最后一个线程。那么此时存在另外一个线程在进行加锁操作,尝试加锁的线程已经通过18-21行设置了新的队尾指针,但是还没设置它的前驱,也就是还没执行到22行。因此释放锁的线程需要等待,等待22行被其他线程执行,等待它被人设置为别人的前驱。这也就是36-38行所做的事。

简单说来,MCS Lock就是将线程们用队列管理起来;加锁的线程spin在自己的变量上;释放锁的线程只更新它后继的变量。

练习

1,MCS Lock的优点是显而易见的,因为每个等待锁的线程都spin在自己的局部的变量上。以至于最新版的linux内核已经开始采用它,来代替之前的ticket lock实现。那么,它的缺点呢?

2,列出表格,从公平性、是否出现饥饿、时间空间复杂度等方面,全面对比ticket lock、mcs lock、peterson lock等。

3,编写程序测试这几个算法的性能,考察它们的scalability。

附录

MCS Lock测试程序Demo

MCSLock *global_lock = NULL;
void* t1(void *arg)
{
  MCSLockNode mcs_node;
  for(int i = 0; i < 1000000; i++) {
    lock(&global_lock, &mcs_node);
    unlock(&global_lock, &mcs_node);
  }
  return NULL;
}
void* t2(void *arg)
{
  MCSLockNode mcs_node;
  for(int i = 0; i < 1000000; i++) {
    lock(&global_lock, &mcs_node);
    unlock(&global_lock, &mcs_node);
  }
  return NULL;
}
int main()
{
  pthread_t pthread1;
  pthread_t pthread2;
  pthread_create(&pthread1, NULL, t1, NULL);
  pthread_create(&pthread2, NULL, t2, NULL);
  pthread_join(pthread1, NULL);
  pthread_join(pthread2, NULL);
  return 0;
}

原文  http://www.yebangyu.org/blog/2016/08/21/mcslock/
正文到此结束
Loading...