一、背景

sync.Mutex里面用了runtime_SemacquireMutexruntime_Semrelease,所以看下这个runtime的信号量是如何实现的。

二、基础知识

2.1 信号量

信号量(英语:semaphore)又称为信号标,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时,计数值加一。当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态。semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态。

信号量的概念是由荷兰计算机科学家艾兹赫尔·戴克斯特拉(Edsger W. Dijkstra)发明的,广泛的应用于不同的操作系统中。在系统中,给予每一个进程一个信号量,代表每个进程目前的状态,未得到控制权的进程会在特定地方被强迫停下来,等待可以继续进行的信号到来。如果信号量是一个任意的整数,通常被称为计数信号量(Counting semaphore),或一般信号量(general semaphore);如果信号量只有二进制的01,称为二进制信号量(binary semaphore)。

计数信号量具备两种操作动作,称为Vsignal())与Pwait())(即部分参考书常称的PV操作)。V操作会增加信号标S的数值,P操作会减少它。

  • P原语P是荷兰语Proberen(测试)的首字母。为阻塞原语,负责把当前进程由运行状态转换为阻塞状态,直到另外一个进程唤醒它。操作为:申请一个空闲资源(把信号量减1),若成功,则退出;若失败,则该进程被阻塞;
  • V原语V是荷兰语Verhogen(增加)的首字母。为唤醒原语,负责把一个被阻塞的进程唤醒,它有一个参数表,存放着等待被唤醒的进程信息。操作为:释放一个被占用的资源(把信号量加1),如果发现有被阻塞的进程,则选择一个唤醒之。

Semaphore - wiki

2.2 Treap

TreapBinary Search Tree+Heap的组合。

二叉查找树(Binary Search Tree),它或者是一棵空树,或者是具有下列性质的二叉树:若它的左子树不空,则左子树上所有结点的值均小于它的根结点的值;若它的右子树不空,则右子树上所有结点的值均大于它的根结点的值; 它的左、右子树也分别为二叉排序树。

堆有大顶堆小顶堆

  • 大顶堆:每个节点的值都大于或者等于他的左右孩子节点的值。
  • 小顶堆:每个结点的值都小于或等于其左孩子和右孩子结点的值。

Treap既是一棵二叉查找树,也是一个二叉堆。但是这两种数据结构貌是矛盾的存在,如果是二叉查找树,就不能是一个堆,如果是一个堆,那么必然不是二叉查找树。

所以Treap用了一个很巧妙的方式解决这个问题:给每个键值一个随机附加的优先级,让键值满足二叉查找树的结构,让优先级满足二叉堆的结构

Treap它的最大优点就是实现简单,没有太多复杂的操作,但是我们前面也说了,它是通过随机的priority来控制树的平衡的,那么它显然无法做到完美平衡,只能做到不落入最坏的情况

image.png

Treap——堆和二叉树的完美结合,性价比极值的搜索树

2.3 x/sync/semaphore

GoX-Repositories 提供了一种带权重的信号量实现方式 sync.semaphore。这个跟runtime.semaphore其实没太大关系。主要是提供了个high-level的信号量给Go开发者使用。实现方式如下:

type Weighted struct {
    size    int64 // 资源的总数,Acquire(n) 的时候会消耗这个资源,
    cur     int64 // 当前已申请资源数,Acquire(n)成功的话,cur=cur+n
    mu      sync.Mutex // 互斥锁,所有Acquire、Release 都要加锁
    waiters list.List // 阻塞的队列
}

type waiter struct {
    n     int64
    ready chan<- struct{} // 使用 channle 来通信
}
    

Weighted就是一个权重的信号量,主要提供Acquire(n)Release(n)两个操作。实现逻辑比较简单。

Acquire(n)申请n个资源,申请成功的话会设置cur=cur+n,如果没有资源可以申请了,会new一个waiter,然后把这个waiter加到waiters这个等待的队列中,并阻塞在waiter.ready的读上面。

Release(n)释放n个资源,然后设置cur=cur-n,在waiters这个等待的队列中,循环取出取waiters头部的waiter(直到s.size-s.cur < w.n终止),调用close(waiter.ready),这样阻塞在waiter.ready读上的goroutine会被唤醒。

下面写了个简单Demo

func main() {
    var (
        wg  sync.WaitGroup
        ctx = context.Background()
    )

    cpuNum := runtime.GOMAXPROCS(0)
    sem := semaphore.NewWeighted(int64(cpuNum)) // 设置 goroutine 最大并发数 = cpuNum
    for i := 0; i < 100; i++ {
        wg.Add(1)
        err := sem.Acquire(ctx, 1)
        if err != nil {
            panic(err)
        }

        go func(i int) {
            fmt.Println("ng: ", runtime.NumGoroutine(), " i = ", i)
            sem.Release(1)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

2.4 sudog

sudog表示为了一个等待队列中的goroutine,比如因为读写channel阻塞,或者Lock导致阻塞(Lock底层其实就是semaphore)等的goroutine,由于这些阻塞的goroutine不确定什么时候能被调度(取消阻塞比如unlock、读写channel),所以这种阻塞的goroutine不适合一直放在p的本地运行队列中,这个时候会把阻塞的gorutine打包成一个sudog里面会有一些这个g运行的上下文。然后存到另外一个地方,比如channel是存在 recvqsendq 中,而阻塞在信号量上的goroutine是存在 semTable 中。

具体sudog结构体如下

// sudog 代表一个处于等待队列的g,比如阻塞在读写 channle上 的 goroutine
// sduog 和 g 是多对多的关系,一个 g 可以在多个 wait lists 上,所以一个 g 可以对应多个 sduog
// sudogs 会有两级缓存,优先去p的sudogcache取,取不到则去全局的sudogcache取一批(直到本地容量达到50%)
// sudogs 通过 acquireSudog 和 releaseSudog 去申请或释放
type sudog struct {
    // 在channel的场景中sudog中所有字段都受hchan.lock保护
    g *g

    next *sudog // 双向链表,指向下一个 sduog
    prev *sudog // 双向链表,指向上一个 sduog
    
    // channel场景存的是,读写的数据。
    // semaphore 场景存的是信号量的地址。
    elem unsafe.Pointer 

    // 下面这些字段不会被并发访问
    // For channels, waitlink is only accessed by g.
    // For semaphores, 所有字段需要拿到semaRoot的lock才能访问

    acquiretime int64 // semaphore 场景使用的,记录获取信号量时间
    releasetime int64 // 释放时间
    ticket      uint32 // treap 里面堆用的随机的权重

    // 只在select 场景使用的字段,表明当前g是否被选中,然后唤醒
    isSelect bool

    // 只在channel场景使用的
    success bool

    // 只在 semaphore 场景使用
    parent   *sudog // semaRoot binary tree
    waitlink *sudog // semaRoot 节点对应的等待列表
    waittail *sudog // semaRoot 等待列表的尾部节点
    c        *hchan // 只在channel场景使用的,关联的channel
}

sodug结构体看出来,sudog里面的字段分别跟channelsemaphoreselect几个场景有关。某些字段只有在特点场景才会用到,感觉全部都耦合在一个Struct不够优雅

sudog on channel

2.5 semtable

semTable 是一个长度为251的全局数组,每个semaRoot指向一个treap,主要用于存放阻塞在信号量(semaphore)上的sudog

var semtable [semTabSize]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte // 防止 flase sharing
}

semaRoot最早是双向链表,在某些场景下性能比较查,所以优化成了treap,具体可以看 CR37103

优化之后semtable存储的结构大概是这样:

image.png

2.6 mutex

runtime包里面的锁都是使用的runtime内部实现的mutex,具体是使用CAS+futex来实现的。更多详见Go源码——runtime.mutex

三、semaphore 源码分析

semaphore基本操作,在 src/sync/runtime.go 定义了下面几个方法:

// Semacquire等待*s > 0,然后原子递减它。
// 它是一个简单的睡眠原语,用于同步库使用。
func runtime_Semacquire(s *uint32)

// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象
// 如果lifo为true,waiter将会被插入到队列的头部
// skipframes是跟踪过程中要省略的帧数
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine
// 它是一个简单的唤醒原语,用于同步库
// 如果handoff为true, 传递信号到队列头部的waiter
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

这个几个函数具体的实现在 src/runtime/sema.go

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    semrelease1(addr, handoff, skipframes)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
func poll_runtime_Semrelease(addr *uint32) {
    semrelease(addr)
}

3.1 获取信号量

semaphore获取信号量操作步骤如下:

  1. 调用runtime_SemacquireMutex (比如sync.Mutex.Lock()场景)
  2. sync_runtime_SemacquireMutex
  3. semacquire1
  4. CAS(addr, v, v-1)状态成功就返回,失败继续往下
  5. 缓存池拿一个sudog,或者new一个sudogacquireSudog
  6. g相关的数据存到sudog中。
  7. 循环
    • 对当前semaRoot加锁
    • nwait++
    • cansemacquire/CAS(addr, v, v-1)
    • sudog加到semaRoottreap中/root.queue()
    • 可能要调整树的结构(左旋rotateRight/右旋rotateLeft)防止树退化为链表
    • goparkunlock让出当前g的执行
    • 被唤醒
    • CAS成功或者s.ticket != 0(当前没有其他竞争者了) 认为成功
    • 否则继续循环
  8. 最后释放sudog/releaseSudog

具体源码如下

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    gp := getg()
    if gp != gp.m.curg {// 判断下g是不是当前m绑定的g
        throw("semacquire not on the G stack")
    }

    // CAS(addr, v, v-1) 成功就直接成功否则一直循环,如果 *addr = 0 返回 false 走下面 slowpath
    if cansemacquire(addr) {
        return
    }
    
    // 走到这里表示当前g要阻塞
    // 下面逻辑,就是把g封装成sudog,然后存到semTable中。
    // 最后调用 gopark 让出当前g


    s := acquireSudog() // 这个先去 p.sudogcache 拿,没拿到去全局sudohgcache拿
    root := semroot(addr) // 根据sema的地址,算出用到semTable中哪个semaRoot
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0
    if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
        if t0 == 0 {
            t0 = cputicks()
        }
        s.acquiretime = t0
    }
    for {
        lockWithRank(&root.lock, lockRankRoot) // 加锁,方面下面修改 semaRoot的属性
        // 对等待的计数加1,这样sema_release时候不会走快路径
        atomic.Xadd(&root.nwait, 1)
        // 看下是否有其他的goroutine调用了sema_release
        // 在尝试 CAS(addr, v, v-1) 试下
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        
        // 这里,就是这个新的 sudog 加到 semaTable中的
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) // 这你会让出当前的goroutine
        
        
        // goroutine 被调度回来了,表示有 sema_release 以后唤醒了这个 sema
        // s.ticket != 0 表示是等待队列头部的 sudog,当前队列只有一个sudog了,所以直接结束
        // CAS(addr, v, v-1) 成功也结束
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    if s.releasetime > 0 {
        blockevent(s.releasetime-t0, 3+skipframes)
    }
    releaseSudog(s) // 释放 sudog
}


func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}

func acquireSudog() *sudog {
    // 设置禁止抢占
    mp := acquirem()
    pp := mp.p.ptr()
    //当前本地sudog缓存没有了,则去全局缓存中拉取一批
    if len(pp.sudogcache) == 0 {
        lock(&sched.sudoglock)
        // 首先尝试从全局缓存中获取sudog,直到本地容量达到50%
        for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
            s := sched.sudogcache
            sched.sudogcache = s.next
            s.next = nil
            pp.sudogcache = append(pp.sudogcache, s)
        }
        unlock(&sched.sudoglock)
        // 如果全局缓存为空,则分配创建一个新的sudog
        if len(pp.sudogcache) == 0 {
            pp.sudogcache = append(pp.sudogcache, new(sudog))
        }
    }
    n := len(pp.sudogcache)
    s := pp.sudogcache[n-1]
    pp.sudogcache[n-1] = nil
    pp.sudogcache = pp.sudogcache[:n-1]
    if s.elem != nil {
        throw("acquireSudog: found s.elem != nil in cache")
    }
    //解除抢占限制
    releasem(mp)
    return s
}

3.2 释放信号量

semaphore释放信号量操作步骤如下:

  1. 调用runtime_Semrelease,比如sync.Mutex.Unlock()场景。
  2. sync_runtime_Semrelease
  3. semrelease1
  4. 原子*addr++
  5. nwait=0,表示没有阻塞在这个信号量上的g直接返回。
  6. 有阻塞的gsemTable中找到对应的semaRoot,然后semaRoot`加锁。
  7. 再次checknwait=0,等于0直接返回。
  8. 拿到semaaddressemTable中对应的队列头部的seamRoot
  9. dequeue是否需要调整左旋rotateLeft或者右旋rotateRight调整树结构。
  10. readyWithTime,调用goread唤醒sudog绑定的g
  11. goyield

semrelease源码如下:

func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    // 没有等待者直接返回
    if atomic.Load(&root.nwait) == 0 {
        return
    }

    //查找一个等待着并唤醒它
    lockWithRank(&root.lock, lockRankRoot)
    if atomic.Load(&root.nwait) == 0 {
        //计数已经被其他goroutine消费,所以不需要唤醒其他goroutine
        unlock(&root.lock)
        return
    }
    s, t0 := root.dequeue(addr) //查找第一个出现的addr
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    unlock(&root.lock)
    if s != nil { // 可能比较慢 甚至被挂起所以先unlock
        acquiretime := s.acquiretime
        if acquiretime != 0 {
            mutexevent(t0-acquiretime, 3+skipframes)
        }
        if s.ticket != 0 {
            throw("corrupted semaphore ticket")
        }
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        //goready(s.g,5)标记runnable 等待被重新调度

        readyWithTime(s, 5+skipframes)
        if s.ticket == 1 && getg().m.locks == 0 {
            // 直接切换G
            // readyWithTime已经将等待的G作为runnext放到当前的P
            // 我们现在调用调度器可以立即执行等待的G
            // 注意waiter继承了我们的时间片:这是希望避免在P上无限得进行激烈的信号量竞争
            // goyield类似于Gosched,但是它是发送“被强占”的跟踪事件,更重要的是,将当前G放在本地runq
            // 而不是全局队列。
            // 我们仅在饥饿状态下执行此操作(handoff=true),因为非饥饿状态下,当我们yielding/scheduling时,
            // 其他waiter可能会获得信号量,这将是浪费的。我们等待进入饥饿状体,然后开始进行ticket和P的手递手交接
            // See issue 33747 for discussion.
            // https://go-review.googlesource.com/c/go/+/206180
            goyield()
        }
    }
}

四、总结

获取信号量操作主要尝试把sema地址CAS方式原子减1,成就直接返回,失败以后会把当前g打包成sudog然后保存到semTable,然后调用gopark让出当前的goroutine

释放信号量操作就是吧sema地址加1,然后看有没有等待中的g,没有直接返回,有的话去semaTable的等待队列取出然后调用goready唤醒对应的g

主要理解semaTable里面存储sudog的方式就好了。

参考资料

手摸手Go 并发编程基建Semaphore

一文读懂go中semaphore(信号量)源码