一、背景

sync.Mutex是我们常用到的一把锁。网上讲这个锁的文章也比较多,这里面主要是为了简单做个自我总结。

Sync.Mutex 慢路径底层依赖的是runtime_SemacquireMutexruntime_Semrelease,对这个不了解可以先去看下 runtime.semaphore

二、Sync.Mutex 源码

2.1 发展历史

sync.Mutex第一版 代码2008年的时候 @rsc 提交的。最早的实现比较简单,是通过简单的CAS信号量的方式来实现的。信号量具体可以参考 runtime-sema 这篇文章。

@dvyukov 2011年的时候,提交了第一次优化了 sync: improve Mutex to allow successive acquisitions,这一版中加入了mutexWoken唤醒状态和等待者计数的概念。

@dvyukov 2015年的时候,新增了第二次优化 sync: add active spinning to Mutex,这一版里面主要是加了自旋逻辑。

@dvyukov 2016年的时候,新增了第三次优化 sync: make Mutex more fair,这一版加入了饥饿模式,让锁在更公平一些。

2.2 Mutex结构分析

先看Mutex注释

// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.

翻译如下:

// 公平锁
//
// 锁有两种模式:正常模式和饥饿模式。
// 在正常模式下,所有的等待锁的 goroutine 都会存在一个先进先出的队列中(轮流被唤醒)
// 但是一个被唤醒的goroutine并不是直接获得锁,而是仍然需要和那些新请求锁的(new arrivial)
// 的goroutine竞争,而这其实是不公平的,因为新请求锁的goroutine有一个优势——它们正在CPU上
// 运行,并且数量可能会很多。所以一个被唤醒的goroutine拿到锁的概率是很小的。在这种情况下,
// 这个被唤醒的goroutine会加入到队列的头部。如果一个等待的goroutine有超过1ms
// 都没获取到锁,那么就会把锁转变为饥饿模式。
//
// 在饥饿模式中,锁的所有权会直接从释放锁(unlock)的goroutine转交给队列头的goroutine,
// 新请求锁的goroutine就算锁是空闲状态也不会去获取锁,并且也不会尝试自旋。它们只是排到队列的尾部。
//
// 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
// 1. 它是队列中最后一个goroutine;
// 2. 它拿到锁所花的时间小于1ms;
// 以上只要有一个成立,它就会把锁转变回正常模式。

// 正常模式会有比较好的性能,因为即使有很多阻塞的等待锁的goroutine,
// 一个goroutine也可以尝试请求多次锁。
// 饥饿模式对于防止尾部延迟来说非常的重要。

在看看下Mutex结构体代码

type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // 表示当前是否已经上锁,1是锁定,0是无锁
    mutexWoken // 当前是不是唤醒状态, 1 表示唤醒
    mutexStarving // 当前是否是饥饿状态,1 表示是饥饿
    mutexWaiterShift = iota // state 右移3位表示 Waiter的个数

    starvationThresholdNs = 1e6 // 等待时间超过这个数就变饥饿模式。
)

sema这个字段比较简单,就是调用runtime_SemacquireMutexruntime_Semrelease需要传的参数。state里面不同的位表示不同的含义,如下图所示:

image.png

2.3 Lock

// 如果已经上锁了,这里会阻塞当前的goroutine直到mutex可用
func (m *Mutex) Lock() {
    // 快路径,先尝试CAS把state从0改成锁定
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    
    // 慢路径
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // old&(mutexLocked|mutexStarving) 表示保留Locked和Starving两个bit位上的数据,其他的全部清空
        // old&(mutexLocked|mutexStarving) == mutexLocked 表示是锁定状态但是不是饥饿状态。
        // runtime_canSpin主要判断能不能自旋,它做了几件事
        // 1. 自旋次数 < 4
        // 2. 必须是多核CPU 且 GOMAXPROCS>1
        // 3. P 并且本地运行队列为空.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 当前“唤醒” 标记为 0 ,然后还有其他g处于等待状态
           // CAS 尝试设置唤醒状态标记位 = 1
           // 告诉其他的 g ,我目前正在处于自旋抢锁状态
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            // runtime_doSpin 就是调用的 procyield(active_spin_cnt)
            // procyield 可以看 https://fanlv.fun/2022/10/05/runtime-mutex/#2-4-procyield-%E5%8A%9F%E8%83%BD
            runtime_doSpin()
            iter++
            old = m.state // 读取下 m.state 新的值,可能已经被其他 g 改变了。
            continue // 设置失败尝试继续自旋
        }
        
        new := old
        
        if old&mutexStarving == 0 {
           // 不是饥饿状态,尝试加锁
           // 是饥饿状态,就不用设置了,下面Waiter+1,然后乖乖排队去就行了
            new |= mutexLocked
        }
        
        // 如果mutexLocked 或者 mutexStarving = 1
        // Waiter 数量加一
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

        
        // 如果当前是 mutexLocked = 1(是锁定状态)
        // 然后 starving = true (下面加锁等待时间超过1ms)
        // 这个时候需要把 mutexStarving 标记位设置为 1
        // 如果不是锁定状态,我就不设置了饥饿状态了。搞不好下面CAS一把设置就成功了。
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // 如果已经设置为唤醒状态, 需要清除唤醒标记, 因为后面要么获得了锁,要么进入休眠.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        
        // CAS 更新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
               // 老的状态是没有加锁,也不是饥饿,那表示我们直接加锁成功了
               // 直接返回了
                break // locked the mutex with CAS
            }
            
            // 走到这里,表示之前的锁可能是加锁状态也可能是饥饿状态
            // 无论是否是加锁、或者饥饿状态,都要调用信号量,去排队。

            
            // waitStartTime != 0 表示是 sleep 以后被唤醒的 goroutine , queueLifo = true
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            
            // 请求信号量
            // queueLifo = true 会放到 semTable suodg队列的头部。
            // 信号量相关的可以看这个 https://fanlv.fun/2022/10/06/runtime-sema/
            // 如果没有可以用的信号量会阻塞到这句代码,底层其实是调用 gopark 休眠这个 g
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            
            // 这里表示有人释放了锁/信号量,我们这个g被唤醒了。
            // 虽然我们是在队列头部被唤醒了,但是如果这个时候,业务代码有新的请求过来,刚刚好有代码调用 Lock。我们这个刚刚被唤醒的g,是要跟新的Lock调用场景去抢锁的。
            // 等待时间超过 1ms ,直接设置starving=true
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state // 读取一下最新的 state 状态。现在也不知道被改成什么了。
            if old&mutexStarving != 0 { 
                // 当前是饥饿状态 我们也不用再去抢锁了,默认就是给我们执行了
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                  // 饥饿状态下不可能有(mutexWoken=0&& mutexLocked==0)这种情况
                  // mutexWaiter 也不可能 = 0 ,因为下面 mutexWaiter = 1 时候就退出了饥饿状态
                    throw("sync: inconsistent mutex state")
                }
                
                // 下面这个位操作,一个AddInt32 改变三个标记位状态,很骚,很难看懂。
                // 设置第一位是1,然后 waiter - 1
                // mutexLocked = 1  mutexWaiterShift = 3 delta = -7
                // delta 第三位是11111 0 0 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // 没有等待了,就要退出了
                    delta -= mutexStarving
                }
                // 修改state的状态。
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            // atomic.CompareAndSwapInt32(&m.state, old, new)
            // CAS 失败,重新读下当前状态,然后再循环来一次。
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

2.4 Unlock

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // Fast path: CAS 取消无锁状态,0 就表示没有其他锁等待者了
    // 没有成功就进入 slow path
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
       // new = m.state-mutexLocked
       // m.state&mutexLocked == 0 表示无锁。 
       // 如果是无锁,上面fast path就成功了.
       // 所以理论不会有这种情况
        fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 { // 不是饥饿状态
        old := new
        for {

            // 如果锁没有waiter,或者锁有其他以下已发生的情况之一,则后面的工作就不用做了,直接返回
            // 1. 锁处于锁定状态,表示锁已经被其他goroutine获取了
            // 2. 锁处于被唤醒状态,这表明有等待goroutine被唤醒,不用再尝试唤醒其他goroutine
            // 3. 锁处于饥饿模式,那么锁之后会被直接交给等待队列队头goroutine
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }

            // 代码走到这,说明当前锁是空闲状态,等待队列中有waiter,且没有goroutine被唤醒
            
            // waiter - 1 然后设置唤醒状态 = 1
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {// 设置成功
                runtime_Semrelease(&m.sema, false, 1) // 唤醒一个信号量
                return
            }
            old = m.state // 对一下最新状态
        }
    } else {
     // 饥饿模式下,唤醒信号量等待队列的头部的sudog。
    // 饥饿状态过来的g都会放到信号量队列的尾部。
        runtime_Semrelease(&m.sema, true, 1)
    }
}

饥饿模式下做了个优化,会调用 readyWithTime 把队列头部的g放到pp.runnext里面。然后再调用goyield 把当前的g放到p runnable queue的尾部,然后调用 schedule 函数,这样就可以优先执行等待队列中的g了。

详情可以看这个CRsync: yield to the waiter when unlocking a starving mutex

三、总结

理解Sync.Mutex主要先理解 runtime.semaphore ,然后再根据注释理解一下normalstarving模式就好了。

没有一定的技术深度,要设计一个bugfree而且高性能的锁还是挺难的。理解是一回事,离自己要去实现一个锁还差十万八千里。