一、背景
sync.Mutex
里面用了runtime_SemacquireMutex
和runtime_Semrelease
,所以看下这个runtime
的信号量是如何实现的。
二、基础知识
2.1 信号量
信号量(英语:semaphore
)又称为信号标,是一个同步对象,用于保持在0
至指定最大值之间的一个计数值。当线程完成一次对该semaphore
对象的等待(wait
)时,该计数值减一;当线程完成一次对semaphore
对象的释放(release
)时,计数值加一。当计数值为0
,则线程等待该semaphore
对象不再能成功直至该semaphore
对象变成signaled
状态。semaphore
对象的计数值大于0
,为signaled
状态;计数值等于0
,为nonsignaled
状态。
信号量的概念是由荷兰计算机科学家艾兹赫尔·戴克斯特拉(Edsger W. Dijkstra
)发明的,广泛的应用于不同的操作系统中。在系统中,给予每一个进程一个信号量,代表每个进程目前的状态,未得到控制权的进程会在特定地方被强迫停下来,等待可以继续进行的信号到来。如果信号量是一个任意的整数,通常被称为计数信号量(Counting semaphore
),或一般信号量(general semaphore
);如果信号量只有二进制的0
或1
,称为二进制信号量(binary semaphore
)。
计数信号量具备两种操作动作,称为V
(signal()
)与P
(wait()
)(即部分参考书常称的PV操作
)。V
操作会增加信号标S
的数值,P
操作会减少它。
P原语
:P
是荷兰语Proberen
(测试)的首字母。为阻塞原语,负责把当前进程由运行状态转换为阻塞状态,直到另外一个进程唤醒它。操作为:申请一个空闲资源(把信号量减1
),若成功,则退出;若失败,则该进程被阻塞;V原语
:V
是荷兰语Verhogen
(增加)的首字母。为唤醒原语,负责把一个被阻塞的进程唤醒,它有一个参数表,存放着等待被唤醒的进程信息。操作为:释放一个被占用的资源(把信号量加1
),如果发现有被阻塞的进程,则选择一个唤醒之。
2.2 Treap
Treap
是Binary Search Tree
+Heap
的组合。
二叉查找树(Binary Search Tree
),它或者是一棵空树,或者是具有下列性质的二叉树:若它的左子树不空,则左子树上所有结点的值均小于它的根结点的值;若它的右子树不空,则右子树上所有结点的值均大于它的根结点的值; 它的左、右子树也分别为二叉排序树。
堆有大顶堆
和小顶堆
:
大顶堆
:每个节点的值都大于或者等于他的左右孩子节点的值。小顶堆
:每个结点的值都小于或等于其左孩子和右孩子结点的值。
Treap
既是一棵二叉查找树,也是一个二叉堆。但是这两种数据结构貌是矛盾的存在,如果是二叉查找树,就不能是一个堆,如果是一个堆,那么必然不是二叉查找树。
所以Treap
用了一个很巧妙的方式解决这个问题:给每个键值一个随机附加的优先级,让键值满足二叉查找树的结构,让优先级满足二叉堆的结构。
Treap
它的最大优点就是实现简单,没有太多复杂的操作,但是我们前面也说了,它是通过随机的priority
来控制树的平衡的,那么它显然无法做到完美平衡,只能做到不落入最坏的情况。
2.3 x/sync/semaphore
Go
的 X-Repositories 提供了一种带权重的信号量实现方式 sync.semaphore。这个跟runtime.semaphore
其实没太大关系。主要是提供了个high-level
的信号量给Go
开发者使用。实现方式如下:
type Weighted struct {
size int64 // 资源的总数,Acquire(n) 的时候会消耗这个资源,
cur int64 // 当前已申请资源数,Acquire(n)成功的话,cur=cur+n
mu sync.Mutex // 互斥锁,所有Acquire、Release 都要加锁
waiters list.List // 阻塞的队列
}
type waiter struct {
n int64
ready chan<- struct{} // 使用 channle 来通信
}
Weighted
就是一个权重的信号量,主要提供Acquire(n)
和Release(n)
两个操作。实现逻辑比较简单。
Acquire(n)
申请n
个资源,申请成功的话会设置cur=cur+n
,如果没有资源可以申请了,会new
一个waiter
,然后把这个waiter
加到waiters
这个等待的队列中,并阻塞在waiter.ready
的读上面。
Release(n)
释放n
个资源,然后设置cur=cur-n
,在waiters
这个等待的队列中,循环取出取waiters
头部的waiter
(直到s.size-s.cur < w.n
终止),调用close(waiter.ready)
,这样阻塞在waiter.ready
读上的goroutine
会被唤醒。
下面写了个简单Demo
:
func main() {
var (
wg sync.WaitGroup
ctx = context.Background()
)
cpuNum := runtime.GOMAXPROCS(0)
sem := semaphore.NewWeighted(int64(cpuNum)) // 设置 goroutine 最大并发数 = cpuNum
for i := 0; i < 100; i++ {
wg.Add(1)
err := sem.Acquire(ctx, 1)
if err != nil {
panic(err)
}
go func(i int) {
fmt.Println("ng: ", runtime.NumGoroutine(), " i = ", i)
sem.Release(1)
wg.Done()
}(i)
}
wg.Wait()
}
2.4 sudog
sudog
表示为了一个等待队列中的goroutine
,比如因为读写channel
阻塞,或者Lock
导致阻塞(Lock
底层其实就是semaphore
)等的goroutine
,由于这些阻塞的goroutine
不确定什么时候能被调度(取消阻塞比如unlock
、读写channel
),所以这种阻塞的goroutine
不适合一直放在p
的本地运行队列中,这个时候会把阻塞的gorutine
打包成一个sudog
里面会有一些这个g
运行的上下文。然后存到另外一个地方,比如channel
是存在 recvq 和 sendq 中,而阻塞在信号量上的goroutine
是存在 semTable 中。
具体sudog
的结构体如下:
// sudog 代表一个处于等待队列的g,比如阻塞在读写 channle上 的 goroutine
// sduog 和 g 是多对多的关系,一个 g 可以在多个 wait lists 上,所以一个 g 可以对应多个 sduog
// sudogs 会有两级缓存,优先去p的sudogcache取,取不到则去全局的sudogcache取一批(直到本地容量达到50%)
// sudogs 通过 acquireSudog 和 releaseSudog 去申请或释放
type sudog struct {
// 在channel的场景中sudog中所有字段都受hchan.lock保护
g *g
next *sudog // 双向链表,指向下一个 sduog
prev *sudog // 双向链表,指向上一个 sduog
// channel场景存的是,读写的数据。
// semaphore 场景存的是信号量的地址。
elem unsafe.Pointer
// 下面这些字段不会被并发访问
// For channels, waitlink is only accessed by g.
// For semaphores, 所有字段需要拿到semaRoot的lock才能访问
acquiretime int64 // semaphore 场景使用的,记录获取信号量时间
releasetime int64 // 释放时间
ticket uint32 // treap 里面堆用的随机的权重
// 只在select 场景使用的字段,表明当前g是否被选中,然后唤醒
isSelect bool
// 只在channel场景使用的
success bool
// 只在 semaphore 场景使用
parent *sudog // semaRoot binary tree
waitlink *sudog // semaRoot 节点对应的等待列表
waittail *sudog // semaRoot 等待列表的尾部节点
c *hchan // 只在channel场景使用的,关联的channel
}
从sodug
结构体看出来,sudog
里面的字段分别跟channel
、semaphore
、select
几个场景有关。某些字段只有在特点场景才会用到,感觉全部都耦合在一个Struct
不够优雅
。
2.5 semtable
semTable 是一个长度为251
的全局数组,每个semaRoot
指向一个treap
,主要用于存放阻塞在信号量(semaphore
)上的sudog
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte // 防止 flase sharing
}
semaRoot
最早是双向链表,在某些场景下性能比较查,所以优化成了treap
,具体可以看 CR37103
优化之后semtable
存储的结构大概是这样:
2.6 mutex
runtime
包里面的锁都是使用的runtime
内部实现的mutex
,具体是使用CAS
+futex
来实现的。更多详见Go源码——runtime.mutex
三、semaphore 源码分析
semaphore
基本操作,在 src/sync/runtime.go 定义了下面几个方法:
// Semacquire等待*s > 0,然后原子递减它。
// 它是一个简单的睡眠原语,用于同步库使用。
func runtime_Semacquire(s *uint32)
// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象
// 如果lifo为true,waiter将会被插入到队列的头部
// skipframes是跟踪过程中要省略的帧数
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine
// 它是一个简单的唤醒原语,用于同步库
// 如果handoff为true, 传递信号到队列头部的waiter
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
这个几个函数具体的实现在 src/runtime/sema.go。
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
func poll_runtime_Semrelease(addr *uint32) {
semrelease(addr)
}
3.1 获取信号量
semaphore
获取信号量操作步骤如下:
- 调用
runtime_SemacquireMutex
(比如sync.Mutex.Lock()
场景) sync_runtime_SemacquireMutex
semacquire1
CAS(addr, v, v-1)
状态成功就返回,失败继续往下- 缓存池拿一个
sudog
,或者new
一个sudog
(acquireSudog
) - 把
g
相关的数据存到sudog
中。 - 循环
- 对当前
semaRoot
加锁 nwait++
cansemacquire/CAS(addr, v, v-1)
sudog
加到semaRoot
的treap
中/root.queue()
- 可能要调整树的结构(左旋
rotateRight
/右旋rotateLeft
)防止树退化为链表 goparkunlock
让出当前g
的执行- 被唤醒
CAS
成功或者s.ticket != 0
(当前没有其他竞争者了) 认为成功- 否则继续循环
- 对当前
- 最后释放
sudog
/releaseSudog
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg()
if gp != gp.m.curg {// 判断下g是不是当前m绑定的g
throw("semacquire not on the G stack")
}
// CAS(addr, v, v-1) 成功就直接成功否则一直循环,如果 *addr = 0 返回 false 走下面 slowpath
if cansemacquire(addr) {
return
}
// 走到这里表示当前g要阻塞
// 下面逻辑,就是把g封装成sudog,然后存到semTable中。
// 最后调用 gopark 让出当前g
s := acquireSudog() // 这个先去 p.sudogcache 拿,没拿到去全局sudohgcache拿
root := semroot(addr) // 根据sema的地址,算出用到semTable中哪个semaRoot
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot) // 加锁,方面下面修改 semaRoot的属性
// 对等待的计数加1,这样sema_release时候不会走快路径
atomic.Xadd(&root.nwait, 1)
// 看下是否有其他的goroutine调用了sema_release
// 在尝试 CAS(addr, v, v-1) 试下
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// 这里,就是这个新的 sudog 加到 semaTable中的
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) // 这你会让出当前的goroutine
// goroutine 被调度回来了,表示有 sema_release 以后唤醒了这个 sema
// s.ticket != 0 表示是等待队列头部的 sudog,当前队列只有一个sudog了,所以直接结束
// CAS(addr, v, v-1) 成功也结束
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s) // 释放 sudog
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
func acquireSudog() *sudog {
// 设置禁止抢占
mp := acquirem()
pp := mp.p.ptr()
//当前本地sudog缓存没有了,则去全局缓存中拉取一批
if len(pp.sudogcache) == 0 {
lock(&sched.sudoglock)
// 首先尝试从全局缓存中获取sudog,直到本地容量达到50%
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
s := sched.sudogcache
sched.sudogcache = s.next
s.next = nil
pp.sudogcache = append(pp.sudogcache, s)
}
unlock(&sched.sudoglock)
// 如果全局缓存为空,则分配创建一个新的sudog
if len(pp.sudogcache) == 0 {
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
n := len(pp.sudogcache)
s := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
//解除抢占限制
releasem(mp)
return s
}
3.2 释放信号量
semaphore
释放信号量操作步骤如下:
- 调用
runtime_Semrelease
,比如sync.Mutex.Unlock()
场景。 sync_runtime_Semrelease
semrelease1
- 原子
*addr++
nwait=0
,表示没有阻塞在这个信号量上的g
直接返回。- 有阻塞的
g
在semTable
中找到对应的semaRoot
,然后对
semaRoot`加锁。 - 再次
check
下nwait=0
,等于0
直接返回。 - 拿到
sema
的addres
在semTable
中对应的队列头部的seamRoot
。 dequeue
是否需要调整左旋rotateLeft
或者右旋rotateRight
调整树结构。readyWithTime
,调用goread
唤醒sudog
绑定的g
。goyield
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
atomic.Xadd(addr, 1)
// 没有等待者直接返回
if atomic.Load(&root.nwait) == 0 {
return
}
//查找一个等待着并唤醒它
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
//计数已经被其他goroutine消费,所以不需要唤醒其他goroutine
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr) //查找第一个出现的addr
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // 可能比较慢 甚至被挂起所以先unlock
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
//goready(s.g,5)标记runnable 等待被重新调度
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 {
// 直接切换G
// readyWithTime已经将等待的G作为runnext放到当前的P
// 我们现在调用调度器可以立即执行等待的G
// 注意waiter继承了我们的时间片:这是希望避免在P上无限得进行激烈的信号量竞争
// goyield类似于Gosched,但是它是发送“被强占”的跟踪事件,更重要的是,将当前G放在本地runq
// 而不是全局队列。
// 我们仅在饥饿状态下执行此操作(handoff=true),因为非饥饿状态下,当我们yielding/scheduling时,
// 其他waiter可能会获得信号量,这将是浪费的。我们等待进入饥饿状体,然后开始进行ticket和P的手递手交接
// See issue 33747 for discussion.
// https://go-review.googlesource.com/c/go/+/206180
goyield()
}
}
}
四、总结
获取信号量操作主要尝试把sema
地址CAS
方式原子减1
,成就直接返回,失败以后会把当前g
打包成sudog
然后保存到semTable
,然后调用gopark
让出当前的goroutine
。
释放信号量操作就是吧sema
地址加1
,然后看有没有等待中的g
,没有直接返回,有的话去semaTable
的等待队列取出然后调用goready
唤醒对应的g
。
主要理解semaTable
里面存储sudog
的方式就好了。