转载

Golang sync包源码剖析

Golang在标准库中提供了一些并发编程所需要的基本工具,包括同步机制、原子操作、对象池等,这些工具基本都包含在sync包中,本文从源码入手简单分析一下各个工具的实现机制。

原子操作

Golang的原子操作函数全部包含在了sync/atomic中。sync包中其他许多工具的实现都依赖于原子操作,所以先来看一下原子操作的实现。

原子操作的实现全部在asm_xxx.s文件中,使用汇编写成。以amd64平台的cas操作为例,它的源码如下:

TEXT ·CompareAndSwapUint32(SB),NOSPLIT,$0-17
MOVQ addr+0(FP), BP
MOVL old+8(FP), AX
MOVL new+12(FP), CX
// 载入参数
LOCK
CMPXCHGL CX, 0(BP)
// 使用LOCK前缀完成交换操作
SETEQ swapped+16(FP)
//设置返回值
RET

可见cas操作是通过对修改命令加LOCK前缀实现的,LOCK前缀可以保证对CPU缓存的操作是排他的。

事实上,其他的原子操作基本也是通过LOCK前缀加一个读改写指令完成,或者是通过XCHG系列指令完成(Store系列函数),而XCHG指令本身是带有锁语义的。值得注意的是,Load系列函数由于只读不改,所以没有进行任何加锁。

atomic.Value的方法实际上就是对其他原子操作的调用,Golang中的接口类型在内存中是一个struct,包含类型和值两个域,atomic.Value的方法都是对指向这两个域的指针分别做原子操作的过程。

Mutex/RWMutex

Mutex内部使用了信号量和自旋锁,信号量在runtime/sema.go中实现,自旋锁在runtime/proc1.go中实现,再此不再赘述。

Mutex的Lock方法有三个执行分支:

首先是fastpath,使用一个简单的cas操作修改锁状态,如果修改失败进入后面的分支。实际上sync包中许多函数的实现都是类似的结构,首先用原子操作实现一个fastpath,操作失败后再使用信号量和自旋锁进行slowpath。

在第二个分支会调用自旋锁,忙等待这个Mutex的Unlock。但是自旋锁的使用是有限制的(runtime/proc1.go第3712行的判断),不可能让忙等待的goroutine占用太多资源,所以自旋一定次数或者自旋的goroutine数量超出一个限制之后runtime就会禁用自旋锁。这时就会进入第三个分支。

第三个分支比较简短,会在cas操作失败之后直接获取信号量,然后等待被唤醒,如此循环下去。

Unlock过程要简单的多,直接使用cas操作修改锁的状态,然后释放信号量。

RWMutex的实现比Mutex要简单。RWMutex的结构中有两个信号量,分别对应于读者和写者,另外两个整数记录读者和写者的数量。RWMutex的加解锁过程都是两步,首先原子操作修改读者或写者计数,然后直接获取信号量,与Mutex相比省去了fastpath和自旋锁过程。由此可见,当多读少写的场景中,使用RWMutex的效率应当会高于Mutex,因为省去了大量的忙等待过程。

Cond

Cond结构由一个等待者计数、信号量、锁组成(还有一个copychecker用来检测Cond对象是否被移动,与Cond的功能实现无关)。

Cond.Wait方法会首先原子递增等待者计数,然后获取信号量进入休眠,代码如下:

c.L.Unlock()
runtime_Syncsemacquire(&c.sema)
c.L.Lock()

所以在Wait之前应当手动为c.L上锁,Wait结束后手动解锁。为避免虚假唤醒,需要将Wait放到一个条件判断循环中。官方要求的写法如下:

c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()

Broadcast和Signal方法底层都是调用signalImpl这个方法,区别只在于唤醒等待在信号量上的goroutine数量不同。

WaitGroup

WaitGroup由一个计数器和一个32位整数信号量组成。这个计数器是一个[12]byte,事实上只有后64位用于计数,前面的32位是为了在32位编译环境下保证64位对齐(64位的原子操作的要求)。

WaitGroup的方法不复杂,都是使用原子操作修改计数器然后对信号量进行操作。

WaitGroup的使用要求稍微多一点,文档要求当计数器为0时,Add一个正数必须在Wait之前发生。另外,如果要复用WaitGroup,必须等上一次使用中所有的Wait均已返回才行。

Pool

sync.Pool是一个在Go1.3才加入标准库的新类型,设计目的是复用对象,减轻GC压力。

由于Pool在使用时可能会在多个goroutine之间交换对象,所以比较复杂。我们先来看一下数据结构:

type Pool struct {
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array

// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}

// Local per-P Pool appendix.
type poolLocal struct {
private interface{} // Can be used only by the respective P.
shared []interface{} // Can be used by any P.
Mutex // Protects shared.
pad [128]byte // Prevents false sharing.
}

可以看到Pool类型中保存了一个poolLocal的数组,大小即P的数量,也就是给每一个系统线程分配了一个poolLocal,而poolLocal类型中保存了真正的数据。为什么要分poolLocal要分private和shared两个域来保存对象呢?因为poolLocal中的对象可能会被其他P偷走,private域保证这个P不会被偷光,至少能保留一个对象供自己用。否则,如果这个P只剩一个对象,被偷走了,那么当它本身需要对象时又要从别的P偷回来,造成了不必要的开销。

Get操作分为三个阶段:

  1. fastpath。从自身对应的poolLocal中按private->shared的顺序获取对象。

  2. 偷取对象。挨个遍历其他P的poolLocal,一旦发现shared数组不为空,就将尾部的对象偷走并返回。

  3. 使用New生成一个新的对象。

Put操作首先会试图将新对象放到自己的poolLocal.private中,如果private已经有对象了,就会放到shared的尾部。可见Put操作不会涉及到其他的P,比较简单。

Pool的清空:

在每次GC之前,runtime会调用poolCleanup函数来将Pool所有的指针变为nil,计数变为0,这样原本Pool中储存的对象会被GC全部回收。这个特性使得Pool有自己独特的用途。首先,有状态的对象绝不能储存在Pool中,Pool不能用作连接池。其次,你不需要担心Pool会不会一直增长,因为runtime定期帮你回收Pool中的数据。但是也不能无限制地向Pool中Put新的对象,这样会拖累GC,也违背了Pool的设计初衷。官方的说法是Pool适用于储存一些会在goroutine间分享的临时对象,举的例子是fmt包中的输出缓冲区。

Once

Once由一个Mutex和一个整数类型的标志done组成,只有一个方法Do,代码如下:

func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

如果这个函数已经执行过了,就会走fastpath,一个原子操作判断一下done标志然后返回。

如果没有执行过,需要加锁执行函数,然后修改标志。这里在加锁后仍然使用原子操作修改标志是因为fastpath没有加锁。

sync包的主要内容就是这样,其他一些辅助函数就不分析了。欢迎讨论。

正文到此结束
Loading...