一、 Go 同步原语

image.png

  • sync.Cond -> notifyList -> runtime.mutexatomic
  • sync.WaitGroup -> atomicruntime.sema
  • sync.Map -> sync.Mutexatomic
  • sync.Once -> sync.Mutexatomic
  • sync.RWMutex -> sync.Mutexatomic
  • sync.Mutex -> runtime.sema
  • channel -> runtime.mutex

sync.Mutex和runtime.mutext区别:简单说就是sync.Mutex是用户层的锁,Lock抢锁失败会造成goroutine阻塞(会调用gopark)。runtime.mutex 是给 runtime使用的锁,Lock抢锁失败,会造成m阻塞(线程阻塞,底层调用的futex)。

二、Atomic

Golang中的Atomic主要保证了三件事,原子性、可见性、有序性

我们先看下Go的源码里面AtomicAPI,主要包括SwapCASAddLoadStorePointer几类,在IA64 CPU上对应的汇编指令如下:

关于LOCK prefix和XCHG指令英特尔开发人员手册 section 8.2.5中,我们找到了如下的解释:

For the Intel486 and Pentium processors, the LOCK# signal is always asserted on the bus during a LOCK operation, even if the area of memory being locked is cached in the processor.

For the P6 and more recent processor families, if the area of memory being locked during a LOCK operation is cached in the processor that is performing the LOCK operation as write-back memory and is completely contained in a cache line, the processor may not assert the LOCK# signal on the bus. Instead, it will modify the memory location internally and allow it’s cache coherency mechanism to ensure that the operation is carried out atomically. This operation is called “cache locking.” The cache coherency mechanism automatically prevents two or more processors that have cached the same area of memory from simultaneously modifying data in that area.

The I/O instructions, locking instructions, the LOCK prefix, and serializing instructions force stronger orderingon the processor.

Synchronization mechanisms in multiple-processor systems may depend upon a strong memory-ordering model. Here, a program can use a locking instruction such as the XCHG instruction or the LOCK prefix to ensure that a read-modify-write operation on memory is carried out atomically. Locking operations typically operate like I/O operations in that they wait for all previous instructions to complete and for all buffered writes to drain to memory (see Section 8.1.2, “Bus Locking”).

从描述中,我们了解到:LOCK prefix 和 XCHG 指令前缀提供了强一致性的内(缓)存读写保证,可以保证 LOCK 之后的指令在带 LOCK 前缀的指令执行之后才会执行。同时,我们在手册中还了解到,现代的 CPU 中的 LOCK 操作并不是简单锁 CPU 和主存之间的通讯总线, Intel 在 cache 层实现了这个 LOCK 操作,此因此我们也无需为 LOCK 的执行效率担忧。

PS:Java中的volatile关键字也是基于 Lock prefix 实现的。

从上面可以看到SwapCASAddStore 都是基于LOCK prefixXCHG指令实现的,他能保证缓存读写的强一致性。

三、runtime.mutex

3.1 mutex 结构

runtimemutex 定义在runtime/runtime2.go中。定义如下:

type mutex struct {
    // Empty struct if lock ranking is disabled, otherwise includes the lock rank
    lockRankStruct
    // Futex-based impl treats it as uint32 key,
    // while sema-based impl as M* waitm.
    // Used to be a union, but unions break precise GC.
    key uintptr // lock、unlock、sleeping 三种状态 sleep 阻塞当前 m ,然后排队等系统唤醒
}

lockRankStruct这个是给runtime死锁 检测用的,只有设置了GOEXPERIMENT=staticlockrankinglockRankStruct才会有具体实现,否则的话这个结构体只会是个空Struct,空的Struct只要不是最后一个字段是不会占用任何空间的(详见final-zero-size-field),具体lockrankCR,可以看这个提交lookrank主要通过加锁顺序 来判断是否会死锁,如果加锁顺序不符合预期就会throw异常(注意这个不是panic不能被recover)。具体代码如下:

// checkRanks checks if goroutine g, which has mostly recently acquired a lock
// with rank 'prevRank', can now acquire a lock with rank 'rank'.
//
//go:systemstack
func checkRanks(gp *g, prevRank, rank lockRank) {
    rankOK := false
    if rank < prevRank {
        // If rank < prevRank, then we definitely have a rank error
        rankOK = false
    } else if rank == lockRankLeafRank {
        // If new lock is a leaf lock, then the preceding lock can
        // be anything except another leaf lock.
        rankOK = prevRank < lockRankLeafRank
    } else {
        // We've now verified the total lock ranking, but we
        // also enforce the partial ordering specified by
        // lockPartialOrder as well. Two locks with the same rank
        // can only be acquired at the same time if explicitly
        // listed in the lockPartialOrder table.
        list := lockPartialOrder[rank]
        for _, entry := range list {
            if entry == prevRank {
                rankOK = true
                break
            }
        }
    }
    if !rankOK {
        printlock()
        println(gp.m.procid, " ======")
        printHeldLocks(gp)
        throw("lock ordering problem")
    }
}

3.2 lock 实现

macOSWindowsruntime.mutex是基于pthread_mutex来实现的,详见 lock_sema。而在Linuxlock是基于futex来实现的,详见 lock_futex。这里我们只关注Linux下的实现。

func lock(l *mutex) {
    lockWithRank(l, getLockRank(l))
}

func lockWithRank(l *mutex, rank lockRank) {
    lock2(l)
}

func lock2(l *mutex) {
    gp := getg() // 获取当前的 goroutine

    if gp.m.locks < 0 {
        throw("runtime·lock: lock count")
    }
    gp.m.locks++ // g绑定的m的lock数量加1

      // l.key 只有三种状态 mutex_unlocked、mutex_locked、mutex_sleeping
      // mutex_unlocked 表示无锁状态
      // mutex_locked 正常加锁状态
      // mutex_sleeping 表示有线程调用futexsleep阻塞了
    // 设置状态为 mutex_locked ,**注意这里是直接设置,不是CAS**
    v := atomic.Xchg(key32(&l.key), mutex_locked)
    if v == mutex_unlocked { // 之前的状态是 mutex_unlocked 表示加锁成功了
        return
    }

     // 走到这里,表示没有加锁成功
     // 这里 v 不是 mutex_unlocked 所以只能是 MUTEX_LOCKED 或 MUTEX_SLEEPING
    // 所以 wait 可能是 MUTEX_LOCKED 或 MUTEX_SLEEPING
    // 如果我们将 l->key 从 MUTEX_SLEEPING 更改为其他值,我们必须小心在返回之前将其更改回 MUTEX_SLEEPING
    wait := v

    // 多核情况下尝试自旋4次,单个就不用自旋了
    spin := 0
    if ncpu > 1 {
        spin = active_spin // active_spin = 4
    }
    for {
        for i := 0; i < spin; i++ { 
                // 注意我们上面设置了 l.key = mutex_locked
                // 这里如果 key = mutex_unlocked,表示肯定是其他持有锁的线程进行了锁的释放
            for l.key == mutex_unlocked {
                    // CAS 抢锁成功直接返回,否则再尝试自旋
                    // 这里 wait 是 MUTEX_LOCKED 或 MUTEX_SLEEPING
                if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
                    return
                }
            }

            procyield(active_spin_cnt) // 执行 active_spin_cnt = 30 次 PAUSE指令
        }

        // passive_spin = 1 ,再尝试抢一次锁。
        for i := 0; i < passive_spin; i++ {
            for l.key == mutex_unlocked {
                if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
                    return
                }
            }
            osyield() // CAS 失败,系统调用`sched_yield`让出CPU
        }

        
        // 直接设置为 mutex_sleeping 状态
        v = atomic.Xchg(key32(&l.key), mutex_sleeping)
        if v == mutex_unlocked {
               // 注意这里,如果是从 mutex_unlocked => mutex_sleeping 也认为是加锁成功,然后直接返回,不会走futexsleep阻塞当前线程。
               // 造成的影响就是,解锁的时候执行,执行 futexwakeup了,但是没有需要唤醒的线程(功能上应该没有影响)
            return 
        }
        wait = mutex_sleeping // 设置 wait 状态为 mutex_sleeping 下次循环会设置为 mutex_sleeping 状态
        // l.key == mutex_sleeping 就 sleep,直到被唤醒。
        // 不然继续循环(说明在atomic.Xchg mutex_sleeping设置完这短短时间内,其他线程设置又重新设置了l.key状态比如设置为了mutex_locked或者mutex_unlocked。这个时候不会进入sleep,而是会去循环执行步骤1。)
        // 如果 *addr == val { 当前线程进入sleep状态 } ;不会阻塞超过ns,ns<0表示永远休眠
         // futexsleep(addr *uint32, val uint32, ns int64)
         // 阻塞 m
        futexsleep(key32(&l.key), mutex_sleeping, -1)
    }
}

runtime.mutext.lock.png

原图点我

lock主要步骤如下:

  1. 调用atomic.Xchg直接设置key的状态为mutex_locked(注意这里不是CAS,是直接设置)。
  2. 根据atomic.Xchg返回的状态v,来判断是否加锁成功了,如果v = mutex_unlocked表示加锁成功了(这个时候可以直接返回了)。否则就是加锁失败,这个时候v可能是MUTEX_LOCKED或者MUTEX_SLEEPING的状态。
  3. 如果是多核的话,会尝试自旋4,把l.key从状态mutex_unlocked改成wait。注意,我们在步骤1里面直接设置了keymutex_locked,如果这里l.key = mutex_unlocked,只能说明是其他持有锁的线程释放了锁。这个CAS成功,表示加锁成功。如果加锁失败,会调用下procyield优化下自旋性能。
  4. 自旋4次失败,会再尝试一次CAS,失败的话会调用osyield让出CPU
  5. osyield完成以后,继续执行,这个时候直接调用atomic.Xchg设置l.key = mutex_sleeping,表示当前准备调用futexsleep进行sleep
  6. 使用系统调用futexsleep,如果l.key == mutex_sleeping,则当前线程进入睡眠状态,直到有其他地方调用futexwakeup来唤醒。如果这个时候l.key != mutex_sleeping,说明在步骤5设置完这短短时间内,其他线程设置又重新设置了l.key状态比如设置为了mutex_locked或者mutex_unlocked。这个时候不会进入sleep,而是会去循环执行步骤1

3.3 unlock 实现

func unlock(l *mutex) {
    unlockWithRank(l)
}

func unlockWithRank(l *mutex) {
    unlock2(l)
}

func unlock2(l *mutex) {
    // 设置 l.key = mutex_unlocked
    v := atomic.Xchg(key32(&l.key), mutex_unlocked)
    if v == mutex_unlocked {// 重复调用 unlock,直接抛出异常。
        throw("unlock of unlocked lock")
    }
    if v == mutex_sleeping { 
        // 之前的状态是 mutex_sleeping,说明其他有线程在`sleep`,唤醒一个`sleep`的对象。
        // 如果任何线程阻塞在addr上,则唤醒至少cnt个阻塞的任务
        // futexwakeup(addr *uint32, cnt uint32) 
        futexwakeup(key32(&l.key), 1)
    }

    gp := getg()
    gp.m.locks--
    if gp.m.locks < 0 {
        throw("runtime·unlock: lock count")
    }
    if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
        gp.stackguard0 = stackPreempt
    }
}

image.png

unlock 实现总结:

  1. 调用atomic.Xchg设置l.key = mutex_unlocked
  2. 如果设置之前的状态就是mutex_unlocked,直接抛异常程序退出。
  3. 如果之前状态是mutex_sleeping,则唤醒一个阻塞在futexsleep的线程。
  4. m的锁数量减一,如果锁数量等0且当前g是被抢占状态,要标记gp.stackguard0stackPreempt,下次发生函数调用的时候,会主动让出这个g

四、runtime.semaphore

4.1 信号量 P/V 操作

  • P原语P是荷兰语Proberen(测试)的首字母。为阻塞原语,负责把当前进程由运行状态转换为阻塞状态,直到另外一个进程唤醒它。操作为:申请一个空闲资源(把信号量减1),若成功,则退出;若失败,则该进程被阻塞;
  • V原语V是荷兰语Verhogen(增加)的首字母。为唤醒原语,负责把一个被阻塞的进程唤醒,它有一个参数表,存放着等待被唤醒的进程信息。操作为:释放一个被占用的资源(把信号量加1),如果发现有被阻塞的进程,则选择一个唤醒之。

semaphore基本操作,在 src/sync/runtime.go 定义了下面几个方法:

// Semacquire等待*s > 0,然后原子递减它。
// 它是一个简单的睡眠原语,用于同步库使用。
func runtime_Semacquire(s *uint32)

// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象
// 如果lifo为true,waiter将会被插入到队列的头部
// skipframes是跟踪过程中要省略的帧数
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine
// 它是一个简单的唤醒原语,用于同步库
// 如果handoff为true, 传递信号到队列头部的waiter
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

这个几个函数具体的实现在 src/runtime/sema.go

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    semrelease1(addr, handoff, skipframes)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
func poll_runtime_Semrelease(addr *uint32) {
    semrelease(addr)
}

4.2 semtable

TreapBinary Search Tree+Heap的组合。

semTable 是一个长度为251的全局数组,每个semaRoot指向一个treap,主要用于存放阻塞在信号量(semaphore)上的sudog

var semtable [semTabSize]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte // 防止 flase sharing
}

semaRoot最早是双向链表,在某些场景下性能比较查,所以优化成了treap,具体可以看 CR37103

优化之后semtable存储的结构大概是这样:

image.png

4.3 semacquire 获取信号量

semaphore获取信号量操作步骤如下:

  1. 调用runtime_SemacquireMutex (比如sync.Mutex.Lock()场景)
  2. sync_runtime_SemacquireMutex
  3. semacquire1
  4. CAS(addr, v, v-1)状态成功就返回,失败继续往下
  5. 缓存池拿一个sudog,或者new一个sudogacquireSudog
  6. g相关的数据存到sudog中。
  7. 循环
    • 对当前semaRoot加锁
    • nwait++
    • cansemacquire/CAS(addr, v, v-1)
    • sudog加到semaRoottreap中/root.queue()
    • 可能要调整树的结构(左旋rotateRight/右旋rotateLeft)防止树退化为链表
    • goparkunlock让出当前g的执行
    • 被唤醒
    • CAS成功或者s.ticket != 0(当前没有其他竞争者了) 认为成功
    • 否则继续循环
  8. 最后释放sudog/releaseSudog

具体源码如下

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    gp := getg()
    if gp != gp.m.curg {// 判断下g是不是当前m绑定的g
        throw("semacquire not on the G stack")
    }

    // CAS(addr, v, v-1) 成功就直接成功否则一直循环,如果 *addr = 0 返回 false 走下面 slowpath
    if cansemacquire(addr) {
        return
    }
    
    // 走到这里表示当前g要阻塞
    // 下面逻辑,就是把g封装成sudog,然后存到semTable中。
    // 最后调用 gopark 让出当前g


    s := acquireSudog() // 这个先去 p.sudogcache 拿,没拿到去全局sudohgcache拿
    root := semroot(addr) // 根据sema的地址,算出用到semTable中哪个semaRoot
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0
    if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
        if t0 == 0 {
            t0 = cputicks()
        }
        s.acquiretime = t0
    }
    for {
        lockWithRank(&root.lock, lockRankRoot) // 加锁,方面下面修改 semaRoot的属性
        // 对等待的计数加1,这样sema_release时候不会走快路径
        atomic.Xadd(&root.nwait, 1)
        // 看下是否有其他的goroutine调用了sema_release
        // 在尝试 CAS(addr, v, v-1) 试下
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        
        // 这里,就是这个新的 sudog 加到 semaTable中的
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) // 这你会让出当前的goroutine
        
        
        // goroutine 被调度回来了,表示有 sema_release 以后唤醒了这个 sema
        // s.ticket != 0 表示是等待队列头部的 sudog,当前队列只有一个sudog了,所以直接结束
        // CAS(addr, v, v-1) 成功也结束
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    if s.releasetime > 0 {
        blockevent(s.releasetime-t0, 3+skipframes)
    }
    releaseSudog(s) // 释放 sudog
}


func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}

func acquireSudog() *sudog {
    // 设置禁止抢占
    mp := acquirem()
    pp := mp.p.ptr()
    //当前本地sudog缓存没有了,则去全局缓存中拉取一批
    if len(pp.sudogcache) == 0 {
        lock(&sched.sudoglock)
        // 首先尝试从全局缓存中获取sudog,直到本地容量达到50%
        for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
            s := sched.sudogcache
            sched.sudogcache = s.next
            s.next = nil
            pp.sudogcache = append(pp.sudogcache, s)
        }
        unlock(&sched.sudoglock)
        // 如果全局缓存为空,则分配创建一个新的sudog
        if len(pp.sudogcache) == 0 {
            pp.sudogcache = append(pp.sudogcache, new(sudog))
        }
    }
    n := len(pp.sudogcache)
    s := pp.sudogcache[n-1]
    pp.sudogcache[n-1] = nil
    pp.sudogcache = pp.sudogcache[:n-1]
    if s.elem != nil {
        throw("acquireSudog: found s.elem != nil in cache")
    }
    //解除抢占限制
    releasem(mp)
    return s
}

4.4 semrelease 释放信号量

semaphore释放信号量操作步骤如下:

  1. 调用runtime_Semrelease,比如sync.Mutex.Unlock()场景。
  2. sync_runtime_Semrelease
  3. semrelease1
  4. 原子*addr++
  5. nwait=0,表示没有阻塞在这个信号量上的g直接返回。
  6. 有阻塞的gsemTable中找到对应的semaRoot,然后semaRoot`加锁。
  7. 再次checknwait=0,等于0直接返回。
  8. 拿到semaaddressemTable中对应的队列头部的seamRoot
  9. dequeue是否需要调整左旋rotateLeft或者右旋rotateRight调整树结构。
  10. readyWithTime,调用goread唤醒sudog绑定的g
  11. goyield

semrelease源码如下:

func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    // 没有等待者直接返回
    if atomic.Load(&root.nwait) == 0 {
        return
    }

    //查找一个等待着并唤醒它
    lockWithRank(&root.lock, lockRankRoot)
    if atomic.Load(&root.nwait) == 0 {
        //计数已经被其他goroutine消费,所以不需要唤醒其他goroutine
        unlock(&root.lock)
        return
    }
    s, t0 := root.dequeue(addr) //查找第一个出现的addr ( s.elem == unsafe.Pointer(addr) )
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    unlock(&root.lock)
    if s != nil { // 可能比较慢 甚至被挂起所以先unlock
        acquiretime := s.acquiretime
        if acquiretime != 0 {
            mutexevent(t0-acquiretime, 3+skipframes)
        }
        if s.ticket != 0 {
            throw("corrupted semaphore ticket")
        }
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        
        readyWithTime(s, 5+skipframes) // -> goready(s.g,5)标记runnable 等待被重新调度
        if s.ticket == 1 && getg().m.locks == 0 {
            // 直接切换G
            // readyWithTime已经将等待的G作为runnext放到当前的P
            // 我们现在调用调度器可以立即执行等待的G
            // 注意waiter继承了我们的时间片:这是希望避免在P上无限得进行激烈的信号量竞争
            // goyield类似于Gosched,但是它是发送“被强占”的跟踪事件,更重要的是,将当前G放在本地runq,而不是全局队列。
            // 我们仅在饥饿状态下执行此操作(handoff=true)
            // 我们等待进入饥饿状体,然后开始进行ticket和P的手递手交接
            // See issue 33747 for discussion.
            // https://go-review.googlesource.com/c/go/+/206180
            
            goyield() // -> goyield -> goyield_m 
        }
    }
}


// goyield is like Gosched, but it:
// - does not emit a GoSched trace event
// - puts the current G on the runq of the current P instead of the globrunq
func goyield() {
    checkTimeouts()
    mcall(goyield_m)
}

func goyield_m(gp *g) {
    pp := gp.m.p.ptr()
    casgstatus(gp, _Grunning, _Grunnable)
    dropg()
    runqput(pp, gp, false)
    schedule()
}

4.5、总结

获取信号量操作主要尝试把sema地址CAS方式原子减1,成就直接返回,失败以后会把当前g打包成sudog然后保存到semTable,然后调用gopark让出当前的goroutine

释放信号量操作就是吧sema地址加1,然后看有没有等待中的g,没有直接返回,有的话去semaTable的等待队列取出然后调用goready唤醒对应的g

主要理解semaTable里面存储sudog的方式就好了。

五、Sync.Mutex 源码

5.1 发展历史

sync.Mutex第一版 代码2008年的时候 @rsc 提交的。最早的实现比较简单,是通过简单的CAS信号量(runtime-sema)的方式来实现的。

@dvyukov 2011年的时候,提交了第一次优化了 sync: improve Mutex to allow successive acquisitions,这一版中加入了mutexWoken唤醒状态和等待者计数的概念。

@dvyukov 2015年的时候,新增了第二次优化 sync: add active spinning to Mutex,这一版里面主要是加了自旋逻辑。

@dvyukov 2016年的时候,新增了第三次优化 sync: make Mutex more fair,这一版加入了饥饿模式,让锁在更公平一些。

5.2 Mutex结构分析

先看Mutex注释

// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.

翻译如下:

// 公平锁
//
// 锁有两种模式:正常模式和饥饿模式。
// 在正常模式下,所有的等待锁的 goroutine 都会存在一个先进先出的队列中(轮流被唤醒)
// 但是一个被唤醒的goroutine并不是直接获得锁,而是仍然需要和那些新请求锁的(new arrivial)
// 的goroutine竞争,而这其实是不公平的,因为新请求锁的goroutine有一个优势——它们正在CPU上
// 运行,并且数量可能会很多。所以一个被唤醒的goroutine拿到锁的概率是很小的。在这种情况下,
// 这个被唤醒的goroutine会加入到队列的头部。如果一个等待的goroutine有超过1ms
// 都没获取到锁,那么就会把锁转变为饥饿模式。
//
// 在饥饿模式中,锁的所有权会直接从释放锁(unlock)的goroutine转交给队列头的goroutine,
// 新请求锁的goroutine就算锁是空闲状态也不会去获取锁,并且也不会尝试自旋。它们只是排到队列的尾部。
//
// 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
// 1. 它是队列中最后一个goroutine;
// 2. 它拿到锁所花的时间小于1ms;
// 以上只要有一个成立,它就会把锁转变回正常模式。

// 正常模式会有比较好的性能,因为即使有很多阻塞的等待锁的goroutine,
// 一个goroutine也可以尝试请求多次锁。
// 饥饿模式对于防止尾部延迟来说非常的重要。

在看看下Mutex结构体代码

type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // 表示当前是否已经上锁,1是锁定,0是无锁
    mutexWoken // 当前是不是唤醒状态, 1 表示唤醒
    mutexStarving // 当前是否是饥饿状态,1 表示是饥饿
    mutexWaiterShift = iota // state 右移3位表示 Waiter的个数

    starvationThresholdNs = 1e6 // 等待时间超过这个数就变饥饿模式。
)

sema这个字段比较简单,就是调用runtime_SemacquireMutexruntime_Semrelease需要传的参数。state里面不同的位表示不同的含义,如下图所示:

image.png

5.3 Lock

// 如果已经上锁了,这里会阻塞当前的goroutine直到mutex可用
func (m *Mutex) Lock() {
    // 快路径,先尝试CAS把state从0改成锁定
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    
    // 慢路径
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // old&(mutexLocked|mutexStarving) 表示保留Locked和Starving两个bit位上的数据,其他的全部清空
        // old&(mutexLocked|mutexStarving) == mutexLocked 表示是锁定状态但是不是饥饿状态。
        // runtime_canSpin主要判断能不能自旋,它做了几件事
        // 1. 自旋次数 < 4
        // 2. 必须是多核CPU 且 GOMAXPROCS>1
        // 3. P 并且本地运行队列为空.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 当前“唤醒” 标记为 0 ,然后还有其他g处于等待状态
           // CAS 尝试设置唤醒状态标记位 = 1
           // 告诉其他的 g ,我目前正在处于自旋抢锁状态
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            // runtime_doSpin 就是调用的 procyield(active_spin_cnt)
            // procyield 可以看 https://fanlv.fun/2022/10/05/runtime-mutex/#2-4-procyield-%E5%8A%9F%E8%83%BD
            runtime_doSpin()
            iter++
            old = m.state // 读取下 m.state 新的值,可能已经被其他 g 改变了。
            continue // 设置失败尝试继续自旋
        }
        
        new := old
        
        if old&mutexStarving == 0 {
           // 不是饥饿状态,尝试加锁
           // 是饥饿状态,就不用设置了,下面Waiter+1,然后乖乖排队去就行了
            new |= mutexLocked
        }
        
        // 如果mutexLocked 或者 mutexStarving = 1
        // Waiter 数量加一
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

        
        // 如果当前是 mutexLocked = 1(是锁定状态)
        // 然后 starving = true (下面加锁等待时间超过1ms)
        // 这个时候需要把 mutexStarving 标记位设置为 1
        // 如果不是锁定状态,我就不设置了饥饿状态了。搞不好下面CAS一把设置就成功了。
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // 如果已经设置为唤醒状态, 需要清除唤醒标记, 因为后面要么获得了锁,要么进入休眠.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        
        // CAS 更新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
               // 老的状态是没有加锁,也不是饥饿,那表示我们直接加锁成功了
               // 直接返回了
                break // locked the mutex with CAS
            }
            
            // 走到这里,表示之前的锁可能是加锁状态可能是饥饿状态
            // 无论是否是加锁、或者饥饿状态,都要调用信号量,去排队。

            
            // waitStartTime != 0 表示是 sleep 以后被唤醒的 goroutine
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            
            // 请求信号量
            // queueLifo = true 会放到 semTable suodg队列的头部。
            // 如果没有可以用的信号量会阻塞到这句代码,底层其实是调用 gopark 休眠这个 g
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            
            // 这里表示有人释放了锁/信号量,我们这个g被唤醒了。
            // 虽然我们是在队列头部被唤醒了,但是如果这个时候,业务代码有新的请求过来,刚刚好有代码调用 Lock。我们这个刚刚被唤醒的g,是要跟新的Lock调用场景去抢锁的。
            // 等待时间超过 1ms ,直接设置starving=true
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state // 读取一下最新的 state 状态。现在也不知道被改成什么了。
            if old&mutexStarving != 0 { 
                // 当前是饥饿状态 我们也不用再去抢锁了,默认就是给我们执行了
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                  // 饥饿状态下不可能有(mutexWoken=0&& mutexLocked==0)这种情况
                  // mutexWaiter 也不可能 = 0 ,因为下面 mutexWaiter = 1 时候就退出了饥饿状态
                    throw("sync: inconsistent mutex state")
                }
                
                // 下面这个位操作,一个AddInt32 改变三个标记位状态.
                // 设置第一位是1,然后 waiter - 1
                // mutexLocked = 1  mutexWaiterShift = 3 delta = -7
                // delta 第三位是 0 0 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift) 
                if !starving || old>>mutexWaiterShift == 1 {
                    // 没有等待了退出饥饿状态
                    delta -= mutexStarving
                }
                // 修改state的状态。
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true // 设置被唤醒,因为之前调用了runtime_SemacquireMutex,执行到这表示重新被调度到了。
            iter = 0
        } else {
            // atomic.CompareAndSwapInt32(&m.state, old, new)
            // CAS 失败,重新读下当前状态,然后再循环来一次。
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

5.4 Unlock

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // Fast path: CAS 取消无锁状态,0 就表示没有其他锁等待者了
    // 没有成功就进入 slow path
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
       // new = m.state-mutexLocked
       // m.state&mutexLocked == 0 表示无锁。 
       // 如果是无锁,上面fast path就成功了.
       // 所以理论不会有这种情况
        fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 { // 不是饥饿状态
        old := new
        for {

            // 如果锁没有waiter,或者锁有其他以下已发生的情况之一,则后面的工作就不用做了,直接返回
            // 1. 锁处于锁定状态,表示锁已经被其他goroutine获取了
            // 2. 锁处于被唤醒状态,这表明有等待goroutine被唤醒,不用再尝试唤醒其他goroutine
            // 3. 锁处于饥饿模式,那么锁之后会被直接交给等待队列队头goroutine
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }

            // 代码走到这,说明当前锁是空闲状态,等待队列中有waiter,且没有goroutine被唤醒
            
            // waiter - 1 然后设置唤醒状态 = 1
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {// 设置成功
                runtime_Semrelease(&m.sema, false, 1) // 唤醒一个信号量
                return
            }
            old = m.state // 对一下最新状态
        }
    } else {
     // 饥饿模式下,唤醒信号量等待队列的头部的sudog。
    // 饥饿状态过来的g都会放到信号量队列的尾部。
        runtime_Semrelease(&m.sema, true, 1)
    }
}

饥饿模式下做了个优化,会调用 readyWithTime 把队列头部的g放到pp.runnext里面。然后再调用goyield 把当前的g放到p runnable queue的尾部,然后调用 schedule 函数,这样就可以优先执行等待队列中的g了。

详情可以看这个CRsync: yield to the waiter when unlocking a starving mutex

六、RWMutex

type RWMutex struct {
   w           Mutex  // held if there are pending writers
   writerSem   uint32 // 写的信号量
   readerSem   uint32 // 读的信号量
   readerCount int32  // 等待写的个数
   readerWait  int32  // 等待读的个数
}


// 加“读锁”
// 对readerCount + 1 。
// 然后看 readerCount是不是小于0
// 小于0表示 正在加写锁,然后阻塞到rw.readerSem 这个信号上。
func (rw *RWMutex) RLock() {
   if atomic.AddInt32(&rw.readerCount, 1) < 0 {
      // A writer is pending, wait for it.
      runtime_SemacquireMutex(&rw.readerSem, false, 0)
   }
}


// 释放 “读锁”
// 对readerCount - 1 。
// 然后看 readerCount是不是小于0
// 小于0表示 正在加写锁,然后调用rw.rUnlockSlow
func (rw *RWMutex) RUnlock() {
   if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
      // Outlined slow-path to allow the fast-path to be inlined
      rw.rUnlockSlow(r)
   }
}


// r+1 == -rwmutexMaxReaders 表示“读锁”已经释放,抛出异常
// rw.readerWait - 1 
// rw.readerWait - 1 = 0 表示所有读锁都释放了
// 所有读锁都释放了可以唤醒 rw.writerSem 对应 写锁的lock方法继续执行
func (rw *RWMutex) rUnlockSlow(r int32) {
   if r+1 == 0 || r+1 == -rwmutexMaxReaders {
      race.Enable()
      throw("sync: RUnlock of unlocked RWMutex")
   }
   // A writer is pending.
   if atomic.AddInt32(&rw.readerWait, -1) == 0 {
      // The last reader unblocks the writer.
      runtime_Semrelease(&rw.writerSem, false, 1)
   }
}


// mutex 加锁,保证写锁和写锁之间互斥
// rw.readerCount - rwmutexMaxReaders
// r 表示读锁数量
// rw.readerWait + 读lock的数量 
// 等待 rw.writerSem 的信号 (读锁那边释放完了,会发这个信号)
func (rw *RWMutex) Lock() {
   // First, resolve competition with other writers.
   rw.w.Lock()
   // Announce to readers there is a pending writer.
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   // Wait for active readers.
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }
}


// rw.readerCount + rwmutexMaxReaders
// r 表示读锁的数量,大于 rwmutexMaxReaders 就抛出异常
// 发送 rw.readerSem  信号量,通知RLock 代码可以继续执行。
func (rw *RWMutex) Unlock() {
   // Announce to readers there is no active writer.
   r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
   if r >= rwmutexMaxReaders {
      race.Enable()
      throw("sync: Unlock of unlocked RWMutex")
   }
   // Unblock blocked readers, if any.
   for i := 0; i < int(r); i++ {
      runtime_Semrelease(&rw.readerSem, false, 0)
   }
}

七、Sync.Map

7.1 Sync.RWMutex 多核的伸缩性问题

早在2016的时候,@bcmills(这个哥们是Go项目主要维护者之一) 在GoGithub 上提出了一个sync: RWMutex scales poorly with CPU countIssue给大家讨论。简单说就是 Sync.RWMutex这个读写锁,多核情况下扩展性很差。他贴的 Benchmark 测试代码如下:

func BenchmarkRWMutex(b *testing.B) {
    for ng := 1; ng <= 256; ng <<= 2 { // ng 表示,开多少个 goroutine
        b.Run(fmt.Sprintf("name[%d]", ng), func(b *testing.B) {
            var mu sync.RWMutex
            mu.Lock()

            var wg sync.WaitGroup
            wg.Add(ng)

            n := b.N        // n 表示下面要执行多少次 RLock 和 RUnlock 
            quota := n / ng // quota 表示分摊到每个 goroutine 上需要执行多少次 Lock 和 RUnlock 

            for g := ng; g > 0; g-- {
                if g == 1 { //  n / ng 不是整除的话,剩下余出来的数据,在g=1 的时候全部减掉,不然下面 n 不会等于0
                    quota = n
                }

                go func(quota int) {
                    for i := 0; i < quota; i++ { // 一个循环执行一次 RLock 和 RUnlock
                        mu.RLock()
                        mu.RUnlock()
                    }
                    wg.Done()
                }(quota)

                n -= quota
            }

            if n != 0 {
                b.Fatalf("Incorrect quota assignments: %v remaining", n)
            }

            b.StartTimer() // 从这里开始计时
            mu.Unlock()    // 这里释放写锁,上面所有阻塞在 RLock 的 goroutine 同时唤醒去执行 RLock
            wg.Wait()      // 所有 goroutine 的 RLock 和 RUnlock 都执行完毕
            b.StopTimer()  // 从这里结束计时
        })
    }
}
    

Benchmark的结果可以看出,在多个Gorutine并发下,可以看到CPU核数越多,RWLock的性能越差。

# ./benchmarks.test -test.bench . -test.cpu 1,4,16,64
testing: warning: no tests to run
BenchmarkRWMutex/1      20000000                72.6 ns/op
BenchmarkRWMutex/1-4    20000000                72.4 ns/op
BenchmarkRWMutex/1-16   20000000                72.8 ns/op
BenchmarkRWMutex/1-64   20000000                72.5 ns/op
BenchmarkRWMutex/4      20000000                72.6 ns/op
BenchmarkRWMutex/4-4    20000000               105 ns/op
BenchmarkRWMutex/4-16   10000000               130 ns/op
BenchmarkRWMutex/4-64   20000000               160 ns/op
BenchmarkRWMutex/16     20000000                72.4 ns/op
BenchmarkRWMutex/16-4   10000000               125 ns/op
BenchmarkRWMutex/16-16  10000000               263 ns/op
BenchmarkRWMutex/16-64   5000000               287 ns/op
BenchmarkRWMutex/64     20000000                72.6 ns/op
BenchmarkRWMutex/64-4   10000000               137 ns/op
BenchmarkRWMutex/64-16   5000000               306 ns/op
BenchmarkRWMutex/64-64   3000000               517 ns/op
BenchmarkRWMutex/256                    20000000                72.4 ns/op
BenchmarkRWMutex/256-4                  20000000               137 ns/op
BenchmarkRWMutex/256-16                  5000000               280 ns/op
BenchmarkRWMutex/256-64                  3000000               602 ns/op
PASS

为什么多核下面会更慢?其实很简单,就是资源竞争会增加额外开销。RLockRUnlock,底层实现是atomic.AddInt32atomic.AddInt32对应的汇编代码如下:

// uint32 Xadd(uint32 volatile *val, int32 delta)
// Atomically:
//    *val += delta;
//    return *val;
TEXT ·Xadd(SB), NOSPLIT, $0-20
    MOVQ    ptr+0(FP), BX
    MOVL    delta+8(FP), AX
    MOVL    AX, CX
    LOCK
    XADDL    AX, 0(BX)
    ADDL    CX, AX
    MOVL    AX, ret+16(FP)
    RET

可以看到里面有 LOCK 前缀的指令,Lock其实就是CPU层面的一个锁,锁的单位是Cache Line 。多个核都要同时更新这个Cacheline,所以性能就有所下降。

7.2 如何去优化?

我们知道,在业务中遇到锁的性能瓶颈时候,我们一般会下面几个方面去考虑优化锁。

  1. 优化锁的粒度
  2. 读写分离
  3. 减少锁持有时间。
  4. 使用CAS

2、3、4 在这个读写锁场景都不试用(已经是读写锁了,且瓶颈也在CAScacheline的资源竞争),所以只能从锁的粒度方向考虑。

7.2.1 distributedrwmutex

@dvyukovGo小组成员之一) 提出了一个分布式读写锁的方案
核心原理就是,一个P对应一个读写锁,这样读锁在多核情况就没有竞争的问题了,因为每个核的读锁是独立的,互不影响(有点类似 ThreadLocal 的概念)。具体核心代码如下:

func (m *DistributedRWMutex) RUnlock() {
        l := m.getLocal()
        l.RUnlock()
}

func (m *DistributedRWMutex) getLocal() *sync.RWMutex {
        v := runtime.GetProcLocal(m.slot)
        p := (*sync.RWMutex)(unsafe.Pointer(uintptr(*v)))
        if p == nil {
                p = new(sync.RWMutex)
                atomic.AddUint64(v, uint64(uintptr(unsafe.Pointer(p))))
        }
        return p
}

不过这个实现方式也有一个问题需要注意。就是GoroutineP不是强绑定的。有可能你在某个P执行Lock以后,做了系统调用这个时候M、GP可能会解绑,系统调用完成回来的时候,可能绑定的是一个新的P了。这个时候再去调用getLocal可能拿到的已经是不一样的锁对象了,再去用这个锁对象去调用RUnlock是有问题的。一般这种需要在Goroutine里面直接拿到RWLock锁对象。类似下面这种:

// ...balabala...
go func() {
    rwx := rw.RLocker() // 这里拿到当前P对应的ReadLocker
    rwx.Lock()
    defer rwx.Unlock()
    // ... balabala...
    // syscall 这里切换P了也没影响

}()
// ...balabala...

还有一个 drwmutex 库也是这个思想,这里不过多赘述。

@bcmills 的回复说,老的RWMutex接口,是允许在不同的Goroutine或者P里面调用RLock / RUnlock,考虑兼容性问题,不太想做这样的改造。

7.2.2 Atomic.Value

还有更大的问题是当时(GO1.8)一些基础库中大量使用了RWMutex作为包级锁。比如reflecthttp.statusMujson.encoderCachemime.mimeLock

@dvyukov 指出这些场景其实可以用Atomic.Value去实现,类似场景有encoding/json/encode.go:cachedTypeFields

// cachedTypeFields is like typeFields but uses a cache to avoid repeated work.
func cachedTypeFields(t reflect.Type) []field {
    m, _ := fieldCache.value.Load().(map[reflect.Type][]field)
    f := m[t]
    if f != nil {
        return f
    }

    // Compute fields without lock.
    // Might duplicate effort but won't hold other computations back.
    f = typeFields(t)
    if f == nil {
        f = []field{}
    }

    fieldCache.mu.Lock()
    m, _ = fieldCache.value.Load().(map[reflect.Type][]field)
    newM := make(map[reflect.Type][]field, len(m)+1)
    for k, v := range m {
        newM[k] = v
    }
    newM[t] = f
    fieldCache.value.Store(newM)
    fieldCache.mu.Unlock()
    return f
}

PS:Atomic.Load转汇编其实就是简单的MOV指令,没有LOCK所以没有Cacheline资源竞争的问题。

mime: use atomic.Value to store mime types 这个CL也是尝试用atomic.Value去替代sync.RWMutex

这个实现,虽然读的时候没有资源竞争的问题。但是写的时候是O(n)的开销。这个方案对写太不友好。

7.2.3 基于二叉树实现 - dmap

@ianlancetaylor 基于二叉树实现了dmapdmap的插入时间复杂度是O(LogN)insert就是常规的写入操作,这里就不过多去赘述了。

// Insert inserts a key/value pair into a dmap.
func (d *dmap) insert(key, val interface{}) {
    var n *node
    for { // 判断根节点是不是为空。为空直接加锁然后写Root,否则就拿到根节点
        root, _ := d.root.Load().(*node)
        if root != nil {
            n = root
            break
        }
        root = &node{
            key: key,
            val: val,
        }
        d.mu.Lock()
        if d.root.Load() == nil {
            d.root.Store(root)
            d.mu.Unlock()
            return
        }
        d.mu.Unlock() // 走到这表示,有其他 goroutine 写了根节点,会循继续去 load 根节点
    }

    // 到这里,n 表示是 root 节点
    
    for {
        cmp := d.compare(key, n.key) // 判断两个 key是否相等
        if cmp == 0 {
            if val != n.val {
                panic("invalid double-insert")
            }
            return
        }
        p := &n.left 
        if cmp > 0 { // key 大于当前节点key。就找右节点
            p = &n.right
        }
        n2, _ := (*p).Load().(*node)
        if n2 != nil { // 当前节点不为空,继续重新走循环,比较key和 n.key 大小
            n = n2 
        } else { // 当前节点为空,尝试写入,写入失败,就继续重新走循环逻辑 
            n2 = &node{
                key: key,
                val: val,
            }
            n.mu.Lock()
            if (*p).Load() == nil {
                (*p).Store(n2)
                n.mu.Unlock()
                return
            }
            n.mu.Unlock()
        }
    }
}

查找的实现,有fastpathslowpath两个路径,fastpath用的是map来查找,命中的话就直接返回,时间复杂度是O(1)的,map中没查到的话,会去二叉树里面查,时间复杂度是O(LogN)。有个tricky的地方是,没有命中map但是在二叉树中查到这个key的话,会对这个keycount+1,如果这个keymiss count大于map的长度的话,会复制一下map然后把新的map回写到Atomic.Value里面。

// Lookup looks up a key in the distributed map.
func (d *dmap) lookup(key interface{}) interface{} {
    // Common values are cached in a map held in the root.
    m, _ := d.m.Load().(map[interface{}]interface{})
    if val, ok := m[key]; ok { // map里面找到了,直接返回
        return val
    }

    n, _ := d.root.Load().(*node)
    for n != nil {
        cmp := d.compare(key, n.key)
        if cmp == 0 {
            count := atomic.AddUint32(&n.count, 1)

            // Add this key/val pair to the map in the root,
            // but only if it's worth copying the existing map.
            if count < 0 || (count > 1 && int(count) > len(m)) {
                newm := make(map[interface{}]interface{}, len(m)+1)
                for k, v := range m {
                    newm[k] = v
                }
                newm[key] = n.val

                // It's possible that some other
                // goroutine has updated d.m since we
                // loaded it.  That means we did extra
                // work but it's otherwise OK.
                // 这里如果有多个 goroutine 写会导致有互相覆盖的问题
                d.m.Store(newm)
            }

            return n.val
        }

        p := &n.left
        if cmp > 0 {
            p = &n.right
        }
        n, _ = (*p).Load().(*node)
    }

    return nil
}

7.2.4 两个map

@bcmills 基于上面 @ianlancetaylor 的二叉树加map的思想优化了下。用map替代了二叉树。具体实现如下:

// Map is a key-value map from which entries can be read without external
// synchronization.
type Map struct {
    tenured        atomic.Value // 年老代 map
    liveNotTenured uint32 // 记录 miss count

    mu   sync.RWMutex // 对 live 读写的时候,需要用到这个读写锁
    live map[interface{}]interface{}
}

读的话,先去tenured里面去读,读tenured不用加锁,读写live用的是读写锁,然后根据misscountlive复制给tenured

func (b *Map) Load(key interface{}) (value interface{}, ok bool) {
    m, _ := b.tenured.Load().(map[interface{}]interface{})
    if value, ok = m[key]; ok {
        return value, true
    }

    b.mu.RLock()
    promote := false
    if b.live != nil {
        value, ok = b.live[key]
        lnt := atomic.AddUint32(&b.liveNotTenured, 1)
        if lnt >= 1<<31 || int(lnt) >= len(b.live) {
            promote = true
        }
    }
    b.mu.RUnlock()

    if !promote {
        return value, ok
    }

    b.mu.Lock()
    lnt := atomic.LoadUint32(&b.liveNotTenured)
    if b.live != nil && (lnt >= 1<<31 || int(lnt) >= len(b.live)) {
        b.tenured.Store(b.live)
        b.live = nil
        atomic.StoreUint32(&b.liveNotTenured, 0)
    }
    b.mu.Unlock()
    return value, ok
}

写的话,很简单,只写live

func (b *Map) StoreOrLoad(key, value interface{}) (actualValue interface{}, dup bool) {
    b.mu.Lock()
    if b.live == nil {
        m, _ := b.tenured.Load().(map[interface{}]interface{})
        b.live = make(map[interface{}]interface{}, len(m)+1)
        for k, v := range m {
            b.live[k] = v
        }
    }
    actualValue, dup = b.live[key]
    if !dup {
        b.live[key] = value
        actualValue = value
    }
    b.mu.Unlock()
    return actualValue, dup
}

7.3、Sync.Map 的最终实现

经过一轮讨论以后,@bcmills 单独发了一个提案:sync: add a Map to replace RWLock+map usage 最终决定不去修复RWLock的伸缩性问题,而是提供一个可伸缩并发安全的Map来做。 这个并发安全的Map实现方案就是用的上面双Map实现。然后这个并发安全的Map会先放在 x-Repositories 包中经过一段时间迭代,如果没问题了再收敛到Go源码包中。具体可以看 syncmap: add a synchronized map implementation

@bcmills 基于双map的demo,做了一些优化,新增了一些API,比如DeleteRange等。提交了一个正式的 CR

// A Map must not be copied after first use.
type Map struct {
    mu sync.Mutex

    // clean 是 fastpath 用的,读的时候不用加锁,没有cacheline竞争问题
    clean atomic.Value // map[interface{}]interface{}
    
    // dirty 读写都需要加锁
    dirty map[interface{}]interface{}
    
    // 如果clean没有查到,这个时候misses会加1
    // 当 misses >= len(dirty),会把dirty赋值给clean,然后情况dirty
    misses int
}

我们再来看下数据读取的实现,这个里面有几点需要注意,跟上面双map的demo 不同的事,这里的实现是cleandirty两个map只会有一个不为空。所以读的时候,如果clean不为空就直接读clean,并不会再去dirty读一次。如果dirty不为nil,读取以后还会调用一下m.missLocked,这个函数主要的作用是判断对m.misses1,然后判断要不要把dirty赋值给clean,然后清空dirty

// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    clean, _ := m.clean.Load().(map[interface{}]interface{})
    if clean != nil {
        value, ok = clean[key]
        return value, ok
    }

    m.mu.Lock()
    if m.dirty == nil {
        clean, _ := m.clean.Load().(map[interface{}]interface{})
        if clean == nil {
            // Completely empty — promote to clean immediately.
            m.clean.Store(map[interface{}]interface{}{})
        } else {
            value, ok = clean[key]
        }
        m.mu.Unlock()
        return value, ok
    }
    value, ok = m.dirty[key]
    m.missLocked()
    m.mu.Unlock()
    return value, ok
}

func (m *Map) missLocked() {
    if m.misses++; m.misses >= len(m.dirty) {
        m.clean.Store(m.dirty)
        m.dirty = nil
    }
}

Store的函数就比较简单了。如果写入的时候,直接加锁,然后判断dirty是否为空,如果是空,需要把clean数据复制一份给dirty然后清空clean,然后再把数据赋值给dirty

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    m.mu.Lock()
    m.dirtyLocked()
    m.dirty[key] = value
    m.mu.Unlock()
}

// dirtyLocked prepares the map for a subsequent write.
// It ensures that the dirty field is non-nil and clean is nil by making a deep
// copy of clean.
func (m *Map) dirtyLocked() {
    m.misses = 0
    if m.dirty != nil {
        return
    }

    clean, _ := m.clean.Load().(map[interface{}]interface{})
    m.dirty = make(map[interface{}]interface{}, len(clean))
    for k, v := range clean {
        m.dirty[k] = v
    }
    m.clean.Store(map[interface{}]interface{}(nil))
}

这个实现其实有个很大的问题,就是如果有频繁读写交替的话,会导致数据一直在cleandirty两个map中来回copy,如果map很大的话,这个性能会很差,还会阻塞其他线程的读写,但是这个CR当时的场景是期望提供给Runtime包中一些读多写少的场景使用,所以看benchmark跑的性能还是有很大的提升的。

代码合入的时候 @rsc 提了两点优化建议

  1. 如果允许clean != nil and dirty != nil会更好。
  2. 如果一个key没有被覆盖或者删除的话,它命中了lock-free path后续理论上应该一直命中lock-free path会更好一些。

7.4 进一步优化

过了几个月,基于 @rsc 之前合码的时候给的建议,@bcmills 又优化了一版,整个sync.Map的结构变成了下面这样:

type Map struct {
    mu sync.Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry    
    misses int
}

type readOnly struct { // readOnly 的 map
    m       map[interface{}]*entry
    amended bool // amended=true m没有全部key的数据,没查到还需要去dirty查下.
}

var expunged = unsafe.Pointer(new(interface{})) // 表示这个数据已经不在dirty中了。

type entry struct {
    p unsafe.Pointer // *interface{}
}

主要改动只读的map,之前叫clean类型是map[interface{}]interface{},现在改成了read,类型是readOnly structreadOnly还有个amended表示当前readOnly.m是不是全量数据。我们继续往下Store的代码

func (m *Map) Store(key, value interface{}) {
    // fast-path
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) { 
       // 如果这个 key 在 read 里面找到了以后,尝试直接调用 tryStore 去更新 value 数据
       // tryStore 里面会做两件事
       // 1. 判断当前 entry.p 是不是等于 expunged,等于 expunged 就不能更新,直接返回false。下面会走 slow-path去更新
       // 2. 如果不是 expunged ,那就尝试更新 entry.p = &value,如果 CAS 设置成功了就返回。
       // 如果是 expunged 状态,表面 dirty 里面已经没有这个 key了,如 read 里面更新这个东西,下次 dirty数据全量提升为 read 的时候,这个数据就会丢失。
        return 
    }

    // 下面是 slow-path
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() { // e.unexpungeLocked 尝试CAS(&e.p, expunged, nil)
            m.dirty[key] = e // 把 e 赋值给 dirty
        }
        // 到这里 e.p 肯定不是等于 expunged 了
        e.storeLocked(&value) // 设置 e.p = &value
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value) // 如果只在dirty里面有,直接设置 e.p = &value
    } else {
        if !read.amended { // 如果目前 read 有全量数据,但是 read 和 dirty 都没有这个 key
            m.dirtyLocked() // dirtyLocked 这个函数主要做的就是,把 read 里面的 e.p != nil && e.p != expunged 的元素 copy 一份赋值给 dirty
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value) // dirty 保存这个 kv
    }
    m.mu.Unlock()
}

总结下Store主要做了下几件事:

  1. fast-path 路径
    • 看下 read 中是否有这个key,有的话尝试调用tryStore,把设置的value保存到entry对象中去。
    • tryStore 里面会判断entry.p是不是expunged状态,是的话就不能设置,需要走slow-path
    • 如果不是的话保存成功就直接返回。
  2. slow-path路径
    • 会先加互斥锁
    • 看下 read 中是否有这个key,有的话尝试调用unexpungeLockedCAS方式清除entry.pexpunged状态。如果清楚成功,会在dirty里面添加这个数据。如果没有清楚成功,说明状态不是expunged,可以直接更新readentry.p=&value就行了。
    • 不在read里面,在dirty里面,直接设置entry.p=&value就行了。
    • readdirty都没有找到这个key,先看下read是不是有全量数据,是的话,就调用m.dirtyLocked,把read数据copy一份到dirty,并设置read.amended=true,表示 read里面已经没有全量数据了,需要去drity里面找。
    • 最后设置 m.dirty[key] = newEntry(value),dirty 保存这个 kv

在来看下Load相关代码:

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended { // 如果 read 没找到,且 read 没有全量数据
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key] // 加锁以后,这里需要 double check一下
        if !ok && read.amended {
            e, ok = m.dirty[key] // 去 dirty map 读
            m.missLocked() // 这里面对 misscount+1 ,然后看下是否需要把 dirty 全部给 read,然后设置 dirty 为 nil。
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load() // 如果 e.p != nil && e.p == expunged , 就把 e.p 的指向的值转成Interface返回
}

最后再来看下Delete的怎么做的,Delete其实比较简单,就是设置,在read里面找到这个entry然后设置e.p=nil,如果在dirty中就直接调用delete方法删除这个key

func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

7.5 思考:dirty 能否不全量拷贝 read?

正常思路,为了节省内存,dirty 里面只存增量数据,可以吗?反向推理下如果dirty只存增量的数据,那就不需要readdirty的数据同步操作了,那也不需要expunged状态了。所以read的中元素e.p=nil的时候,表示删除了,由于没有了readdirty的复制,所以需要定期滤掉read中删除的数据(e.p = nil)并重新给read赋值,那Store的时候,如果reade.p=nil的话就不能再更新了。因为定期过滤掉read中删除的数据可能会把这个entry给删除掉,导致这个key对应的数据丢失了。所以StoreLoad伪代码如下:

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.p != nil {
        ok:= CAS(e.p, old, &value) // 注意这里要是 old = nil 时候不能再继续尝试 CAS
        if ok{
            return
        }
        // cas 失败继续往下走
    }

    m.mu.Lock() // 加锁
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if atomic.Load(e.p) != nil{
           atomic.Store(e.p,&value)
            return
        }

    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } 

    // read 查到了 e 但是 e.p == nil
    // read 和 dirty 都没查到
    m.dirty[key] = newEntry(value)
    noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
    m.read.Store(readOnly{m: noNilMap, amended: true})

    m.mu.Unlock()
}


func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
        m.mu.Lock()

        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
            e, ok = m.dirty[key]

            m.misses++
            if m.misses >= len(m.dirty) {
                noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
                allDataMap := merge(noNilMap,m.dirty)
                m.read.Store(readOnly{m: allDataMap})
                m.dirty = nil
                m.misses = 0
            }
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

这样实现逻辑上好像也没有问题,不过每次LoadStore一个read中的nil,都需要加锁,然后会过滤readnil数据,都有数据的拷贝操作。如果在删除以后立即读的场景性能可能会非常差。

总结:dirty 全量拷贝 read 数据,就是好一个空间换时间的操作。