一、背景
前段时间有个朋友来问我Go
的Sync.Map
性能怎么样,一般什么场景推荐使用。一句话介绍的话,就是Sync.Map
底层有两个map
,一个是read
,一个是dirty
,读写read
中数据不需要加锁,读写dirty
不用需要加锁,适用于读多写少的场景。
碎碎念
其实2020
年的时候Go
源码里面一些比较常用的包都大致看了一遍,当时跟槊槊
、大飞哥
、周老板
空闲时间天天讨论各种技术细节,包括但不仅限于操作系统
、MySQL
、Redis
、分布式
、Go
、项目架构方法论
等。很多时候观点不合还会争的面红耳赤,最后还会上升到人生攻击,你不服我,我也不服你(实际上互有对错,我也被打过几次脸)。因为有的东西,网上有很多错误的资料,导致我养成了一个习惯,找资料的时候我一般都是去看一些权威的技术书
或者直接去看开源组件源码
,能用代码说的话绝不跟你多BB
(talk is cheap, show you the code
),用这些东西去反驳别人的观点。虽然有过很多次争吵,但是我们所有人的感觉都是一样,大家都觉得个人的技术能力
、技术眼界
都有了质的提升,精神上也有很大的满足感。现在三个老板,都去了其他大厂,也拿到了自己期望的PKG
了,也在各自忙着自己的工作了,技术交流的世界也少了。
关于这些成长经历,就想说三点,一个是费曼教学法,你学会的东西并不代表你一定懂了,你教懂别人才能表示你彻底懂了,因为别人会站他思考的角度来思考一些你可能没考虑到的细节,用这些细节来反问你。当你把这些细节都了解通,并能解答被人的全部问题了,你才是真的懂了。二是,持续跟身边优秀的人去交流沟通,你一定会有很大的成长。如果你觉得自己足够优秀了,那你去开源社区逛逛,那你一定会发现你远远没你想的那么优秀。三是,选择大于努力,有时候你总会觉得有的人能力不如你,确混的比你好。一是可能你没看到别人在其他方面的努力,还有一个就是别人路选择的比你好。每次做选择的时候都要想清楚自己的tradeoff
是什么,选择以后就接受现实,别再纠结为什么要这样选,记得有个本书叫《高效人士的七个习惯》,其他的都记不清楚了,就记得一个观点,一个人有自己的关注圈
和影响圈
,我们应该花更多的精力去做好自己能改变的影响圈
,比如提高个人能力
,减少去关注一些自己不能改变的东西比如经济形势
、就业形势
、个人工资
等。
人的精力是有限的,当你过度聚焦一件事,其他的事情可能很容易被忽略,比如聚焦管理技巧、沟通技巧,那你的技术成长速度就跟不上其他人。我个人更多的精力是关注技术
和技术本身能带来的业务价值
,所以我对开毫无意义的会
、权利的游戏
、无意义的social
都不感兴趣,但是我喜欢跟有技术追求的人social(交流技术)
,在字节这几年的确也碰到了不优秀的同事和老板。我给自己的定位是工作上成为业务专家,之前的飞书老板就是很好的一个业务专家,整个飞书IM
架构基本上都是他设计的。很多时候做方案的时候,只有足够的Context
,他大多时候给你选择一个最合适的方案。工作之外脱离业务,希望自己,能成为某个方面的技术专家,以后能持续做开源相关的事情。
目的
后面可能会系统性对Go
源码做总结,产出blog
,方便以后自己快速查阅。
还想对Go
的内存管理和GC
也做个系统总结。今年估计大概率弄不完,一个是工作太忙,周末休息还要陪娃。最近还打算看下Rust
(作为一个Geeker
,这么多大佬都在推崇这个,不了学习下说不过去)。
二、Sync.Map 产生的背景
2.1 Sync.RWMutex 多核的伸缩性问题
早在2016
的时候,@bcmills(这个哥们是Go
项目主要维护者之一) 在Go
的 Github
上提出了一个sync: RWMutex scales poorly with CPU count 的Issue
给大家讨论。简单说就是 Sync.RWMutex
这个读写锁,多核情况下扩展性很差。他贴的 Benchmark
测试代码如下:
func BenchmarkRWMutex(b *testing.B) {
for ng := 1; ng <= 256; ng <<= 2 { // ng 表示,开多少个 goroutine
b.Run(fmt.Sprintf("name[%d]", ng), func(b *testing.B) {
var mu sync.RWMutex
mu.Lock()
var wg sync.WaitGroup
wg.Add(ng)
n := b.N // n 表示下面要执行多少次 RLock 和 RUnlock
quota := n / ng // quota 表示分摊到每个 goroutine 上需要执行多少次 Lock 和 RUnlock
for g := ng; g > 0; g-- {
if g == 1 { // n / ng 不是整除的话,剩下余出来的数据,在g=1 的时候全部减掉,不然下面 n 不会等于0
quota = n
}
go func(quota int) {
for i := 0; i < quota; i++ { // 一个循环执行一次 RLock 和 RUnlock
mu.RLock()
mu.RUnlock()
}
wg.Done()
}(quota)
n -= quota
}
if n != 0 {
b.Fatalf("Incorrect quota assignments: %v remaining", n)
}
b.StartTimer() // 从这里开始计时
mu.Unlock() // 这里释放写锁,上面所有阻塞在 RLock 的 goroutine 同时唤醒去执行 RLock
wg.Wait() // 所有 goroutine 的 RLock 和 RUnlock 都执行完毕
b.StopTimer() // 从这里结束计时
})
}
}
Benchmark
的结果可以看出,在多个Gorutine
并发下,可以看到CPU
核数越多,RWLock
的性能越差。
# ./benchmarks.test -test.bench . -test.cpu 1,4,16,64
testing: warning: no tests to run
BenchmarkRWMutex/1 20000000 72.6 ns/op
BenchmarkRWMutex/1-4 20000000 72.4 ns/op
BenchmarkRWMutex/1-16 20000000 72.8 ns/op
BenchmarkRWMutex/1-64 20000000 72.5 ns/op
BenchmarkRWMutex/4 20000000 72.6 ns/op
BenchmarkRWMutex/4-4 20000000 105 ns/op
BenchmarkRWMutex/4-16 10000000 130 ns/op
BenchmarkRWMutex/4-64 20000000 160 ns/op
BenchmarkRWMutex/16 20000000 72.4 ns/op
BenchmarkRWMutex/16-4 10000000 125 ns/op
BenchmarkRWMutex/16-16 10000000 263 ns/op
BenchmarkRWMutex/16-64 5000000 287 ns/op
BenchmarkRWMutex/64 20000000 72.6 ns/op
BenchmarkRWMutex/64-4 10000000 137 ns/op
BenchmarkRWMutex/64-16 5000000 306 ns/op
BenchmarkRWMutex/64-64 3000000 517 ns/op
BenchmarkRWMutex/256 20000000 72.4 ns/op
BenchmarkRWMutex/256-4 20000000 137 ns/op
BenchmarkRWMutex/256-16 5000000 280 ns/op
BenchmarkRWMutex/256-64 3000000 602 ns/op
PASS
为什么多核下面会更慢?其实很简单,就是资源竞争会增加额外开销。RLock
和RUnlock
,底层实现是atomic.AddInt32
,atomic.AddInt32
对应的汇编代码如下:
// uint32 Xadd(uint32 volatile *val, int32 delta)
// Atomically:
// *val += delta;
// return *val;
TEXT ·Xadd(SB), NOSPLIT, $0-20
MOVQ ptr+0(FP), BX
MOVL delta+8(FP), AX
MOVL AX, CX
LOCK
XADDL AX, 0(BX)
ADDL CX, AX
MOVL AX, ret+16(FP)
RET
可以看到里面有 LOCK
前缀的指令,Lock
其实就是CPU
层面的一个锁,锁的单位是Cache Line
具体可以参考 Golang Memory Model 里面的详细介绍。多个核都要同时更新这个Cacheline
,所以性能就有所下降。
2.2 如何去优化?
我们知道,在业务中遇到锁的性能瓶颈时候,我们一般会下面几个方面去考虑优化锁。
- 优化锁的粒度
- 读写分离
- 减少锁持有时间。
- 使用
CAS
2、3、4 在这个读写锁场景都不试用(已经是读写锁了,且瓶颈也在CAS
对cacheline
的资源竞争),所以只能从锁的粒度方向考虑。
2.2.1 distributedrwmutex
@dvyukov(Go
小组成员之一) 提出了一个分布式读写锁的方案,
核心原理就是,一个P
对应一个读写锁,这样读锁在多核情况就没有竞争的问题了,因为每个核的读锁是独立的,互不影响(有点类似 ThreadLocal
的概念)。具体核心代码如下:
func (m *DistributedRWMutex) RUnlock() {
l := m.getLocal()
l.RUnlock()
}
func (m *DistributedRWMutex) getLocal() *sync.RWMutex {
v := runtime.GetProcLocal(m.slot)
p := (*sync.RWMutex)(unsafe.Pointer(uintptr(*v)))
if p == nil {
p = new(sync.RWMutex)
atomic.AddUint64(v, uint64(uintptr(unsafe.Pointer(p))))
}
return p
}
不过这个实现方式也有一个问题需要注意。就是Goroutine
和P
不是强绑定的。有可能你在某个P
执行Lock
以后,做了系统调用
这个时候M、G
和P
可能会解绑,系统调用完成回来的时候,可能绑定的是一个新的P
了。这个时候再去调用getLocal
可能拿到的已经是不一样的锁对象了,再去用这个锁对象去调用RUnlock
是有问题的。一般这种需要在Goroutine
里面直接拿到RWLock
锁对象。类似下面这种:
// ...balabala...
go func() {
rwx := rw.RLocker() // 这里拿到当前P对应的ReadLocker
rwx.Lock()
defer rwx.Unlock()
// ... balabala...
// syscall 这里切换P了也没影响
}()
// ...balabala...
还有一个 drwmutex 库也是这个思想,这里不过多赘述。
@bcmills 的回复说,老的RWMutex
接口,是允许在不同的Goroutine
或者P
里面调用RLock / RUnlock
,考虑兼容性问题,不太想做这样的改造。
2.2.2 Atomic.Value
还有更大的问题是当时(GO1.8
)一些基础库中大量使用了RWMutex
作为包级锁。比如reflect、http.statusMu、json.encoderCache、mime.mimeLock
@dvyukov 指出这些场景其实可以用Atomic.Value
去实现,类似场景有encoding/json/encode.go:cachedTypeFields
// cachedTypeFields is like typeFields but uses a cache to avoid repeated work.
func cachedTypeFields(t reflect.Type) []field {
m, _ := fieldCache.value.Load().(map[reflect.Type][]field)
f := m[t]
if f != nil {
return f
}
// Compute fields without lock.
// Might duplicate effort but won't hold other computations back.
f = typeFields(t)
if f == nil {
f = []field{}
}
fieldCache.mu.Lock()
m, _ = fieldCache.value.Load().(map[reflect.Type][]field)
newM := make(map[reflect.Type][]field, len(m)+1)
for k, v := range m {
newM[k] = v
}
newM[t] = f
fieldCache.value.Store(newM)
fieldCache.mu.Unlock()
return f
}
PS:Atomic.Load
转汇编其实就是简单的MOV
指令,没有LOCK
所以没有Cacheline
资源竞争的问题。
mime: use atomic.Value to store mime types 这个CL
也是尝试用atomic.Value
去替代sync.RWMutex
。
这个实现,虽然读的时候没有资源竞争的问题。但是写的时候是O(n)
的开销。这个方案对写太不友好。
2.2.3 基于二叉树实现 - dmap
@ianlancetaylor 基于二叉树实现了dmap
,dmap
的插入时间复杂度是O(LogN)
,insert
就是常规的写入操作,这里就不过多去赘述了。
// Insert inserts a key/value pair into a dmap.
func (d *dmap) insert(key, val interface{}) {
var n *node
for { // 判断根节点是不是为空。为空直接加锁然后写Root,否则就拿到根节点
root, _ := d.root.Load().(*node)
if root != nil {
n = root
break
}
root = &node{
key: key,
val: val,
}
d.mu.Lock()
if d.root.Load() == nil {
d.root.Store(root)
d.mu.Unlock()
return
}
d.mu.Unlock() // 走到这表示,有其他 goroutine 写了根节点,会循继续去 load 根节点
}
// 到这里,n 表示是 root 节点
for {
cmp := d.compare(key, n.key) // 判断两个 key是否相等
if cmp == 0 {
if val != n.val {
panic("invalid double-insert")
}
return
}
p := &n.left
if cmp > 0 { // key 大于当前节点key。就找右节点
p = &n.right
}
n2, _ := (*p).Load().(*node)
if n2 != nil { // 当前节点不为空,继续重新走循环,比较key和 n.key 大小
n = n2
} else { // 当前节点为空,尝试写入,写入失败,就继续重新走循环逻辑
n2 = &node{
key: key,
val: val,
}
n.mu.Lock()
if (*p).Load() == nil {
(*p).Store(n2)
n.mu.Unlock()
return
}
n.mu.Unlock()
}
}
}
查找的实现,有fastpath
和slowpath
两个路径,fastpath
用的是map
来查找,命中的话就直接返回,时间复杂度是O(1)
的,map
中没查到的话,会去二叉树里面查,时间复杂度是O(LogN)
。有个tricky
的地方是,没有命中map
但是在二叉树中查到这个key
的话,会对这个key
的count+1
,如果这个key
的miss count
大于map
的长度的话,会复制一下map
然后把新的map
回写到Atomic.Value
里面。
// Lookup looks up a key in the distributed map.
func (d *dmap) lookup(key interface{}) interface{} {
// Common values are cached in a map held in the root.
m, _ := d.m.Load().(map[interface{}]interface{})
if val, ok := m[key]; ok { // map里面找到了,直接返回
return val
}
n, _ := d.root.Load().(*node)
for n != nil {
cmp := d.compare(key, n.key)
if cmp == 0 {
count := atomic.AddUint32(&n.count, 1)
// Add this key/val pair to the map in the root,
// but only if it's worth copying the existing map.
if count < 0 || (count > 1 && int(count) > len(m)) {
newm := make(map[interface{}]interface{}, len(m)+1)
for k, v := range m {
newm[k] = v
}
newm[key] = n.val
// It's possible that some other
// goroutine has updated d.m since we
// loaded it. That means we did extra
// work but it's otherwise OK.
// 这里如果有多个 goroutine 写会导致有互相覆盖的问题
d.m.Store(newm)
}
return n.val
}
p := &n.left
if cmp > 0 {
p = &n.right
}
n, _ = (*p).Load().(*node)
}
return nil
}
2.2.4 两个map
@bcmills 基于上面 @ianlancetaylor 的二叉树加map
的思想优化了下。用map
替代了二叉树。具体实现如下:
// Map is a key-value map from which entries can be read without external
// synchronization.
type Map struct {
tenured atomic.Value // 年老代 map
liveNotTenured uint32 // 记录 miss count
mu sync.RWMutex // 对 live 读写的时候,需要用到这个读写锁
live map[interface{}]interface{}
}
读的话,先去tenured
里面去读,读tenured
不用加锁,读写live
用的是读写锁,然后根据misscount
把live
复制给tenured
func (b *Map) Load(key interface{}) (value interface{}, ok bool) {
m, _ := b.tenured.Load().(map[interface{}]interface{})
if value, ok = m[key]; ok {
return value, true
}
b.mu.RLock()
promote := false
if b.live != nil {
value, ok = b.live[key]
lnt := atomic.AddUint32(&b.liveNotTenured, 1)
if lnt >= 1<<31 || int(lnt) >= len(b.live) {
promote = true
}
}
b.mu.RUnlock()
if !promote {
return value, ok
}
b.mu.Lock()
lnt := atomic.LoadUint32(&b.liveNotTenured)
if b.live != nil && (lnt >= 1<<31 || int(lnt) >= len(b.live)) {
b.tenured.Store(b.live)
b.live = nil
atomic.StoreUint32(&b.liveNotTenured, 0)
}
b.mu.Unlock()
return value, ok
}
写的话,很简单,只写live
。
func (b *Map) StoreOrLoad(key, value interface{}) (actualValue interface{}, dup bool) {
b.mu.Lock()
if b.live == nil {
m, _ := b.tenured.Load().(map[interface{}]interface{})
b.live = make(map[interface{}]interface{}, len(m)+1)
for k, v := range m {
b.live[k] = v
}
}
actualValue, dup = b.live[key]
if !dup {
b.live[key] = value
actualValue = value
}
b.mu.Unlock()
return actualValue, dup
}
三、Sync.Map 的最终实现
经过一轮讨论以后,@bcmills 单独发了一个提案:sync: add a Map to replace RWLock+map usage 最终决定不去修复RWLock
的伸缩性问题,而是提供一个可伸缩并发安全的Map
来做。 这个并发安全的Map
实现方案就是用的上面双Map
实现。然后这个并发安全的Map
会先放在 x-Repositories 包中经过一段时间迭代,如果没问题了再收敛到Go
源码包中。具体可以看 syncmap: add a synchronized map implementation。
@bcmills 基于双map的demo,做了一些优化,新增了一些API
,比如Delete
、Range
等。提交了一个正式的 CR:
// A Map must not be copied after first use.
type Map struct {
mu sync.Mutex
// clean 是 fastpath 用的,读的时候不用加锁,没有cacheline竞争问题
clean atomic.Value // map[interface{}]interface{}
// dirty 读写都需要加锁
dirty map[interface{}]interface{}
// 如果clean没有查到,这个时候misses会加1
// 当 misses >= len(dirty),会把dirty赋值给clean,然后情况dirty
misses int
}
我们再来看下数据读取的实现,这个里面有几点需要注意,跟上面双map的demo 不同的事,这里的实现是clean
和dirty
两个map
只会有一个不为空。所以读的时候,如果clean
不为空就直接读clean
,并不会再去dirty
读一次。如果dirty
不为nil
,读取以后还会调用一下m.missLocked
,这个函数主要的作用是判断对m.misses
加1
,然后判断要不要把dirty
赋值给clean
,然后清空dirty
。
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
clean, _ := m.clean.Load().(map[interface{}]interface{})
if clean != nil {
value, ok = clean[key]
return value, ok
}
m.mu.Lock()
if m.dirty == nil {
clean, _ := m.clean.Load().(map[interface{}]interface{})
if clean == nil {
// Completely empty — promote to clean immediately.
m.clean.Store(map[interface{}]interface{}{})
} else {
value, ok = clean[key]
}
m.mu.Unlock()
return value, ok
}
value, ok = m.dirty[key]
m.missLocked()
m.mu.Unlock()
return value, ok
}
func (m *Map) missLocked() {
if m.misses++; m.misses >= len(m.dirty) {
m.clean.Store(m.dirty)
m.dirty = nil
}
}
Store
的函数就比较简单了。如果写入的时候,直接加锁,然后判断dirty
是否为空,如果是空,需要把clean
数据复制一份给dirty
然后清空clean
,然后再把数据赋值给dirty
。
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
m.mu.Lock()
m.dirtyLocked()
m.dirty[key] = value
m.mu.Unlock()
}
// dirtyLocked prepares the map for a subsequent write.
// It ensures that the dirty field is non-nil and clean is nil by making a deep
// copy of clean.
func (m *Map) dirtyLocked() {
m.misses = 0
if m.dirty != nil {
return
}
clean, _ := m.clean.Load().(map[interface{}]interface{})
m.dirty = make(map[interface{}]interface{}, len(clean))
for k, v := range clean {
m.dirty[k] = v
}
m.clean.Store(map[interface{}]interface{}(nil))
}
这个实现其实有个很大的问题,就是如果有频繁读写交替的话,会导致数据一直在clean
和dirty
两个map
中来回copy
,如果map
很大的话,这个性能会很差,还会阻塞其他线程的读写,但是这个CR当时的场景是期望提供给Runtime包中一些读多写少的场景使用,所以看benchmark
跑的性能还是有很大的提升的。
代码合入的时候 @rsc 提了两点优化建议
- 如果允许
clean != nil and dirty != nil
会更好。 - 如果一个
key
没有被覆盖或者删除的话,它命中了lock-free path
后续理论上应该一直命中lock-free path
会更好一些。
3.1 进一步优化
过了几个月,基于 @rsc 之前合码的时候给的建议,@bcmills 又优化了一版,整个sync.Map
的结构变成了下面这样:
type Map struct {
mu sync.Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
type readOnly struct { // readOnly 的 map
m map[interface{}]*entry
amended bool // amended=true m没有全部key的数据,没查到还需要去dirty查下.
}
var expunged = unsafe.Pointer(new(interface{})) // 表示这个数据已经不在dirty中了。
type entry struct {
p unsafe.Pointer // *interface{}
}
主要改动只读的map
,之前叫clean
类型是map[interface{}]interface{}
,现在改成了read
,类型是readOnly struct
,readOnly
还有个amended
表示当前readOnly.m
是不是全量数据。我们继续往下Store
的代码
func (m *Map) Store(key, value interface{}) {
// fast-path
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
// 如果这个 key 在 read 里面找到了以后,尝试直接调用 tryStore 去更新 value 数据
// tryStore 里面会做两件事
// 1. 判断当前 entry.p 是不是等于 expunged,等于 expunged 就不能更新,直接返回false。下面会走 slow-path去更新
// 2. 如果不是 expunged ,那就尝试更新 entry.p = &value,如果 CAS 设置成功了就返回。
// 如果是 expunged 状态,表面 dirty 里面已经没有这个 key了,如 read 里面更新这个东西,下次 dirty数据全量提升为 read 的时候,这个数据就会丢失。
return
}
// 下面是 slow-path
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() { // e.unexpungeLocked 尝试CAS(&e.p, expunged, nil)
m.dirty[key] = e // 把 e 赋值给 dirty
}
// 到这里 e.p 肯定不是等于 expunged 了
e.storeLocked(&value) // 设置 e.p = &value
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value) // 如果只在dirty里面有,直接设置 e.p = &value
} else {
if !read.amended { // 如果目前 read 有全量数据,但是 read 和 dirty 都没有这个 key
m.dirtyLocked() // dirtyLocked 这个函数主要做的就是,把 read 里面的 e.p != nil && e.p != expunged 的元素 copy 一份赋值给 dirty
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value) // dirty 保存这个 kv
}
m.mu.Unlock()
}
总结下Store
主要做了下几件事:
fast-path
路径- 看下
read
中是否有这个key
,有的话尝试调用tryStore
,把设置的value
保存到entry
对象中去。 tryStore
里面会判断entry.p
是不是expunged
状态,是的话就不能设置,需要走slow-path
- 如果不是的话保存成功就直接返回。
- 看下
slow-path
路径- 会先加互斥锁
- 看下
read
中是否有这个key
,有的话尝试调用unexpungeLocked
,CAS
方式清除entry.p
的expunged
状态。如果清楚成功,会在dirty
里面添加这个数据。如果没有清楚成功,说明状态不是expunged
,可以直接更新read
的entry.p=&value
就行了。 - 不在
read
里面,在dirty
里面,直接设置entry.p=&value
就行了。 read
和dirty
都没有找到这个key
,先看下read
是不是有全量数据,是的话,就调用m.dirtyLocked
,把read
数据copy
一份到dirty
,并设置read.amended=true
,表示read
里面已经没有全量数据了,需要去drity
里面找。- 最后设置
m.dirty[key] = newEntry(value)
,dirty 保存这个 kv
在来看下Load
相关代码:
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // 如果 read 没找到,且 read 没有全量数据
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key] // 加锁以后,这里需要 double check一下
if !ok && read.amended {
e, ok = m.dirty[key] // 去 dirty map 读
m.missLocked() // 这里面对 misscount+1 ,然后看下是否需要把 dirty 全部给 read,然后设置 dirty 为 nil。
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load() // 如果 e.p != nil && e.p == expunged , 就把 e.p 的指向的值转成Interface返回
}
最后再来看下Delete
的怎么做的,Delete
其实比较简单,就是设置,在read
里面找到这个entry
然后设置e.p=nil
,如果在dirty
中就直接调用delete
方法删除这个key
。
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
3.2 思考:dirty 能否不全量拷贝 read?
正常思路,为了节省内存,dirty
里面只存增量数据,可以吗?反向推理下如果dirty
只存增量的数据,那就不需要read
到dirty
的数据同步操作了,那也不需要expunged
状态了。所以read
的中元素e.p=nil
的时候,表示删除了,由于没有了read
到dirty
的复制,所以需要定期滤掉read
中删除的数据(e.p = nil
)并重新给read
赋值,那Store
的时候,如果read
的e.p=nil
的话就不能再更新了。因为定期过滤掉read
中删除的数据可能会把这个entry
给删除掉,导致这个key
对应的数据丢失了。所以Store
和Load
伪代码如下:
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.p != nil {
ok:= CAS(e.p, old, &value) // 注意这里要是 old = nil 时候不能再继续尝试 CAS
if ok{
return
}
// cas 失败继续往下走
}
m.mu.Lock() // 加锁
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if atomic.Load(e.p) != nil{
atomic.Store(e.p,&value)
return
}
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
}
// read 查到了 e 但是 e.p == nil
// read 和 dirty 都没查到
m.dirty[key] = newEntry(value)
noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
m.read.Store(readOnly{m: noNilMap, amended: true})
m.mu.Unlock()
}
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended || (ok && atomic.Load(e.p) == nil){
e, ok = m.dirty[key]
m.misses++
if m.misses >= len(m.dirty) {
noNilMap := fliterNilIFNeed(read.m) // 过滤掉read.m 中为空的数据,如果没有空数据直接返回nil
allDataMap := merge(noNilMap,m.dirty)
m.read.Store(readOnly{m: allDataMap})
m.dirty = nil
m.misses = 0
}
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
这样实现逻辑上好像也没有问题,不过每次Load
和Store
一个read
中的nil
,都需要加锁,然后会过滤read
的nil
数据,都有数据的拷贝操作。如果在删除以后立即读的场景性能可能会非常差。
总结:dirty 全量拷贝 read 数据,就是好一个空间换时间的操作。
3.3 可以不要 expunged 状态吗?
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := e.p.Load()
for p == nil {
if e.p.CompareAndSwap(nil, expunged) {
return true
}
p = e.p.Load()
}
return p == expunged
}
expunged
状态本质是一个中间标记,保证read
往dirty
同步过程中,是线程安全的,然后这样读写read
的时候可以不用加速。不然的话如下图
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if e.p == nil {
// 这里不能保证是原子的,这个过程中,可能 e.p 又被赋值了。
m.dirty[k] = e
}
}
}
3.4 runtime 库使用sync.Map 优化
syncmap
在x-Repositories 里面给其他用户试用几个月以后,最终被合入了Go
源码包中,详见sync: import Map from x/sync/syncmap
然后又优化了一波源码中使用RWMutex
的代码,都改成了Sync.Map
,修改以后代码相对之前也更简单一些了。
reflect: use sync.Map instead of RWMutex for type caches
expvar: replace RWMutex usage with sync.Map and atomics
mime: use sync.Map instead of RWMutex for type lookups
mime: use sync.Map instead of RWMutex for type lookups
encoding/xml: replace tinfoMap RWMutex with sync.Map
net/http: use sync.Map instead of RWMutex for ServeMux.m
net/rpc: use a sync.Map for serviceMap instead of RWMutex
archive/zip: replace RWMutex with sync.Map
四、总结
很多时候都没有十全十美的方案,方案设计的越general
,需要考虑的场景就越多,需要做tradeoff
的地方也就越多。