上回 我们介绍了Ticket Lock算法,和传统的简单的CAS操作来实现spin lock相比,它提供了不少很好的性质:比如FIFO、没有饥饿等等。
但是根据 论文 ,ticket lock的scalability很差。我们不妨简单回顾一下(代码片段也摘自该论文):
struct spinlock_t
{
int current_ticket ;
int next_ticket ;
}
void spin_lock (spinlock_t *lock)
{
int t = atomic_fetch_and_inc(&lock -> next_ticket );
while (t != lock->current_ticket )
; /* spin */
}
void spin_unlock (spinlock_t *lock)
{
lock->current_ticket++;
}
正如论文中指出的那样,当某个core上持有锁的线程释放锁时,它将把其他core上的 current_ticket
对应的cacheline都invalid掉,之前所有等锁的线程都会通过bus来读取最新的cacheline,这将造成“拥堵”(也就是所谓的bus traffic)。由于在绝大多数体系结构下,这些读取请求都会被串行化,one by one的处理,因此 spin_lock
所需时间将正比于等待锁的线程数目。
本质上,这里的问题还是在于资源共享:所有等锁线程都spin在同一个全局变量上。如果每个线程仅仅spin在本地变量(该线程私有的变量),那么将有效的提高scalability。本文要介绍的MCS Lock就是这样的思路。
这次,我们先给出实现(环境为:GCC 4.8 + X86体系结构 + Ubuntu 14.04 32bit系统),然后再讲解算法。
#include<pthread.h> //just for test demo.
#define CPU_BARRIER() __sync_synchronize()
#define CPU_RELAX() __asm__ __volatile__("pause/n": : :"memory")
#define CAS(address, exp, target) __sync_bool_compare_and_swap(address, exp, target)
#define ATOMIC_EXCHANGE(address, val) __atomic_exchange_n(address, val, __ATOMIC_SEQ_CST)
//=========================================
struct MCSLockNode
{
volatile MCSLockNode *volatile next;
volatile int spin;
};
typedef MCSLockNode MCSLock;
static void lock(MCSLock **m, MCSLockNode *me)
{
me->next = NULL;
me->spin = 0;
MCSLockNode *tail = ATOMIC_EXCHANGE(m, me);
if (!tail) {
return;
}
tail->next = me;
CPU_BARRIER();
while (!me->spin) {
CPU_RELAX();
}
return;
}
static void unlock(MCSLock **m, MCSLockNode *me)
{
if (!me->next) {
if (CAS(m, me, NULL)) {
return;
}
while (!me->next) {
CPU_RELAX();
}
}
/*It holds that me->next != NULL here */
me->next->spin = 1;
}
MCS Lock算法维护一个队列(队尾指针tail)。
lock:尝试加锁的线程创建一个节点,并将其中的 spin
域设置为0。为0表示不是锁的持有者(lock holder),接着通过原子操作 ATOMIC_EXCHANGE
,将自己插入队列中,并获得插入前队尾指针tail。如果tail指针为空,说明当前锁是空闲的,没有被任何线程占用,因此获得锁成功;如果tail指针不空,那么设置自己的前驱,然后自旋,等待它的前驱(线程)将自己的 spin
域置为1。 spin
域为1,那么该线程就变成锁的持有者了。
注意到23行的cpu级别的memory barrier。因为这里我们需要写 tail->next
,然后读 me->spin
,这两个是不同的变量,而在X86下,写读不同变量可能会被CPU乱序,而一旦乱序,我们的实现就不正确了。
unlock:释放锁的线程需要检查是否存在后继,如果存在后继,那么将它的后继的 spin
域设置为1即可,后继将结束spin,成功获得锁。一切搞定,如41行代码所示。
如果不存在后继,那么这里需要特别特别注意。不存在后继有两种可能:
它是最后一个线程。33-35行处理的就是这种情况。那么这时候只需要把队列置为空即可。
它不是最后一个线程。那么此时存在另外一个线程在进行加锁操作,尝试加锁的线程已经通过18-21行设置了新的队尾指针,但是还没设置它的前驱,也就是还没执行到22行。因此释放锁的线程需要等待,等待22行被其他线程执行,等待它被人设置为别人的前驱。这也就是36-38行所做的事。
简单说来,MCS Lock就是将线程们用队列管理起来;加锁的线程spin在自己的变量上;释放锁的线程只更新它后继的变量。
1,MCS Lock的优点是显而易见的,因为每个等待锁的线程都spin在自己的局部的变量上。以至于最新版的linux内核已经开始采用它,来代替之前的ticket lock实现。那么,它的缺点呢?
2,列出表格,从公平性、是否出现饥饿、时间空间复杂度等方面,全面对比ticket lock、mcs lock、peterson lock等。
3,编写程序测试这几个算法的性能,考察它们的scalability。
MCS Lock测试程序Demo
MCSLock *global_lock = NULL;
void* t1(void *arg)
{
MCSLockNode mcs_node;
for(int i = 0; i < 1000000; i++) {
lock(&global_lock, &mcs_node);
unlock(&global_lock, &mcs_node);
}
return NULL;
}
void* t2(void *arg)
{
MCSLockNode mcs_node;
for(int i = 0; i < 1000000; i++) {
lock(&global_lock, &mcs_node);
unlock(&global_lock, &mcs_node);
}
return NULL;
}
int main()
{
pthread_t pthread1;
pthread_t pthread2;
pthread_create(&pthread1, NULL, t1, NULL);
pthread_create(&pthread2, NULL, t2, NULL);
pthread_join(pthread1, NULL);
pthread_join(pthread2, NULL);
return 0;
}