一、 Go 同步原语
sync.Cond
->notifyList
->runtime.mutex
、atomic
sync.WaitGroup
->atomic
、runtime.sema
sync.Map
->sync.Mutex
、atomic
sync.Once
->sync.Mutex
、atomic
sync.RWMutex
->sync.Mutex
、atomic
sync.Mutex
->runtime.sema
channel
->runtime.mutex
sync.Mutex和runtime.mutext区别:简单说就是sync.Mutex
是用户层的锁,Lock
抢锁失败会造成goroutine
阻塞(会调用gopark
)。runtime.mutex
是给 runtime
使用的锁,Lock
抢锁失败,会造成m
阻塞(线程阻塞,底层调用的futex
)。
二、Atomic
Golang
中的Atomic
主要保证了三件事,原子性、可见性、有序性。
我们先看下Go的源码里面Atomic 的API
,主要包括Swap
、CAS
、Add
、Load
、Store
、Pointer
几类,在IA64 CPU上对应的汇编指令如下:
Swap
: 主要是 XCHGQ 指令CAS
: 主要是 LOCK CMPXCHGQ 指令Add
: 主要是 LOCK XADDQ 指令Load
: 主要是 MOVQ(Load64) 指令Store
: 主要是 XCHGQ 指令Pointer
: 主要当做64位int,调用上述相关方法。
关于LOCK prefix和XCHG指令在 英特尔开发人员手册 section 8.2.5中,我们找到了如下的解释:
For the Intel486 and Pentium processors, the LOCK# signal is always asserted on the bus during a LOCK operation, even if the area of memory being locked is cached in the processor.
For the P6 and more recent processor families, if the area of memory being locked during a LOCK operation is cached in the processor that is performing the LOCK operation as write-back memory and is completely contained in a cache line, the processor may not assert the LOCK# signal on the bus. Instead, it will modify the memory location internally and allow it’s cache coherency mechanism to ensure that the operation is carried out atomically. This operation is called “cache locking.” The cache coherency mechanism automatically prevents two or more processors that have cached the same area of memory from simultaneously modifying data in that area.
The I/O instructions, locking instructions, the LOCK prefix, and serializing instructions force stronger orderingon the processor.
Synchronization mechanisms in multiple-processor systems may depend upon a strong memory-ordering model. Here, a program can use a locking instruction such as the XCHG instruction or the LOCK prefix to ensure that a read-modify-write operation on memory is carried out atomically. Locking operations typically operate like I/O operations in that they wait for all previous instructions to complete and for all buffered writes to drain to memory (see Section 8.1.2, “Bus Locking”).
从描述中,我们了解到:LOCK prefix 和 XCHG 指令前缀提供了强一致性的内(缓)存读写保证,可以保证 LOCK 之后的指令在带 LOCK 前缀的指令执行之后才会执行。同时,我们在手册中还了解到,现代的 CPU 中的 LOCK 操作并不是简单锁 CPU 和主存之间的通讯总线, Intel 在 cache 层实现了这个 LOCK 操作,此因此我们也无需为 LOCK 的执行效率担忧。
PS:Java
中的volatile
关键字也是基于 Lock prefix
实现的。
从上面可以看到Swap
、CAS
、Add
、Store
都是基于LOCK prefix
和XCHG
指令实现的,他能保证缓存读写的强一致性。
三、runtime.mutex
3.1 mutex 结构
runtime
的 mutex 定义在runtime/runtime2.go
中。定义如下:
type mutex struct {
// Empty struct if lock ranking is disabled, otherwise includes the lock rank
lockRankStruct
// Futex-based impl treats it as uint32 key,
// while sema-based impl as M* waitm.
// Used to be a union, but unions break precise GC.
key uintptr // lock、unlock、sleeping 三种状态 sleep 阻塞当前 m ,然后排队等系统唤醒
}
lockRankStruct
这个是给runtime
做 死锁 检测用的,只有设置了GOEXPERIMENT=staticlockranking
才lockRankStruct
才会有具体实现,否则的话这个结构体只会是个空Struct,空的Struct
只要不是最后一个字段是不会占用任何空间的(详见final-zero-size-field),具体lockrank
的CR
,可以看这个提交。lookrank
主要通过加锁顺序 来判断是否会死锁,如果加锁顺序不符合预期就会throw
异常(注意这个不是panic
不能被recover
)。具体代码如下:
// checkRanks checks if goroutine g, which has mostly recently acquired a lock
// with rank 'prevRank', can now acquire a lock with rank 'rank'.
//
//go:systemstack
func checkRanks(gp *g, prevRank, rank lockRank) {
rankOK := false
if rank < prevRank {
// If rank < prevRank, then we definitely have a rank error
rankOK = false
} else if rank == lockRankLeafRank {
// If new lock is a leaf lock, then the preceding lock can
// be anything except another leaf lock.
rankOK = prevRank < lockRankLeafRank
} else {
// We've now verified the total lock ranking, but we
// also enforce the partial ordering specified by
// lockPartialOrder as well. Two locks with the same rank
// can only be acquired at the same time if explicitly
// listed in the lockPartialOrder table.
list := lockPartialOrder[rank]
for _, entry := range list {
if entry == prevRank {
rankOK = true
break
}
}
}
if !rankOK {
printlock()
println(gp.m.procid, " ======")
printHeldLocks(gp)
throw("lock ordering problem")
}
}
3.2 lock 实现
在macOS
和Windows
上runtime.mutex
是基于pthread_mutex
来实现的,详见 lock_sema。而在Linux
上lock
是基于futex
来实现的,详见 lock_futex。这里我们只关注Linux
下的实现。
func lock(l *mutex) {
lockWithRank(l, getLockRank(l))
}
func lockWithRank(l *mutex, rank lockRank) {
lock2(l)
}
func lock2(l *mutex) {
gp := getg() // 获取当前的 goroutine
if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
gp.m.locks++ // g绑定的m的lock数量加1
// l.key 只有三种状态 mutex_unlocked、mutex_locked、mutex_sleeping
// mutex_unlocked 表示无锁状态
// mutex_locked 正常加锁状态
// mutex_sleeping 表示有线程调用futexsleep阻塞了
// 设置状态为 mutex_locked ,**注意这里是直接设置,不是CAS**
v := atomic.Xchg(key32(&l.key), mutex_locked)
if v == mutex_unlocked { // 之前的状态是 mutex_unlocked 表示加锁成功了
return
}
// 走到这里,表示没有加锁成功
// 这里 v 不是 mutex_unlocked 所以只能是 MUTEX_LOCKED 或 MUTEX_SLEEPING
// 所以 wait 可能是 MUTEX_LOCKED 或 MUTEX_SLEEPING
// 如果我们将 l->key 从 MUTEX_SLEEPING 更改为其他值,我们必须小心在返回之前将其更改回 MUTEX_SLEEPING
wait := v
// 多核情况下尝试自旋4次,单个就不用自旋了
spin := 0
if ncpu > 1 {
spin = active_spin // active_spin = 4
}
for {
for i := 0; i < spin; i++ {
// 注意我们上面设置了 l.key = mutex_locked
// 这里如果 key = mutex_unlocked,表示肯定是其他持有锁的线程进行了锁的释放
for l.key == mutex_unlocked {
// CAS 抢锁成功直接返回,否则再尝试自旋
// 这里 wait 是 MUTEX_LOCKED 或 MUTEX_SLEEPING
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
procyield(active_spin_cnt) // 执行 active_spin_cnt = 30 次 PAUSE指令
}
// passive_spin = 1 ,再尝试抢一次锁。
for i := 0; i < passive_spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
osyield() // CAS 失败,系统调用`sched_yield`让出CPU
}
// 直接设置为 mutex_sleeping 状态
v = atomic.Xchg(key32(&l.key), mutex_sleeping)
if v == mutex_unlocked {
// 注意这里,如果是从 mutex_unlocked => mutex_sleeping 也认为是加锁成功,然后直接返回,不会走futexsleep阻塞当前线程。
// 造成的影响就是,解锁的时候执行,执行 futexwakeup了,但是没有需要唤醒的线程(功能上应该没有影响)
return
}
wait = mutex_sleeping // 设置 wait 状态为 mutex_sleeping 下次循环会设置为 mutex_sleeping 状态
// l.key == mutex_sleeping 就 sleep,直到被唤醒。
// 不然继续循环(说明在atomic.Xchg mutex_sleeping设置完这短短时间内,其他线程设置又重新设置了l.key状态比如设置为了mutex_locked或者mutex_unlocked。这个时候不会进入sleep,而是会去循环执行步骤1。)
// 如果 *addr == val { 当前线程进入sleep状态 } ;不会阻塞超过ns,ns<0表示永远休眠
// futexsleep(addr *uint32, val uint32, ns int64)
// 阻塞 m
futexsleep(key32(&l.key), mutex_sleeping, -1)
}
}
lock主要步骤如下:
- 调用
atomic.Xchg
直接设置key
的状态为mutex_locked
(注意这里不是CAS
,是直接设置)。 - 根据
atomic.Xchg
返回的状态v
,来判断是否加锁成功了,如果v = mutex_unlocked
表示加锁成功了(这个时候可以直接返回了)。否则就是加锁失败,这个时候v
可能是MUTEX_LOCKED
或者MUTEX_SLEEPING
的状态。 - 如果是多核的话,会尝试自旋
4
,把l.key
从状态mutex_unlocked
改成wait
。注意,我们在步骤1
里面直接设置了key
为mutex_locked
,如果这里l.key = mutex_unlocked
,只能说明是其他持有锁的线程释放了锁。这个CAS
成功,表示加锁成功。如果加锁失败,会调用下procyield
优化下自旋性能。 - 自旋
4
次失败,会再尝试一次CAS
,失败的话会调用osyield
让出CPU
。 osyield
完成以后,继续执行,这个时候直接调用atomic.Xchg
设置l.key = mutex_sleeping
,表示当前准备调用futexsleep
进行sleep
。- 使用系统调用
futexsleep
,如果l.key == mutex_sleeping
,则当前线程进入睡眠状态,直到有其他地方调用futexwakeup
来唤醒。如果这个时候l.key != mutex_sleeping
,说明在步骤5
设置完这短短时间内,其他线程设置又重新设置了l.key
状态比如设置为了mutex_locked
或者mutex_unlocked
。这个时候不会进入sleep
,而是会去循环执行步骤1
。
3.3 unlock 实现
func unlock(l *mutex) {
unlockWithRank(l)
}
func unlockWithRank(l *mutex) {
unlock2(l)
}
func unlock2(l *mutex) {
// 设置 l.key = mutex_unlocked
v := atomic.Xchg(key32(&l.key), mutex_unlocked)
if v == mutex_unlocked {// 重复调用 unlock,直接抛出异常。
throw("unlock of unlocked lock")
}
if v == mutex_sleeping {
// 之前的状态是 mutex_sleeping,说明其他有线程在`sleep`,唤醒一个`sleep`的对象。
// 如果任何线程阻塞在addr上,则唤醒至少cnt个阻塞的任务
// futexwakeup(addr *uint32, cnt uint32)
futexwakeup(key32(&l.key), 1)
}
gp := getg()
gp.m.locks--
if gp.m.locks < 0 {
throw("runtime·unlock: lock count")
}
if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
}
}
unlock 实现总结:
- 调用
atomic.Xchg
设置l.key = mutex_unlocked
。 - 如果设置之前的状态就是
mutex_unlocked
,直接抛异常程序退出。 - 如果之前状态是
mutex_sleeping
,则唤醒一个阻塞在futexsleep
的线程。 m
的锁数量减一,如果锁数量等0
且当前g
是被抢占状态,要标记gp.stackguard0
为stackPreempt
,下次发生函数调用的时候,会主动让出这个g
。
四、runtime.semaphore
4.1 信号量 P/V 操作
P原语
:P
是荷兰语Proberen
(测试)的首字母。为阻塞原语,负责把当前进程由运行状态转换为阻塞状态,直到另外一个进程唤醒它。操作为:申请一个空闲资源(把信号量减1
),若成功,则退出;若失败,则该进程被阻塞;V原语
:V
是荷兰语Verhogen
(增加)的首字母。为唤醒原语,负责把一个被阻塞的进程唤醒,它有一个参数表,存放着等待被唤醒的进程信息。操作为:释放一个被占用的资源(把信号量加1
),如果发现有被阻塞的进程,则选择一个唤醒之。
semaphore
基本操作,在 src/sync/runtime.go 定义了下面几个方法:
// Semacquire等待*s > 0,然后原子递减它。
// 它是一个简单的睡眠原语,用于同步库使用。
func runtime_Semacquire(s *uint32)
// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象
// 如果lifo为true,waiter将会被插入到队列的头部
// skipframes是跟踪过程中要省略的帧数
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine
// 它是一个简单的唤醒原语,用于同步库
// 如果handoff为true, 传递信号到队列头部的waiter
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
这个几个函数具体的实现在 src/runtime/sema.go。
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
func poll_runtime_Semrelease(addr *uint32) {
semrelease(addr)
}
4.2 semtable
Treap
是Binary Search Tree
+Heap
的组合。
semTable 是一个长度为251
的全局数组,每个semaRoot
指向一个treap
,主要用于存放阻塞在信号量(semaphore
)上的sudog
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte // 防止 flase sharing
}
semaRoot
最早是双向链表,在某些场景下性能比较查,所以优化成了treap
,具体可以看 CR37103
优化之后semtable
存储的结构大概是这样:
4.3 semacquire 获取信号量
semaphore
获取信号量操作步骤如下:
- 调用
runtime_SemacquireMutex
(比如sync.Mutex.Lock()
场景) sync_runtime_SemacquireMutex
semacquire1
CAS(addr, v, v-1)
状态成功就返回,失败继续往下- 缓存池拿一个
sudog
,或者new
一个sudog
(acquireSudog
) - 把
g
相关的数据存到sudog
中。 - 循环
- 对当前
semaRoot
加锁 nwait++
cansemacquire/CAS(addr, v, v-1)
sudog
加到semaRoot
的treap
中/root.queue()
- 可能要调整树的结构(左旋
rotateRight
/右旋rotateLeft
)防止树退化为链表 goparkunlock
让出当前g
的执行- 被唤醒
CAS
成功或者s.ticket != 0
(当前没有其他竞争者了) 认为成功- 否则继续循环
- 对当前
- 最后释放
sudog
/releaseSudog
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg()
if gp != gp.m.curg {// 判断下g是不是当前m绑定的g
throw("semacquire not on the G stack")
}
// CAS(addr, v, v-1) 成功就直接成功否则一直循环,如果 *addr = 0 返回 false 走下面 slowpath
if cansemacquire(addr) {
return
}
// 走到这里表示当前g要阻塞
// 下面逻辑,就是把g封装成sudog,然后存到semTable中。
// 最后调用 gopark 让出当前g
s := acquireSudog() // 这个先去 p.sudogcache 拿,没拿到去全局sudohgcache拿
root := semroot(addr) // 根据sema的地址,算出用到semTable中哪个semaRoot
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot) // 加锁,方面下面修改 semaRoot的属性
// 对等待的计数加1,这样sema_release时候不会走快路径
atomic.Xadd(&root.nwait, 1)
// 看下是否有其他的goroutine调用了sema_release
// 在尝试 CAS(addr, v, v-1) 试下
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// 这里,就是这个新的 sudog 加到 semaTable中的
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) // 这你会让出当前的goroutine
// goroutine 被调度回来了,表示有 sema_release 以后唤醒了这个 sema
// s.ticket != 0 表示是等待队列头部的 sudog,当前队列只有一个sudog了,所以直接结束
// CAS(addr, v, v-1) 成功也结束
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s) // 释放 sudog
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
func acquireSudog() *sudog {
// 设置禁止抢占
mp := acquirem()
pp := mp.p.ptr()
//当前本地sudog缓存没有了,则去全局缓存中拉取一批
if len(pp.sudogcache) == 0 {
lock(&sched.sudoglock)
// 首先尝试从全局缓存中获取sudog,直到本地容量达到50%
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
s := sched.sudogcache
sched.sudogcache = s.next
s.next = nil
pp.sudogcache = append(pp.sudogcache, s)
}
unlock(&sched.sudoglock)
// 如果全局缓存为空,则分配创建一个新的sudog
if len(pp.sudogcache) == 0 {
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
n := len(pp.sudogcache)
s := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
//解除抢占限制
releasem(mp)
return s
}
4.4 semrelease 释放信号量
semaphore
释放信号量操作步骤如下:
- 调用
runtime_Semrelease
,比如sync.Mutex.Unlock()
场景。 sync_runtime_Semrelease
semrelease1
- 原子
*addr++
nwait=0
,表示没有阻塞在这个信号量上的g
直接返回。- 有阻塞的
g
在semTable
中找到对应的semaRoot
,然后对
semaRoot`加锁。 - 再次
check
下nwait=0
,等于0
直接返回。 - 拿到
sema
的addres
在semTable
中对应的队列头部的seamRoot
。 dequeue
是否需要调整左旋rotateLeft
或者右旋rotateRight
调整树结构。readyWithTime
,调用goread
唤醒sudog
绑定的g
。goyield
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
atomic.Xadd(addr, 1)
// 没有等待者直接返回
if atomic.Load(&root.nwait) == 0 {
return
}
//查找一个等待着并唤醒它
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
//计数已经被其他goroutine消费,所以不需要唤醒其他goroutine
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr) //查找第一个出现的addr ( s.elem == unsafe.Pointer(addr) )
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // 可能比较慢 甚至被挂起所以先unlock
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5+skipframes) // -> goready(s.g,5)标记runnable 等待被重新调度
if s.ticket == 1 && getg().m.locks == 0 {
// 直接切换G
// readyWithTime已经将等待的G作为runnext放到当前的P
// 我们现在调用调度器可以立即执行等待的G
// 注意waiter继承了我们的时间片:这是希望避免在P上无限得进行激烈的信号量竞争
// goyield类似于Gosched,但是它是发送“被强占”的跟踪事件,更重要的是,将当前G放在本地runq,而不是全局队列。
// 我们仅在饥饿状态下执行此操作(handoff=true)
// 我们等待进入饥饿状体,然后开始进行ticket和P的手递手交接
// See issue 33747 for discussion.
// https://go-review.googlesource.com/c/go/+/206180
goyield() // -> goyield -> goyield_m
}
}
}
// goyield is like Gosched, but it:
// - does not emit a GoSched trace event
// - puts the current G on the runq of the current P instead of the globrunq
func goyield() {
checkTimeouts()
mcall(goyield_m)
}
func goyield_m(gp *g) {
pp := gp.m.p.ptr()
casgstatus(gp, _Grunning, _Grunnable)
dropg()
runqput(pp, gp, false)
schedule()
}
4.5、总结
获取信号量操作主要尝试把sema
地址CAS
方式原子减1
,成就直接返回,失败以后会把当前g
打包成sudog
然后保存到semTable
,然后调用gopark
让出当前的goroutine
。
释放信号量操作就是吧sema
地址加1
,然后看有没有等待中的g
,没有直接返回,有的话去semaTable
的等待队列取出然后调用goready
唤醒对应的g
。
主要理解semaTable
里面存储sudog
的方式就好了。
五、Sync.Mutex 源码
5.1 发展历史
sync.Mutex
第一版 代码 是2008
年的时候 @rsc 提交的。最早的实现比较简单,是通过简单的CAS
加信号量(runtime-sema)
的方式来实现的。
@dvyukov 2011
年的时候,提交了第一次优化了 sync: improve Mutex to allow successive acquisitions,这一版中加入了mutexWoken
唤醒状态和等待者计数的概念。
@dvyukov 2015
年的时候,新增了第二次优化 sync: add active spinning to Mutex,这一版里面主要是加了自旋逻辑。
@dvyukov 2016
年的时候,新增了第三次优化 sync: make Mutex more fair,这一版加入了饥饿模式,让锁在更公平一些。
5.2 Mutex结构分析
先看Mutex
的 注释:
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
翻译如下:
// 公平锁
//
// 锁有两种模式:正常模式和饥饿模式。
// 在正常模式下,所有的等待锁的 goroutine 都会存在一个先进先出的队列中(轮流被唤醒)
// 但是一个被唤醒的goroutine并不是直接获得锁,而是仍然需要和那些新请求锁的(new arrivial)
// 的goroutine竞争,而这其实是不公平的,因为新请求锁的goroutine有一个优势——它们正在CPU上
// 运行,并且数量可能会很多。所以一个被唤醒的goroutine拿到锁的概率是很小的。在这种情况下,
// 这个被唤醒的goroutine会加入到队列的头部。如果一个等待的goroutine有超过1ms
// 都没获取到锁,那么就会把锁转变为饥饿模式。
//
// 在饥饿模式中,锁的所有权会直接从释放锁(unlock)的goroutine转交给队列头的goroutine,
// 新请求锁的goroutine就算锁是空闲状态也不会去获取锁,并且也不会尝试自旋。它们只是排到队列的尾部。
//
// 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
// 1. 它是队列中最后一个goroutine;
// 2. 它拿到锁所花的时间小于1ms;
// 以上只要有一个成立,它就会把锁转变回正常模式。
// 正常模式会有比较好的性能,因为即使有很多阻塞的等待锁的goroutine,
// 一个goroutine也可以尝试请求多次锁。
// 饥饿模式对于防止尾部延迟来说非常的重要。
在看看下Mutex
结构体代码:
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // 表示当前是否已经上锁,1是锁定,0是无锁
mutexWoken // 当前是不是唤醒状态, 1 表示唤醒
mutexStarving // 当前是否是饥饿状态,1 表示是饥饿
mutexWaiterShift = iota // state 右移3位表示 Waiter的个数
starvationThresholdNs = 1e6 // 等待时间超过这个数就变饥饿模式。
)
sema
这个字段比较简单,就是调用runtime_SemacquireMutex
和runtime_Semrelease
需要传的参数。state
里面不同的位表示不同的含义,如下图所示:
5.3 Lock
// 如果已经上锁了,这里会阻塞当前的goroutine直到mutex可用
func (m *Mutex) Lock() {
// 快路径,先尝试CAS把state从0改成锁定
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 慢路径
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// old&(mutexLocked|mutexStarving) 表示保留Locked和Starving两个bit位上的数据,其他的全部清空
// old&(mutexLocked|mutexStarving) == mutexLocked 表示是锁定状态但是不是饥饿状态。
// runtime_canSpin主要判断能不能自旋,它做了几件事
// 1. 自旋次数 < 4
// 2. 必须是多核CPU 且 GOMAXPROCS>1
// 3. P 并且本地运行队列为空.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 当前“唤醒” 标记为 0 ,然后还有其他g处于等待状态
// CAS 尝试设置唤醒状态标记位 = 1
// 告诉其他的 g ,我目前正在处于自旋抢锁状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// runtime_doSpin 就是调用的 procyield(active_spin_cnt)
// procyield 可以看 https://fanlv.fun/2022/10/05/runtime-mutex/#2-4-procyield-%E5%8A%9F%E8%83%BD
runtime_doSpin()
iter++
old = m.state // 读取下 m.state 新的值,可能已经被其他 g 改变了。
continue // 设置失败尝试继续自旋
}
new := old
if old&mutexStarving == 0 {
// 不是饥饿状态,尝试加锁
// 是饥饿状态,就不用设置了,下面Waiter+1,然后乖乖排队去就行了
new |= mutexLocked
}
// 如果mutexLocked 或者 mutexStarving = 1
// Waiter 数量加一
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前是 mutexLocked = 1(是锁定状态)
// 然后 starving = true (下面加锁等待时间超过1ms)
// 这个时候需要把 mutexStarving 标记位设置为 1
// 如果不是锁定状态,我就不设置了饥饿状态了。搞不好下面CAS一把设置就成功了。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// 如果已经设置为唤醒状态, 需要清除唤醒标记, 因为后面要么获得了锁,要么进入休眠.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// CAS 更新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 老的状态是没有加锁,也不是饥饿,那表示我们直接加锁成功了
// 直接返回了
break // locked the mutex with CAS
}
// 走到这里,表示之前的锁可能是加锁状态可能是饥饿状态
// 无论是否是加锁、或者饥饿状态,都要调用信号量,去排队。
// waitStartTime != 0 表示是 sleep 以后被唤醒的 goroutine
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 请求信号量
// queueLifo = true 会放到 semTable suodg队列的头部。
// 如果没有可以用的信号量会阻塞到这句代码,底层其实是调用 gopark 休眠这个 g
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 这里表示有人释放了锁/信号量,我们这个g被唤醒了。
// 虽然我们是在队列头部被唤醒了,但是如果这个时候,业务代码有新的请求过来,刚刚好有代码调用 Lock。我们这个刚刚被唤醒的g,是要跟新的Lock调用场景去抢锁的。
// 等待时间超过 1ms ,直接设置starving=true
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state // 读取一下最新的 state 状态。现在也不知道被改成什么了。
if old&mutexStarving != 0 {
// 当前是饥饿状态 我们也不用再去抢锁了,默认就是给我们执行了
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
// 饥饿状态下不可能有(mutexWoken=0&& mutexLocked==0)这种情况
// mutexWaiter 也不可能 = 0 ,因为下面 mutexWaiter = 1 时候就退出了饥饿状态
throw("sync: inconsistent mutex state")
}
// 下面这个位操作,一个AddInt32 改变三个标记位状态.
// 设置第一位是1,然后 waiter - 1
// mutexLocked = 1 mutexWaiterShift = 3 delta = -7
// delta 第三位是 0 0 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 没有等待了退出饥饿状态
delta -= mutexStarving
}
// 修改state的状态。
atomic.AddInt32(&m.state, delta)
break
}
awoke = true // 设置被唤醒,因为之前调用了runtime_SemacquireMutex,执行到这表示重新被调度到了。
iter = 0
} else {
// atomic.CompareAndSwapInt32(&m.state, old, new)
// CAS 失败,重新读下当前状态,然后再循环来一次。
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
5.4 Unlock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: CAS 取消无锁状态,0 就表示没有其他锁等待者了
// 没有成功就进入 slow path
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
// new = m.state-mutexLocked
// m.state&mutexLocked == 0 表示无锁。
// 如果是无锁,上面fast path就成功了.
// 所以理论不会有这种情况
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 不是饥饿状态
old := new
for {
// 如果锁没有waiter,或者锁有其他以下已发生的情况之一,则后面的工作就不用做了,直接返回
// 1. 锁处于锁定状态,表示锁已经被其他goroutine获取了
// 2. 锁处于被唤醒状态,这表明有等待goroutine被唤醒,不用再尝试唤醒其他goroutine
// 3. 锁处于饥饿模式,那么锁之后会被直接交给等待队列队头goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 代码走到这,说明当前锁是空闲状态,等待队列中有waiter,且没有goroutine被唤醒
// waiter - 1 然后设置唤醒状态 = 1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {// 设置成功
runtime_Semrelease(&m.sema, false, 1) // 唤醒一个信号量
return
}
old = m.state // 对一下最新状态
}
} else {
// 饥饿模式下,唤醒信号量等待队列的头部的sudog。
// 饥饿状态过来的g都会放到信号量队列的尾部。
runtime_Semrelease(&m.sema, true, 1)
}
}
饥饿模式下做了个优化,会调用 readyWithTime 把队列头部的g
放到pp.runnext
里面。然后再调用goyield 把当前的g
放到p runnable queue
的尾部,然后调用 schedule 函数,这样就可以优先执行等待队列中的g
了。
详情可以看这个CR
:sync: yield to the waiter when unlocking a starving mutex
六、RWMutex
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // 写的信号量
readerSem uint32 // 读的信号量
readerCount int32 // 等待写的个数
readerWait int32 // 等待读的个数
}
// 加“读锁”
// 对readerCount + 1 。
// 然后看 readerCount是不是小于0
// 小于0表示 正在加写锁,然后阻塞到rw.readerSem 这个信号上。
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
// 释放 “读锁”
// 对readerCount - 1 。
// 然后看 readerCount是不是小于0
// 小于0表示 正在加写锁,然后调用rw.rUnlockSlow
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
// r+1 == -rwmutexMaxReaders 表示“读锁”已经释放,抛出异常
// rw.readerWait - 1
// rw.readerWait - 1 = 0 表示所有读锁都释放了
// 所有读锁都释放了可以唤醒 rw.writerSem 对应 写锁的lock方法继续执行
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
// mutex 加锁,保证写锁和写锁之间互斥
// rw.readerCount - rwmutexMaxReaders
// r 表示读锁数量
// rw.readerWait + 读lock的数量
// 等待 rw.writerSem 的信号 (读锁那边释放完了,会发这个信号)
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
// rw.readerCount + rwmutexMaxReaders
// r 表示读锁的数量,大于 rwmutexMaxReaders 就抛出异常
// 发送 rw.readerSem 信号量,通知RLock 代码可以继续执行。
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
}
七、Sync.Map
7.1 Sync.RWMutex 多核的伸缩性问题
早在2016
的时候,@bcmills(这个哥们是Go
项目主要维护者之一) 在Go
的 Github
上提出了一个sync: RWMutex scales poorly with CPU count 的Issue
给大家讨论。简单说就是 Sync.RWMutex
这个读写锁,多核情况下扩展性很差。他贴的 Benchmark
测试代码如下:
func BenchmarkRWMutex(b *testing.B) {
for ng := 1; ng <= 256; ng <<= 2 { // ng 表示,开多少个 goroutine
b.Run(fmt.Sprintf("name[%d]", ng), func(b *testing.B) {
var mu sync.RWMutex
mu.Lock()
var wg sync.WaitGroup
wg.Add(ng)
n := b.N // n 表示下面要执行多少次 RLock 和 RUnlock
quota := n / ng // quota 表示分摊到每个 goroutine 上需要执行多少次 Lock 和 RUnlock
for g := ng; g > 0; g-- {
if g == 1 { // n / ng 不是整除的话,剩下余出来的数据,在g=1 的时候全部减掉,不然下面 n 不会等于0
quota = n
}
go func(quota int) {
for i := 0; i < quota; i++ { // 一个循环执行一次 RLock 和 RUnlock
mu.RLock()
mu.RUnlock()
}
wg.Done()
}(quota)
n -= quota
}
if n != 0 {
b.Fatalf("Incorrect quota assignments: %v remaining", n)
}
b.StartTimer() // 从这里开始计时
mu.Unlock() // 这里释放写锁,上面所有阻塞在 RLock 的 goroutine 同时唤醒去执行 RLock
wg.Wait() // 所有 goroutine 的 RLock 和 RUnlock 都执行完毕
b.StopTimer() // 从这里结束计时
})
}
}
Benchmark
的结果可以看出,在多个Gorutine
并发下,可以看到CPU
核数越多,RWLock
的性能越差。
# ./benchmarks.test -test.bench . -test.cpu 1,4,16,64
testing: warning: no tests to run
BenchmarkRWMutex/1 20000000 72.6 ns/op
BenchmarkRWMutex/1-4 20000000 72.4 ns/op
BenchmarkRWMutex/1-16 20000000 72.8 ns/op
BenchmarkRWMutex/1-64 20000000 72.5 ns/op
BenchmarkRWMutex/4 20000000 72.6 ns/op
BenchmarkRWMutex/4-4 20000000 105 ns/op
BenchmarkRWMutex/4-16 10000000 130 ns/op
BenchmarkRWMutex/4-64 20000000 160 ns/op
BenchmarkRWMutex/16 20000000 72.4 ns/op
BenchmarkRWMutex/16-4 10000000 125 ns/op
BenchmarkRWMutex/16-16 10000000 263 ns/op
BenchmarkRWMutex/16-64 5000000 287 ns/op
BenchmarkRWMutex/64 20000000 72.6 ns/op
BenchmarkRWMutex/64-4 10000000 137 ns/op
BenchmarkRWMutex/64-16 5000000 306 ns/op
BenchmarkRWMutex/64-64 3000000 517 ns/op
BenchmarkRWMutex/256 20000000 72.4 ns/op
BenchmarkRWMutex/256-4 20000000 137 ns/op
BenchmarkRWMutex/256-16 5000000 280 ns/op
BenchmarkRWMutex/256-64 3000000 602 ns/op
PASS
为什么多核下面会更慢?其实很简单,就是资源竞争会增加额外开销。RLock
和RUnlock
,底层实现是atomic.AddInt32
,atomic.AddInt32
对应的汇编代码如下:
// uint32 Xadd(uint32 volatile *val, int32 delta)
// Atomically:
// *val += delta;
// return *val;
TEXT ·Xadd(SB), NOSPLIT, $0-20
MOVQ ptr+0(FP), BX
MOVL delta+8(FP), AX
MOVL AX, CX
LOCK
XADDL AX, 0(BX)
ADDL CX, AX
MOVL AX, ret+16(FP)
RET
可以看到里面有 LOCK
前缀的指令,Lock
其实就是CPU
层面的一个锁,锁的单位是Cache Line
。多个核都要同时更新这个Cacheline
,所以性能就有所下降。
7.2 如何去优化?
我们知道,在业务中遇到锁的性能瓶颈时候,我们一般会下面几个方面去考虑优化锁。
- 优化锁的粒度
- 读写分离
- 减少锁持有时间。
- 使用
CAS
2、3、4 在这个读写锁场景都不试用(已经是读写锁了,且瓶颈也在CAS
对cacheline
的资源竞争),所以只能从锁的粒度方向考虑。
7.2.1 distributedrwmutex
@dvyukov(Go
小组成员之一) 提出了一个分布式读写锁的方案,
核心原理就是,一个P
对应一个读写锁,这样读锁在多核情况就没有竞争的问题了,因为每个核的读锁是独立的,互不影响(有点类似 ThreadLocal
的概念)。具体核心代码如下:
func (m *DistributedRWMutex) RUnlock() {
l := m.getLocal()
l.RUnlock()
}
func (m *DistributedRWMutex) getLocal() *sync.RWMutex {
v := runtime.GetProcLocal(m.slot)
p := (*sync.RWMutex)(unsafe.Pointer(uintptr(*v)))
if p == nil {
p = new(sync.RWMutex)
atomic.AddUint64(v, uint64(uintptr(unsafe.Pointer(p))))
}
return p
}
不过这个实现方式也有一个问题需要注意。就是Goroutine
和P
不是强绑定的。有可能你在某个P
执行Lock
以后,做了系统调用
这个时候M、G
和P
可能会解绑,系统调用完成回来的时候,可能绑定的是一个新的P
了。这个时候再去调用getLocal
可能拿到的已经是不一样的锁对象了,再去用这个锁对象去调用RUnlock
是有问题的。一般这种需要在Goroutine
里面直接拿到RWLock
锁对象。类似下面这种:
// ...balabala...
go func() {
rwx := rw.RLocker() // 这里拿到当前P对应的ReadLocker
rwx.Lock()
defer rwx.Unlock()
// ... balabala...
// syscall 这里切换P了也没影响
}()
// ...balabala...
还有一个 drwmutex 库也是这个思想,这里不过多赘述。
@bcmills 的回复说,老的RWMutex
接口,是允许在不同的Goroutine
或者P
里面调用RLock / RUnlock
,考虑兼容性问题,不太想做这样的改造。
7.2.2 Atomic.Value
还有更大的问题是当时(GO1.8
)一些基础库中大量使用了RWMutex
作为包级锁。比如reflect、http.statusMu、json.encoderCache、mime.mimeLock
@dvyukov 指出这些场景其实可以用Atomic.Value
去实现,类似场景有encoding/json/encode.go:cachedTypeFields
// cachedTypeFields is like typeFields but uses a cache to avoid repeated work.
func cachedTypeFields(t reflect.Type) []field {
m, _ := fieldCache.value.Load().(map[reflect.Type][]field)
f := m[t]
if f != nil {
return f
}
// Compute fields without lock.
// Might duplicate effort but won't hold other computations back.
f = typeFields(t)
if f == nil {
f = []field{}
}
fieldCache.mu.Lock()
m, _ = fieldCache.value.Load().(map[reflect.Type][]field)
newM := make(map[reflect.Type][]field, len(m)+1)
for k, v := range m {
newM[k] = v
}
newM[t] = f
fieldCache.value.Store(newM)
fieldCache.mu.Unlock()
return f
}
PS:Atomic.Load
转汇编其实就是简单的MOV
指令,没有LOCK
所以没有Cacheline
资源竞争的问题。
mime: use atomic.Value to store mime types 这个CL
也是尝试用atomic.Value
去替代sync.RWMutex
。
这个实现,虽然读的时候没有资源竞争的问题。但是写的时候是O(n)
的开销。这个方案对写太不友好。
7.2.3 基于二叉树实现 - dmap
@ianlancetaylor 基于二叉树实现了dmap
,dmap
的插入时间复杂度是O(LogN)
,insert
就是常规的写入操作,这里就不过多去赘述了。
// Insert inserts a key/value pair into a dmap.
func (d *dmap) insert(key, val interface{}) {
var n *node
for { // 判断根节点是不是为空。为空直接加锁然后写Root,否则就拿到根节点
root, _ := d.root.Load().(*node)
if root != nil {
n = root
break
}
root = &node{
key: key,
val: val,
}
d.mu.Lock()
if d.root.Load() == nil {
d.root.Store(root)
d.mu.Unlock()
return
}
d.mu.Unlock() // 走到这表示,有其他 goroutine 写了根节点,会循继续去 load 根节点
}
// 到这里,n 表示是 root 节点
for {
cmp := d.compare(key, n.key) // 判断两个 key是否相等
if cmp == 0 {
if val != n.val {
panic("invalid double-insert")
}
return
}
p := &n.left
if cmp > 0 { // key 大于当前节点key。就找右节点
p = &n.right
}
n2, _ := (*p).Load().(*node)
if n2 != nil { // 当前节点不为空,继续重新走循环,比较key和 n.key 大小
n = n2
} else { // 当前节点为空,尝试写入,写入失败,就继续重新走循环逻辑
n2 = &node{
key: key,
val: val,
}
n.mu.Lock()
if (*p).Load() == nil {
(*p).Store(n2)
n.mu.Unlock()
return
}
n.mu.Unlock()
}
}
}
查找的实现,有fastpath
和slowpath
两个路径,fastpath
用的是map
来查找,命中的话就直接返回,时间复杂度是O(1)
的,map
中没查到的话,会去二叉树里面查,时间复杂度是O(LogN)
。有个tricky
的地方是,没有命中map
但是在二叉树中查到这个key
的话,会对这个key
的count+1
,如果这个key
的miss count
大于map
的长度的话,会复制一下map
然后把新的map
回写到Atomic.Value
里面。
// Lookup looks up a key in the distributed map.
func (d *dmap) lookup(key interface{}) interface{} {
// Common values are cached in a map held in the root.
m, _ := d.m.Load().(map[interface{}]interface{})
if val, ok := m[key]; ok { // map里面找到了,直接返回
return val
}
n, _ := d.root.Load().(*node)
for n != nil {
cmp := d.compare(key, n.key)
if cmp == 0 {
count := atomic.AddUint32(&n.count, 1)
// Add this key/val pair to the map in the root,
// but only if it's worth copying the existing map.
if count < 0 || (count > 1 && int(count) > len(m)) {
newm := make(map[interface{}]interface{}, len(m)+1)
for k, v := range m {
newm[k] = v
}
newm[key] = n.val
// It's possible that some other
// goroutine has updated d.m since we
// loaded it. That means we did extra
// work but it's otherwise OK.
// 这里如果有多个 goroutine 写会导致有互相覆盖的问题
d.m.Store(newm)
}
return n.val
}
p := &n.left
if cmp > 0 {
p = &n.right
}
n, _ = (*p).Load().(*node)
}
return nil
}
7.2.4 两个map
@bcmills 基于上面 @ianlancetaylor 的二叉树加map
的思想优化了下。用map
替代了二叉树。具体实现如下:
// Map is a key-value map from which entries can be read without external
// synchronization.
type Map struct {
tenured atomic.Value // 年老代 map
liveNotTenured uint32 // 记录 miss count
mu sync.RWMutex // 对 live 读写的时候,需要用到这个读写锁
live map[interface{}]interface{}
}
读的话,先去tenured
里面去读,读tenured
不用加锁,读写live
用的是读写锁,然后根据misscount
把live
复制给tenured
func (b *Map) Load(key interface{}) (value interface{}, ok bool) {
m, _ := b.tenured.Load().(map[interface{}]interface{})
if value, ok = m[key]; ok {
return value, true
}
b.mu.RLock()
promote := false
if b.live != nil {
value, ok = b.live[key]
lnt := atomic.AddUint32(&b.liveNotTenured, 1)
if lnt >= 1<<31 || int(lnt) >= len(b.live) {
promote = true
}
}
b.mu.RUnlock()
if !promote {
return value, ok
}
b.mu.Lock()
lnt := atomic.LoadUint32(&b.liveNotTenured)
if b.live != nil && (lnt >= 1<<31 || int(lnt) >= len(b.live)) {
b.tenured.Store(b.live)
b.live = nil
atomic.StoreUint32(&b.liveNotTenured, 0)
}
b.mu.Unlock()
return value, ok
}
写的话,很简单,只写live
。
func (b *Map) StoreOrLoad(key, value interface{}) (actualValue interface{}, dup bool) {
b.mu.Lock()
if b.live == nil {
m, _ := b.tenured.Load().(map[interface{}]interface{})
b.live = make(map[interface{}]interface{}, len(m)+1)
for k, v := range m {
b.live[k] = v
}
}
actualValue, dup = b.live[key]
if !dup {
b.live[key] = value
actualValue = value
}
b.mu.Unlock()
return actualValue, dup
}
7.3、Sync.Map 的最终实现
经过一轮讨论以后,@bcmills 单独发了一个提案:sync: add a Map to replace RWLock+map usage 最终决定不去修复RWLock
的伸缩性问题,而是提供一个可伸缩并发安全的Map
来做。 这个并发安全的Map
实现方案就是用的上面双Map
实现。然后这个并发安全的Map
会先放在 x-Repositories 包中经过一段时间迭代,如果没问题了再收敛到Go
源码包中。具体可以看 syncmap: add a synchronized map implementation。
@bcmills 基于双map的demo,做了一些优化,新增了一些API
,比如Delete
、Range
等。提交了一个正式的 CR:
// A Map must not be copied after first use.
type Map struct {
mu sync.Mutex
// clean 是 fastpath 用的,读的时候不用加锁,没有cacheline竞争问题
clean atomic.Value // map[interface{}]interface{}
// dirty 读写都需要加锁
dirty map[interface{}]interface{}
// 如果clean没有查到,这个时候misses会加1
// 当 misses >= len(dirty),会把dirty赋值给clean,然后情况dirty
misses int
}
我们再来看下数据读取的实现,这个里面有几点需要注意,跟上面双map的demo 不同的事,这里的实现是clean
和dirty
两个map
只会有一个不为空。所以读的时候,如果clean
不为空就直接读clean
,并不会再去dirty
读一次。如果dirty
不为nil
,读取以后还会调用一下m.missLocked
,这个函数主要的作用是判断对m.misses
加1
,然后判断要不要把dirty
赋值给clean
,然后清空dirty
。
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
clean, _ := m.clean.Load().(map[interface{}]interface{})
if clean != nil {
value, ok = clean[key]
return value, ok
}
m.mu.Lock()
if m.dirty == nil {
clean, _ := m.clean.Load().(map[interface{}]interface{})
if clean == nil {
// Completely empty — promote to clean immediately.
m.clean.Store(map[interface{}]interface{}{})
} else {
value, ok = clean[key]
}
m.mu.Unlock()
return value, ok
}
value, ok = m.dirty[key]
m.missLocked()
m.mu.Unlock()
return value, ok
}
func (m *Map) missLocked() {
if m.misses++; m.misses >= len(m.dirty) {
m.clean.Store(m.dirty)
m.dirty = nil
}
}
Store
的函数就比较简单了。如果写入的时候,直接加锁,然后判断dirty
是否为空,如果是空,需要把clean
数据复制一份给dirty
然后清空clean
,然后再把数据赋值给dirty
。
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
m.mu.Lock()
m.dirtyLocked()
m.dirty[key] = value
m.mu.Unlock()
}
// dirtyLocked prepares the map for a subsequent write.
// It ensures that the dirty field is non-nil and clean is nil by making a deep
// copy of clean.
func (m *Map) dirtyLocked() {
m.misses = 0
if m.dirty != nil {
return
}
clean, _ := m.clean.Load().(map[interface{}]interface{})
m.dirty = make(map[interface{}]interface{}, len(clean))
for k, v := range clean {
m.dirty[k] = v
}
m.clean.Store(map[interface{}]interface{}(nil))
}
这个实现其实有个很大的问题,就是如果有频繁读写交替的话,会导致数据一直在clean
和dirty
两个map
中来回copy
,如果map
很大的话,这个性能会很差,还会阻塞其他线程的读写,但是这个CR当时的场景是期望提供给Runtime包中一些读多写少的场景使用,所以看benchmark
跑的性能还是有很大的提升的。
代码合入的时候 @rsc 提了两点优化建议
- 如果允许
clean != nil and dirty != nil
会更好。 - 如果一个
key
没有被覆盖或者删除的话,它命中了lock-free path
后续理论上应该一直命中lock-free path
会更好一些。
7.4 进一步优化
过了几个月,基于 @rsc 之前合码的时候给的建议,@bcmills 又优化了一版,整个sync.Map
的结构变成了下面这样:
type Map struct {
mu sync.Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
type readOnly struct { // readOnly 的 map
m map[interface{}]*entry
amended bool // amended=true m没有全部key的数据,没查到还需要去dirty查下.
}
var expunged = unsafe.Pointer(new(interface{})) // 表示这个数据已经不在dirty中了。
type entry struct {
p unsafe.Pointer // *interface{}
}
主要改动只读的map
,之前叫clean
类型是map[interface{}]interface{}
,现在改成了read
,类型是readOnly struct
,readOnly
还有个amended
表示当前readOnly.m
是不是全量数据。我们继续往下Store
的代码
func (m *Map) Store(key, value interface{}) {
// fast-path
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
// 如果这个 key 在 read 里面找到了以后,尝试直接调用 tryStore 去更新 value 数据
// tryStore 里面会做两件事
// 1. 判断当前 entry.p 是不是等于 expunged,等于 expunged 就不能更新,直接返回false。下面会走 slow-path去更新
// 2. 如果不是 expunged ,那就尝试更新 entry.p = &value,如果 CAS 设置成功了就返回。
// 如果是 expunged 状态,表面 dirty 里面已经没有这个 key了,如 read 里面更新这个东西,下次 dirty数据全量提升为 read 的时候,这个数据就会丢失。
return
}
// 下面是 slow-path
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() { // e.unexpungeLocked 尝试CAS(&e.p, expunged, nil)
m.dirty[key] = e // 把 e 赋值给 dirty
}
// 到这里 e.p 肯定不是等于 expunged 了
e.storeLocked(&value) // 设置 e.p = &value
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value) // 如果只在dirty里面有,直接设置 e.p = &value
} else {
if !read.amended { // 如果目前 read 有全量数据,但是 read 和 dirty 都没有这个 key
m.dirtyLocked() // dirtyLocked 这个函数主要做的就是,把 read 里面的 e.p != nil && e.p != expunged 的元素 copy 一份赋值给 dirty
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value) // dirty 保存这个 kv
}
m.mu.Unlock()
}
总结下Store
主要做了下几件事:
fast-path
路径- 看下
read
中是否有这个key
,有的话尝试调用tryStore
,把设置的value
保存到entry
对象中去。 tryStore
里面会判断entry.p
是不是expunged
状态,是的话就不能设置,需要走slow-path
- 如果不是的话保存成功就直接返回。
- 看下
slow-path
路径- 会先加互斥锁
- 看下
read
中是否有这个key
,有的话尝试调用unexpungeLocked
,CAS
方式清除entry.p
的expunged
状态。如果清楚成功,会在dirty
里面添加这个数据。如果没有清楚成功,说明状态不是expunged
,可以直接更新read
的entry.p=&value
就行了。 - 不在
read
里面,在dirty
里面,直接设置entry.p=&value
就行了。 read
和dirty
都没有找到这个key
,先看下read
是不是有全量数据,是的话,就调用m.dirtyLocked
,把read
数据copy
一份到dirty
,并设置read.amended=true
,表示read
里面已经没有全量数据了,需要去drity
里面找。- 最后设置
m.dirty[key] = newEntry(value)
,dirty 保存这个 kv
在来看下Load
相关代码:
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // 如果 read 没找到,且 read 没有全量数据
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key] // 加锁以后,这里需要 double check一下
if !ok && read.amended {
e, ok = m.dirty[key] // 去 dirty map 读
m.missLocked() // 这里面对 misscount+1 ,然后看下是否需要把 dirty 全部给 read,然后设置 dirty 为 nil。
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load() // 如果 e.p != nil && e.p == expunged , 就把 e.p 的指向的值转成Interface返回
}
最后再来看下Delete
的怎么做的,Delete
其实比较简单,就是设置,在read
里面找到这个entry
然后设置e.p=nil
,如果在dirty
中就直接调用delete
方法删除这个key
。
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
7.5 思考:dirty 能否不全量拷贝 read?
正常思路,为了节省内存,dirty
里面只存增量数据,可以吗?反向推理下如果dirty
只存增量的数据,那就不需要read
到dirty
的数据同步操作了,那也不需要expunged
状态了。所以read
的中元素e.p=nil
的时候,表示删除了,由于没有了read
到dirty
的复制,所以需要定期滤掉read
中删除的数据(e.p = nil
)并重新给read
赋值,那Store
的时候,如果read
的e.p=nil
的话就不能再更新了。因为定期过滤掉read
中删除的数据可能会把这个entry
给删除掉,导致这个key
对应的数据丢失了。所以Store
和Load
伪代码如下:
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.p != nil {
ok:= CAS(e.p, old, &value) // 注意这里要是 old = nil 时候不能再继续尝试 CAS
if ok{
return
}
// cas 失败继续往下走
}
m.mu.Lock() // 加锁
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if atomic.Load(e.p) != nil{
atomic.Store(e.p,&value)
return
}
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
}
// read 查到了 e 但是 e.p == nil
// read 和 dirty 都没查到
m.dirty[key] = newEntry(value)
noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
m.read.Store(readOnly{m: noNilMap, amended: true})
m.mu.Unlock()
}
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
e, ok = m.dirty[key]
m.misses++
if m.misses >= len(m.dirty) {
noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
allDataMap := merge(noNilMap,m.dirty)
m.read.Store(readOnly{m: allDataMap})
m.dirty = nil
m.misses = 0
}
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
这样实现逻辑上好像也没有问题,不过每次Load
和Store
一个read
中的nil
,都需要加锁,然后会过滤read
的nil
数据,都有数据的拷贝操作。如果在删除以后立即读的场景性能可能会非常差。
总结:dirty 全量拷贝 read 数据,就是好一个空间换时间的操作。