一、背景

前段时间有个朋友来问我GoSync.Map性能怎么样,一般什么场景推荐使用。一句话介绍的话,就是Sync.Map底层有两个map,一个是read,一个是dirty,读写read中数据不需要加锁,读写dirty不用需要加锁,适用于读多写少的场景。

碎碎念

其实2020年的时候Go源码里面一些比较常用的包都大致看了一遍,当时跟槊槊大飞哥周老板空闲时间天天讨论各种技术细节,包括但不仅限于操作系统MySQLRedis分布式Go项目架构方法论等。很多时候观点不合还会争的面红耳赤,最后还会上升到人生攻击,你不服我,我也不服你(实际上互有对错,我也被打过几次脸)。因为有的东西,网上有很多错误的资料,导致我养成了一个习惯,找资料的时候我一般都是去看一些权威的技术书或者直接去看开源组件源码,能用代码说的话绝不跟你多BBtalk is cheap, show you the code),用这些东西去反驳别人的观点。虽然有过很多次争吵,但是我们所有人的感觉都是一样,大家都觉得个人的技术能力技术眼界都有了质的提升,精神上也有很大的满足感。现在三个老板,都去了其他大厂,也拿到了自己期望的PKG了,也在各自忙着自己的工作了,技术交流的世界也少了。

关于这些成长经历,就想说三点,一个是费曼教学法,你学会的东西并不代表你一定懂了,你教懂别人才能表示你彻底懂了,因为别人会站他思考的角度来思考一些你可能没考虑到的细节,用这些细节来反问你。当你把这些细节都了解通,并能解答被人的全部问题了,你才是真的懂了。二是,持续跟身边优秀的人去交流沟通,你一定会有很大的成长。如果你觉得自己足够优秀了,那你去开源社区逛逛,那你一定会发现你远远没你想的那么优秀。三是,选择大于努力,有时候你总会觉得有的人能力不如你,确混的比你好。一是可能你没看到别人在其他方面的努力,还有一个就是别人路选择的比你好。每次做选择的时候都要想清楚自己的tradeoff是什么,选择以后就接受现实,别再纠结为什么要这样选,记得有个本书叫《高效人士的七个习惯》,其他的都记不清楚了,就记得一个观点,一个人有自己的关注圈影响圈,我们应该花更多的精力去做好自己能改变的影响圈,比如提高个人能力,减少去关注一些自己不能改变的东西比如经济形势就业形势个人工资等。

人的精力是有限的,当你过度聚焦一件事,其他的事情可能很容易被忽略,比如聚焦管理技巧、沟通技巧,那你的技术成长速度就跟不上其他人。我个人更多的精力是关注技术技术本身能带来的业务价值,所以我对开毫无意义的会权利的游戏无意义的social都不感兴趣,但是我喜欢跟有技术追求的人social(交流技术),在字节这几年的确也碰到了不优秀的同事和老板。我给自己的定位是工作上成为业务专家,之前的飞书老板就是很好的一个业务专家,整个飞书IM架构基本上都是他设计的。很多时候做方案的时候,只有足够的Context,他大多时候给你选择一个最合适的方案。工作之外脱离业务,希望自己,能成为某个方面的技术专家,以后能持续做开源相关的事情。

目的

后面可能会系统性对Go源码做总结,产出blog,方便以后自己快速查阅。

还想对Go的内存管理和GC也做个系统总结。今年估计大概率弄不完,一个是工作太忙,周末休息还要陪娃。最近还打算看下Rust(作为一个Geeker,这么多大佬都在推崇这个,不了学习下说不过去)。

二、Sync.Map 产生的背景

2.1 Sync.RWMutex 多核的伸缩性问题

早在2016的时候,@bcmills(这个哥们是Go项目主要维护者之一) 在GoGithub 上提出了一个sync: RWMutex scales poorly with CPU countIssue给大家讨论。简单说就是 Sync.RWMutex这个读写锁,多核情况下扩展性很差。他贴的 Benchmark 测试代码如下:

func BenchmarkRWMutex(b *testing.B) {
    for ng := 1; ng <= 256; ng <<= 2 { // ng 表示,开多少个 goroutine
        b.Run(fmt.Sprintf("name[%d]", ng), func(b *testing.B) {
            var mu sync.RWMutex
            mu.Lock()

            var wg sync.WaitGroup
            wg.Add(ng)

            n := b.N        // n 表示下面要执行多少次 RLock 和 RUnlock 
            quota := n / ng // quota 表示分摊到每个 goroutine 上需要执行多少次 Lock 和 RUnlock 

            for g := ng; g > 0; g-- {
                if g == 1 { //  n / ng 不是整除的话,剩下余出来的数据,在g=1 的时候全部减掉,不然下面 n 不会等于0
                    quota = n
                }

                go func(quota int) {
                    for i := 0; i < quota; i++ { // 一个循环执行一次 RLock 和 RUnlock
                        mu.RLock()
                        mu.RUnlock()
                    }
                    wg.Done()
                }(quota)

                n -= quota
            }

            if n != 0 {
                b.Fatalf("Incorrect quota assignments: %v remaining", n)
            }

            b.StartTimer() // 从这里开始计时
            mu.Unlock()    // 这里释放写锁,上面所有阻塞在 RLock 的 goroutine 同时唤醒去执行 RLock
            wg.Wait()      // 所有 goroutine 的 RLock 和 RUnlock 都执行完毕
            b.StopTimer()  // 从这里结束计时
        })
    }
}
    

Benchmark的结果可以看出,在多个Gorutine并发下,可以看到CPU核数越多,RWLock的性能越差。

# ./benchmarks.test -test.bench . -test.cpu 1,4,16,64
testing: warning: no tests to run
BenchmarkRWMutex/1      20000000                72.6 ns/op
BenchmarkRWMutex/1-4    20000000                72.4 ns/op
BenchmarkRWMutex/1-16   20000000                72.8 ns/op
BenchmarkRWMutex/1-64   20000000                72.5 ns/op
BenchmarkRWMutex/4      20000000                72.6 ns/op
BenchmarkRWMutex/4-4    20000000               105 ns/op
BenchmarkRWMutex/4-16   10000000               130 ns/op
BenchmarkRWMutex/4-64   20000000               160 ns/op
BenchmarkRWMutex/16     20000000                72.4 ns/op
BenchmarkRWMutex/16-4   10000000               125 ns/op
BenchmarkRWMutex/16-16  10000000               263 ns/op
BenchmarkRWMutex/16-64   5000000               287 ns/op
BenchmarkRWMutex/64     20000000                72.6 ns/op
BenchmarkRWMutex/64-4   10000000               137 ns/op
BenchmarkRWMutex/64-16   5000000               306 ns/op
BenchmarkRWMutex/64-64   3000000               517 ns/op
BenchmarkRWMutex/256                    20000000                72.4 ns/op
BenchmarkRWMutex/256-4                  20000000               137 ns/op
BenchmarkRWMutex/256-16                  5000000               280 ns/op
BenchmarkRWMutex/256-64                  3000000               602 ns/op
PASS

为什么多核下面会更慢?其实很简单,就是资源竞争会增加额外开销。RLockRUnlock,底层实现是atomic.AddInt32atomic.AddInt32对应的汇编代码如下:

// uint32 Xadd(uint32 volatile *val, int32 delta)
// Atomically:
//    *val += delta;
//    return *val;
TEXT ·Xadd(SB), NOSPLIT, $0-20
    MOVQ    ptr+0(FP), BX
    MOVL    delta+8(FP), AX
    MOVL    AX, CX
    LOCK
    XADDL    AX, 0(BX)
    ADDL    CX, AX
    MOVL    AX, ret+16(FP)
    RET

可以看到里面有 LOCK 前缀的指令,Lock其实就是CPU层面的一个锁,锁的单位是Cache Line 具体可以参考 Golang Memory Model 里面的详细介绍。多个核都要同时更新这个Cacheline,所以性能就有所下降。

2.2 如何去优化?

我们知道,在业务中遇到锁的性能瓶颈时候,我们一般会下面几个方面去考虑优化锁。

  1. 优化锁的粒度
  2. 读写分离
  3. 减少锁持有时间。
  4. 使用CAS

2、3、4 在这个读写锁场景都不试用(已经是读写锁了,且瓶颈也在CAScacheline的资源竞争),所以只能从锁的粒度方向考虑。

2.2.1 distributedrwmutex

@dvyukovGo小组成员之一) 提出了一个分布式读写锁的方案
核心原理就是,一个P对应一个读写锁,这样读锁在多核情况就没有竞争的问题了,因为每个核的读锁是独立的,互不影响(有点类似 ThreadLocal 的概念)。具体核心代码如下:

func (m *DistributedRWMutex) RUnlock() {
        l := m.getLocal()
        l.RUnlock()
}

func (m *DistributedRWMutex) getLocal() *sync.RWMutex {
        v := runtime.GetProcLocal(m.slot)
        p := (*sync.RWMutex)(unsafe.Pointer(uintptr(*v)))
        if p == nil {
                p = new(sync.RWMutex)
                atomic.AddUint64(v, uint64(uintptr(unsafe.Pointer(p))))
        }
        return p
}

不过这个实现方式也有一个问题需要注意。就是GoroutineP不是强绑定的。有可能你在某个P执行Lock以后,做了系统调用这个时候M、GP可能会解绑,系统调用完成回来的时候,可能绑定的是一个新的P了。这个时候再去调用getLocal可能拿到的已经是不一样的锁对象了,再去用这个锁对象去调用RUnlock是有问题的。一般这种需要在Goroutine里面直接拿到RWLock锁对象。类似下面这种:

// ...balabala...
go func() {
    rwx := rw.RLocker() // 这里拿到当前P对应的ReadLocker
    rwx.Lock()
    defer rwx.Unlock()
    // ... balabala...
    // syscall 这里切换P了也没影响

}()
// ...balabala...

还有一个 drwmutex 库也是这个思想,这里不过多赘述。

@bcmills 的回复说,老的RWMutex接口,是允许在不同的Goroutine或者P里面调用RLock / RUnlock,考虑兼容性问题,不太想做这样的改造。

2.2.2 Atomic.Value

还有更大的问题是当时(GO1.8)一些基础库中大量使用了RWMutex作为包级锁。比如reflecthttp.statusMujson.encoderCachemime.mimeLock

@dvyukov 指出这些场景其实可以用Atomic.Value去实现,类似场景有encoding/json/encode.go:cachedTypeFields

// cachedTypeFields is like typeFields but uses a cache to avoid repeated work.
func cachedTypeFields(t reflect.Type) []field {
    m, _ := fieldCache.value.Load().(map[reflect.Type][]field)
    f := m[t]
    if f != nil {
        return f
    }

    // Compute fields without lock.
    // Might duplicate effort but won't hold other computations back.
    f = typeFields(t)
    if f == nil {
        f = []field{}
    }

    fieldCache.mu.Lock()
    m, _ = fieldCache.value.Load().(map[reflect.Type][]field)
    newM := make(map[reflect.Type][]field, len(m)+1)
    for k, v := range m {
        newM[k] = v
    }
    newM[t] = f
    fieldCache.value.Store(newM)
    fieldCache.mu.Unlock()
    return f
}

PS:Atomic.Load转汇编其实就是简单的MOV指令,没有LOCK所以没有Cacheline资源竞争的问题。

mime: use atomic.Value to store mime types 这个CL也是尝试用atomic.Value去替代sync.RWMutex

这个实现,虽然读的时候没有资源竞争的问题。但是写的时候是O(n)的开销。这个方案对写太不友好。

2.2.3 基于二叉树实现 - dmap

@ianlancetaylor 基于二叉树实现了dmapdmap的插入时间复杂度是O(LogN)insert就是常规的写入操作,这里就不过多去赘述了。

// Insert inserts a key/value pair into a dmap.
func (d *dmap) insert(key, val interface{}) {
    var n *node
    for { // 判断根节点是不是为空。为空直接加锁然后写Root,否则就拿到根节点
        root, _ := d.root.Load().(*node)
        if root != nil {
            n = root
            break
        }
        root = &node{
            key: key,
            val: val,
        }
        d.mu.Lock()
        if d.root.Load() == nil {
            d.root.Store(root)
            d.mu.Unlock()
            return
        }
        d.mu.Unlock() // 走到这表示,有其他 goroutine 写了根节点,会循继续去 load 根节点
    }

    // 到这里,n 表示是 root 节点
    
    for {
        cmp := d.compare(key, n.key) // 判断两个 key是否相等
        if cmp == 0 {
            if val != n.val {
                panic("invalid double-insert")
            }
            return
        }
        p := &n.left 
        if cmp > 0 { // key 大于当前节点key。就找右节点
            p = &n.right
        }
        n2, _ := (*p).Load().(*node)
        if n2 != nil { // 当前节点不为空,继续重新走循环,比较key和 n.key 大小
            n = n2 
        } else { // 当前节点为空,尝试写入,写入失败,就继续重新走循环逻辑 
            n2 = &node{
                key: key,
                val: val,
            }
            n.mu.Lock()
            if (*p).Load() == nil {
                (*p).Store(n2)
                n.mu.Unlock()
                return
            }
            n.mu.Unlock()
        }
    }
}

查找的实现,有fastpathslowpath两个路径,fastpath用的是map来查找,命中的话就直接返回,时间复杂度是O(1)的,map中没查到的话,会去二叉树里面查,时间复杂度是O(LogN)。有个tricky的地方是,没有命中map但是在二叉树中查到这个key的话,会对这个keycount+1,如果这个keymiss count大于map的长度的话,会复制一下map然后把新的map回写到Atomic.Value里面。

// Lookup looks up a key in the distributed map.
func (d *dmap) lookup(key interface{}) interface{} {
    // Common values are cached in a map held in the root.
    m, _ := d.m.Load().(map[interface{}]interface{})
    if val, ok := m[key]; ok { // map里面找到了,直接返回
        return val
    }

    n, _ := d.root.Load().(*node)
    for n != nil {
        cmp := d.compare(key, n.key)
        if cmp == 0 {
            count := atomic.AddUint32(&n.count, 1)

            // Add this key/val pair to the map in the root,
            // but only if it's worth copying the existing map.
            if count < 0 || (count > 1 && int(count) > len(m)) {
                newm := make(map[interface{}]interface{}, len(m)+1)
                for k, v := range m {
                    newm[k] = v
                }
                newm[key] = n.val

                // It's possible that some other
                // goroutine has updated d.m since we
                // loaded it.  That means we did extra
                // work but it's otherwise OK.
                // 这里如果有多个 goroutine 写会导致有互相覆盖的问题
                d.m.Store(newm)
            }

            return n.val
        }

        p := &n.left
        if cmp > 0 {
            p = &n.right
        }
        n, _ = (*p).Load().(*node)
    }

    return nil
}

2.2.4 两个map

@bcmills 基于上面 @ianlancetaylor 的二叉树加map的思想优化了下。用map替代了二叉树。具体实现如下:

// Map is a key-value map from which entries can be read without external
// synchronization.
type Map struct {
    tenured        atomic.Value // 年老代 map
    liveNotTenured uint32 // 记录 miss count

    mu   sync.RWMutex // 对 live 读写的时候,需要用到这个读写锁
    live map[interface{}]interface{}
}

读的话,先去tenured里面去读,读tenured不用加锁,读写live用的是读写锁,然后根据misscountlive复制给tenured

func (b *Map) Load(key interface{}) (value interface{}, ok bool) {
    m, _ := b.tenured.Load().(map[interface{}]interface{})
    if value, ok = m[key]; ok {
        return value, true
    }

    b.mu.RLock()
    promote := false
    if b.live != nil {
        value, ok = b.live[key]
        lnt := atomic.AddUint32(&b.liveNotTenured, 1)
        if lnt >= 1<<31 || int(lnt) >= len(b.live) {
            promote = true
        }
    }
    b.mu.RUnlock()

    if !promote {
        return value, ok
    }

    b.mu.Lock()
    lnt := atomic.LoadUint32(&b.liveNotTenured)
    if b.live != nil && (lnt >= 1<<31 || int(lnt) >= len(b.live)) {
        b.tenured.Store(b.live)
        b.live = nil
        atomic.StoreUint32(&b.liveNotTenured, 0)
    }
    b.mu.Unlock()
    return value, ok
}

写的话,很简单,只写live

func (b *Map) StoreOrLoad(key, value interface{}) (actualValue interface{}, dup bool) {
    b.mu.Lock()
    if b.live == nil {
        m, _ := b.tenured.Load().(map[interface{}]interface{})
        b.live = make(map[interface{}]interface{}, len(m)+1)
        for k, v := range m {
            b.live[k] = v
        }
    }
    actualValue, dup = b.live[key]
    if !dup {
        b.live[key] = value
        actualValue = value
    }
    b.mu.Unlock()
    return actualValue, dup
}

三、Sync.Map 的最终实现

经过一轮讨论以后,@bcmills 单独发了一个提案:sync: add a Map to replace RWLock+map usage 最终决定不去修复RWLock的伸缩性问题,而是提供一个可伸缩并发安全的Map来做。 这个并发安全的Map实现方案就是用的上面双Map实现。然后这个并发安全的Map会先放在 x-Repositories 包中经过一段时间迭代,如果没问题了再收敛到Go源码包中。具体可以看 syncmap: add a synchronized map implementation

@bcmills 基于双map的demo,做了一些优化,新增了一些API,比如DeleteRange等。提交了一个正式的 CR

// A Map must not be copied after first use.
type Map struct {
    mu sync.Mutex

    // clean 是 fastpath 用的,读的时候不用加锁,没有cacheline竞争问题
    clean atomic.Value // map[interface{}]interface{}
    
    // dirty 读写都需要加锁
    dirty map[interface{}]interface{}
    
    // 如果clean没有查到,这个时候misses会加1
    // 当 misses >= len(dirty),会把dirty赋值给clean,然后情况dirty
    misses int
}

我们再来看下数据读取的实现,这个里面有几点需要注意,跟上面双map的demo 不同的事,这里的实现是cleandirty两个map只会有一个不为空。所以读的时候,如果clean不为空就直接读clean,并不会再去dirty读一次。如果dirty不为nil,读取以后还会调用一下m.missLocked,这个函数主要的作用是判断对m.misses1,然后判断要不要把dirty赋值给clean,然后清空dirty

// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    clean, _ := m.clean.Load().(map[interface{}]interface{})
    if clean != nil {
        value, ok = clean[key]
        return value, ok
    }

    m.mu.Lock()
    if m.dirty == nil {
        clean, _ := m.clean.Load().(map[interface{}]interface{})
        if clean == nil {
            // Completely empty — promote to clean immediately.
            m.clean.Store(map[interface{}]interface{}{})
        } else {
            value, ok = clean[key]
        }
        m.mu.Unlock()
        return value, ok
    }
    value, ok = m.dirty[key]
    m.missLocked()
    m.mu.Unlock()
    return value, ok
}

func (m *Map) missLocked() {
    if m.misses++; m.misses >= len(m.dirty) {
        m.clean.Store(m.dirty)
        m.dirty = nil
    }
}

Store的函数就比较简单了。如果写入的时候,直接加锁,然后判断dirty是否为空,如果是空,需要把clean数据复制一份给dirty然后清空clean,然后再把数据赋值给dirty

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    m.mu.Lock()
    m.dirtyLocked()
    m.dirty[key] = value
    m.mu.Unlock()
}

// dirtyLocked prepares the map for a subsequent write.
// It ensures that the dirty field is non-nil and clean is nil by making a deep
// copy of clean.
func (m *Map) dirtyLocked() {
    m.misses = 0
    if m.dirty != nil {
        return
    }

    clean, _ := m.clean.Load().(map[interface{}]interface{})
    m.dirty = make(map[interface{}]interface{}, len(clean))
    for k, v := range clean {
        m.dirty[k] = v
    }
    m.clean.Store(map[interface{}]interface{}(nil))
}

这个实现其实有个很大的问题,就是如果有频繁读写交替的话,会导致数据一直在cleandirty两个map中来回copy,如果map很大的话,这个性能会很差,还会阻塞其他线程的读写,但是这个CR当时的场景是期望提供给Runtime包中一些读多写少的场景使用,所以看benchmark跑的性能还是有很大的提升的。

代码合入的时候 @rsc 提了两点优化建议

  1. 如果允许clean != nil and dirty != nil会更好。
  2. 如果一个key没有被覆盖或者删除的话,它命中了lock-free path后续理论上应该一直命中lock-free path会更好一些。

3.1 进一步优化

过了几个月,基于 @rsc 之前合码的时候给的建议,@bcmills 又优化了一版,整个sync.Map的结构变成了下面这样:

type Map struct {
    mu sync.Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry    
    misses int
}

type readOnly struct { // readOnly 的 map
    m       map[interface{}]*entry
    amended bool // amended=true m没有全部key的数据,没查到还需要去dirty查下.
}

var expunged = unsafe.Pointer(new(interface{})) // 表示这个数据已经不在dirty中了。

type entry struct {
    p unsafe.Pointer // *interface{}
}

主要改动只读的map,之前叫clean类型是map[interface{}]interface{},现在改成了read,类型是readOnly structreadOnly还有个amended表示当前readOnly.m是不是全量数据。我们继续往下Store的代码

func (m *Map) Store(key, value interface{}) {
    // fast-path
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) { 
       // 如果这个 key 在 read 里面找到了以后,尝试直接调用 tryStore 去更新 value 数据
       // tryStore 里面会做两件事
       // 1. 判断当前 entry.p 是不是等于 expunged,等于 expunged 就不能更新,直接返回false。下面会走 slow-path去更新
       // 2. 如果不是 expunged ,那就尝试更新 entry.p = &value,如果 CAS 设置成功了就返回。
       // 如果是 expunged 状态,表面 dirty 里面已经没有这个 key了,如 read 里面更新这个东西,下次 dirty数据全量提升为 read 的时候,这个数据就会丢失。
        return 
    }

    // 下面是 slow-path
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() { // e.unexpungeLocked 尝试CAS(&e.p, expunged, nil)
            m.dirty[key] = e // 把 e 赋值给 dirty
        }
        // 到这里 e.p 肯定不是等于 expunged 了
        e.storeLocked(&value) // 设置 e.p = &value
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value) // 如果只在dirty里面有,直接设置 e.p = &value
    } else {
        if !read.amended { // 如果目前 read 有全量数据,但是 read 和 dirty 都没有这个 key
            m.dirtyLocked() // dirtyLocked 这个函数主要做的就是,把 read 里面的 e.p != nil && e.p != expunged 的元素 copy 一份赋值给 dirty
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value) // dirty 保存这个 kv
    }
    m.mu.Unlock()
}

总结下Store主要做了下几件事:

  1. fast-path 路径
    • 看下 read 中是否有这个key,有的话尝试调用tryStore,把设置的value保存到entry对象中去。
    • tryStore 里面会判断entry.p是不是expunged状态,是的话就不能设置,需要走slow-path
    • 如果不是的话保存成功就直接返回。
  2. slow-path路径
    • 会先加互斥锁
    • 看下 read 中是否有这个key,有的话尝试调用unexpungeLockedCAS方式清除entry.pexpunged状态。如果清楚成功,会在dirty里面添加这个数据。如果没有清楚成功,说明状态不是expunged,可以直接更新readentry.p=&value就行了。
    • 不在read里面,在dirty里面,直接设置entry.p=&value就行了。
    • readdirty都没有找到这个key,先看下read是不是有全量数据,是的话,就调用m.dirtyLocked,把read数据copy一份到dirty,并设置read.amended=true,表示 read里面已经没有全量数据了,需要去drity里面找。
    • 最后设置 m.dirty[key] = newEntry(value),dirty 保存这个 kv

在来看下Load相关代码:

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended { // 如果 read 没找到,且 read 没有全量数据
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key] // 加锁以后,这里需要 double check一下
        if !ok && read.amended {
            e, ok = m.dirty[key] // 去 dirty map 读
            m.missLocked() // 这里面对 misscount+1 ,然后看下是否需要把 dirty 全部给 read,然后设置 dirty 为 nil。
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load() // 如果 e.p != nil && e.p == expunged , 就把 e.p 的指向的值转成Interface返回
}

最后再来看下Delete的怎么做的,Delete其实比较简单,就是设置,在read里面找到这个entry然后设置e.p=nil,如果在dirty中就直接调用delete方法删除这个key

func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

3.2 思考:dirty 能否不全量拷贝 read?

正常思路,为了节省内存,dirty 里面只存增量数据,可以吗?反向推理下如果dirty只存增量的数据,那就不需要readdirty的数据同步操作了,那也不需要expunged状态了。所以read的中元素e.p=nil的时候,表示删除了,由于没有了readdirty的复制,所以需要定期滤掉read中删除的数据(e.p = nil)并重新给read赋值,那Store的时候,如果reade.p=nil的话就不能再更新了。因为定期过滤掉read中删除的数据可能会把这个entry给删除掉,导致这个key对应的数据丢失了。所以StoreLoad伪代码如下:

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.p != nil {
        ok:= CAS(e.p, old, &value) // 注意这里要是 old = nil 时候不能再继续尝试 CAS
        if ok{
            return
        }
        // cas 失败继续往下走
    }

    m.mu.Lock() // 加锁
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if atomic.Load(e.p) != nil{
           atomic.Store(e.p,&value)
            return
        }

    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } 

    // read 查到了 e 但是 e.p == nil
    // read 和 dirty 都没查到
    m.dirty[key] = newEntry(value)
    noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
    m.read.Store(readOnly{m: noNilMap, amended: true})

    m.mu.Unlock()
}


func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
        m.mu.Lock()

        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
            e, ok = m.dirty[key]

            m.misses++
            if m.misses >= len(m.dirty) {
                noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
                allDataMap := merge(noNilMap,m.dirty)
                m.read.Store(readOnly{m: allDataMap})
                m.dirty = nil
                m.misses = 0
            }
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

这样实现逻辑上好像也没有问题,不过每次LoadStore一个read中的nil,都需要加锁,然后会过滤readnil数据,都有数据的拷贝操作。如果在删除以后立即读的场景性能可能会非常差。

总结:dirty 全量拷贝 read 数据,就是好一个空间换时间的操作。

3.3 可以不要 expunged 状态吗?

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read := m.loadReadOnly()
    m.dirty = make(map[any]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := e.p.Load()
    for p == nil {
        if e.p.CompareAndSwap(nil, expunged) {
            return true
        }
        p = e.p.Load()
    }
    return p == expunged
}

expunged 状态本质是一个中间标记,保证readdirty同步过程中,是线程安全的,然后这样读写read的时候可以不用加速。不然的话如下图

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read := m.loadReadOnly()
    m.dirty = make(map[any]*entry, len(read.m))
    for k, e := range read.m {
        if e.p == nil { 
           // 这里不能保证是原子的,这个过程中,可能 e.p 又被赋值了。
            m.dirty[k] = e
        }
    }
}

image.png
image.png

3.4 runtime 库使用sync.Map 优化

syncmapx-Repositories 里面给其他用户试用几个月以后,最终被合入了Go源码包中,详见sync: import Map from x/sync/syncmap

然后又优化了一波源码中使用RWMutex的代码,都改成了Sync.Map,修改以后代码相对之前也更简单一些了。

reflect: use sync.Map instead of RWMutex for type caches

expvar: replace RWMutex usage with sync.Map and atomics

mime: use sync.Map instead of RWMutex for type lookups

mime: use sync.Map instead of RWMutex for type lookups

encoding/xml: replace tinfoMap RWMutex with sync.Map

net/http: use sync.Map instead of RWMutex for ServeMux.m

net/rpc: use a sync.Map for serviceMap instead of RWMutex

archive/zip: replace RWMutex with sync.Map

四、总结

很多时候都没有十全十美的方案,方案设计的越general,需要考虑的场景就越多,需要做tradeoff的地方也就越多。