一、背景
sync.Mutex
是我们常用到的一把锁。网上讲这个锁的文章也比较多,这里面主要是为了简单做个自我总结。
Sync.Mutex
慢路径底层依赖的是runtime_SemacquireMutex
和runtime_Semrelease
,对这个不了解可以先去看下 runtime.semaphore 。
二、Sync.Mutex 源码
2.1 发展历史
sync.Mutex
第一版 代码 是2008
年的时候 @rsc 提交的。最早的实现比较简单,是通过简单的CAS
加信号量
的方式来实现的。信号量具体可以参考 runtime-sema 这篇文章。
@dvyukov 2011
年的时候,提交了第一次优化了 sync: improve Mutex to allow successive acquisitions,这一版中加入了mutexWoken
唤醒状态和等待者计数的概念。
@dvyukov 2015
年的时候,新增了第二次优化 sync: add active spinning to Mutex,这一版里面主要是加了自旋逻辑。
@dvyukov 2016
年的时候,新增了第三次优化 sync: make Mutex more fair,这一版加入了饥饿模式,让锁在更公平一些。
2.2 Mutex结构分析
先看Mutex
的 注释:
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
翻译如下:
// 公平锁
//
// 锁有两种模式:正常模式和饥饿模式。
// 在正常模式下,所有的等待锁的 goroutine 都会存在一个先进先出的队列中(轮流被唤醒)
// 但是一个被唤醒的goroutine并不是直接获得锁,而是仍然需要和那些新请求锁的(new arrivial)
// 的goroutine竞争,而这其实是不公平的,因为新请求锁的goroutine有一个优势——它们正在CPU上
// 运行,并且数量可能会很多。所以一个被唤醒的goroutine拿到锁的概率是很小的。在这种情况下,
// 这个被唤醒的goroutine会加入到队列的头部。如果一个等待的goroutine有超过1ms
// 都没获取到锁,那么就会把锁转变为饥饿模式。
//
// 在饥饿模式中,锁的所有权会直接从释放锁(unlock)的goroutine转交给队列头的goroutine,
// 新请求锁的goroutine就算锁是空闲状态也不会去获取锁,并且也不会尝试自旋。它们只是排到队列的尾部。
//
// 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
// 1. 它是队列中最后一个goroutine;
// 2. 它拿到锁所花的时间小于1ms;
// 以上只要有一个成立,它就会把锁转变回正常模式。
// 正常模式会有比较好的性能,因为即使有很多阻塞的等待锁的goroutine,
// 一个goroutine也可以尝试请求多次锁。
// 饥饿模式对于防止尾部延迟来说非常的重要。
在看看下Mutex
结构体代码:
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // 表示当前是否已经上锁,1是锁定,0是无锁
mutexWoken // 当前是不是唤醒状态, 1 表示唤醒
mutexStarving // 当前是否是饥饿状态,1 表示是饥饿
mutexWaiterShift = iota // state 右移3位表示 Waiter的个数
starvationThresholdNs = 1e6 // 等待时间超过这个数就变饥饿模式。
)
sema
这个字段比较简单,就是调用runtime_SemacquireMutex
和runtime_Semrelease
需要传的参数。state
里面不同的位表示不同的含义,如下图所示:
2.3 Lock
// 如果已经上锁了,这里会阻塞当前的goroutine直到mutex可用
func (m *Mutex) Lock() {
// 快路径,先尝试CAS把state从0改成锁定
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 慢路径
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// old&(mutexLocked|mutexStarving) 表示保留Locked和Starving两个bit位上的数据,其他的全部清空
// old&(mutexLocked|mutexStarving) == mutexLocked 表示是锁定状态但是不是饥饿状态。
// runtime_canSpin主要判断能不能自旋,它做了几件事
// 1. 自旋次数 < 4
// 2. 必须是多核CPU 且 GOMAXPROCS>1
// 3. P 并且本地运行队列为空.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 当前“唤醒” 标记为 0 ,然后还有其他g处于等待状态
// CAS 尝试设置唤醒状态标记位 = 1
// 告诉其他的 g ,我目前正在处于自旋抢锁状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// runtime_doSpin 就是调用的 procyield(active_spin_cnt)
// procyield 可以看 https://fanlv.fun/2022/10/05/runtime-mutex/#2-4-procyield-%E5%8A%9F%E8%83%BD
runtime_doSpin()
iter++
old = m.state // 读取下 m.state 新的值,可能已经被其他 g 改变了。
continue // 设置失败尝试继续自旋
}
new := old
if old&mutexStarving == 0 {
// 不是饥饿状态,尝试加锁
// 是饥饿状态,就不用设置了,下面Waiter+1,然后乖乖排队去就行了
new |= mutexLocked
}
// 如果mutexLocked 或者 mutexStarving = 1
// Waiter 数量加一
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前是 mutexLocked = 1(是锁定状态)
// 然后 starving = true (下面加锁等待时间超过1ms)
// 这个时候需要把 mutexStarving 标记位设置为 1
// 如果不是锁定状态,我就不设置了饥饿状态了。搞不好下面CAS一把设置就成功了。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// 如果已经设置为唤醒状态, 需要清除唤醒标记, 因为后面要么获得了锁,要么进入休眠.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// CAS 更新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 老的状态是没有加锁,也不是饥饿,那表示我们直接加锁成功了
// 直接返回了
break // locked the mutex with CAS
}
// 走到这里,表示之前的锁可能是加锁状态也可能是饥饿状态
// 无论是否是加锁、或者饥饿状态,都要调用信号量,去排队。
// waitStartTime != 0 表示是 sleep 以后被唤醒的 goroutine , queueLifo = true
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 请求信号量
// queueLifo = true 会放到 semTable suodg队列的头部。
// 信号量相关的可以看这个 https://fanlv.fun/2022/10/06/runtime-sema/
// 如果没有可以用的信号量会阻塞到这句代码,底层其实是调用 gopark 休眠这个 g
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 这里表示有人释放了锁/信号量,我们这个g被唤醒了。
// 虽然我们是在队列头部被唤醒了,但是如果这个时候,业务代码有新的请求过来,刚刚好有代码调用 Lock。我们这个刚刚被唤醒的g,是要跟新的Lock调用场景去抢锁的。
// 等待时间超过 1ms ,直接设置starving=true
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state // 读取一下最新的 state 状态。现在也不知道被改成什么了。
if old&mutexStarving != 0 {
// 当前是饥饿状态 我们也不用再去抢锁了,默认就是给我们执行了
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
// 饥饿状态下不可能有(mutexWoken=0&& mutexLocked==0)这种情况
// mutexWaiter 也不可能 = 0 ,因为下面 mutexWaiter = 1 时候就退出了饥饿状态
throw("sync: inconsistent mutex state")
}
// 下面这个位操作,一个AddInt32 改变三个标记位状态,很骚,很难看懂。
// 设置第一位是1,然后 waiter - 1
// mutexLocked = 1 mutexWaiterShift = 3 delta = -7
// delta 第三位是11111 0 0 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 没有等待了,就要退出了
delta -= mutexStarving
}
// 修改state的状态。
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
// atomic.CompareAndSwapInt32(&m.state, old, new)
// CAS 失败,重新读下当前状态,然后再循环来一次。
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
2.4 Unlock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: CAS 取消无锁状态,0 就表示没有其他锁等待者了
// 没有成功就进入 slow path
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
// new = m.state-mutexLocked
// m.state&mutexLocked == 0 表示无锁。
// 如果是无锁,上面fast path就成功了.
// 所以理论不会有这种情况
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 不是饥饿状态
old := new
for {
// 如果锁没有waiter,或者锁有其他以下已发生的情况之一,则后面的工作就不用做了,直接返回
// 1. 锁处于锁定状态,表示锁已经被其他goroutine获取了
// 2. 锁处于被唤醒状态,这表明有等待goroutine被唤醒,不用再尝试唤醒其他goroutine
// 3. 锁处于饥饿模式,那么锁之后会被直接交给等待队列队头goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 代码走到这,说明当前锁是空闲状态,等待队列中有waiter,且没有goroutine被唤醒
// waiter - 1 然后设置唤醒状态 = 1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {// 设置成功
runtime_Semrelease(&m.sema, false, 1) // 唤醒一个信号量
return
}
old = m.state // 对一下最新状态
}
} else {
// 饥饿模式下,唤醒信号量等待队列的头部的sudog。
// 饥饿状态过来的g都会放到信号量队列的尾部。
runtime_Semrelease(&m.sema, true, 1)
}
}
饥饿模式下做了个优化,会调用 readyWithTime 把队列头部的g
放到pp.runnext
里面。然后再调用goyield 把当前的g
放到p runnable queue
的尾部,然后调用 schedule 函数,这样就可以优先执行等待队列中的g
了。
详情可以看这个CR
:sync: yield to the waiter when unlocking a starving mutex
三、总结
理解Sync.Mutex
主要先理解 runtime.semaphore ,然后再根据注释理解一下normal
和starving
模式就好了。
没有一定的技术深度,要设计一个bugfree
而且高性能的锁还是挺难的。理解是一回事,离自己要去实现一个锁还差十万八千里。