Posted 0001-01-01 00:00 +0800 by ZhangJie ‐ 6 min read

分享:  

终于来到了go语言相关的设计实现,go中sync.Mutex的设计有很多设计方面的考虑。

我们看下对应的加锁解锁部分,对应的源码 see https://sourcegraph.com/github.com/golang/go/-/blob/src/sync/mutex.go#L72

首先,了解下锁的定义,我们看到里面有个state字段,这个字段表示的是锁的状态,为0表示锁是解锁状态,其他状态可以参考下源码中的定义。

// A Mutex is a mutual exclusion lcok.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutext struct {
    state	int32
    sema	uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
)

下面看下加解锁实现,多注意下state相关的逻辑,比较容易好理解。

mutex.Lock()

fastpath

首先,会执行fastpath,会尝试CAS加锁一次,如果没有很多锁竞争,且锁处于未加锁状态(state=0),大概率会加锁成功(state=1)成功返回。

如果fastpath加锁失败了,比如尝试加锁前state != 0:

  • state可能为1

    此时,表示锁已经被锁定,新goroutine尝试加锁请求失败,这种很好理解;

  • state != 0,但也不是1(mutexLocked)

    这种情况就比较特殊了,涉及到go的一些锁优化,拿个例子来说一下。比如state有可能为4(mutexStarving),即它确实处于解锁状态(state&mutexLocked=0),但却处于starvation模式下,这说明之前有尝试加锁的goroutine很久没有拿到锁了,所以将当前锁的模式从normal修改为了starvation。

    为了避免调度延迟过大,go会优先受理部分goroutine的加锁请求,所以,这种情况新加入抢锁的goroutine也是不能拿到锁的。

OK,有了这个大致的了解之后,我们继续看加锁失败后的处理路径,继续执行slowpath。

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

slowpath

这部分就有很多优化措施了,感兴趣的可以阅读这里的源码,我们先尝试总结下。

see https://sourcegraph.com/github.com/golang/go/-/blob/src/sync/mutex.go#L84

func (m *Mutex) lockSlow() {
	...
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
        ...
    }
    ...
}

加锁失败后,失败原因有多种,要么是锁已经被锁定了,要么是处于饥饿模式。对应的处理方式也不一样,所以开头先判断下。

如果old&(mutexLocked|mutexStarvig) == mutexLocked为true,则表示之前加锁的失败原因是,锁已经被锁定了。那怎么办呢?难道要让goroutine立即去睡觉觉?goroutine睡着后再被唤醒参与调度这个开销和线程比是小,但是还是有的嘛,能不能再尝试几次,避免过早睡眠?当然可以。

那就让当前goroutine自旋+重新加锁几次试试,就是这里的runtime_canSpin(iter)来控制能否自旋了。

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

最多自旋4次,当然还有其他要求,就是必须运行在多核机器上,并且GOMAXPROCS>1,并且至少有另外一个正在运行的P且其runq为空。大家可以想一下为什么?如果不这么限制,那谁来释放锁呢,当前goroutine大概率自旋无效,也优化不了什么。

这些条件满足时,检查当前g运行的P上runq是否为空,如果为空才允许自旋,为什么?会影响到runq中的goroutine的调度执行吧。

最后再看lockSlow中的代码逻辑:

func (m *Mutex) lockSlow() {
	...
	old := m.state
	for {
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
        ...
    }
    ...
}

大家看刚开加锁失败后awoke=false, 并且假定old=mutexLocked,old»mutexWaiterShift这个写法,让人猜测m.state中还存储了waiter相关的信息,然后尝试将m.state设置上mutexWoken,awoke=true,没看出这是在干啥。

然后runtime_doSpin开始空转CPU,可以理解成一个for循环从30减到0,结束。这么做无非就是想等其他goroutine把锁释放掉。

const (
    active_spin_cnt = 30
)

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

func procyield(cycles uint32)

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

然后iter++,表示自旋次数+1(最多4次),更新锁状态,注意此时mutexWoken设置了,现在可以猜测下mutexWoken表示啥了,它表示的是mutex有没有唤醒协程来抢锁。

自旋之后continue,进入循环体的下一次循环,继续检查锁的状态:

  • 如果锁依旧被锁定,且当前可以继续自旋,则继续自旋;
  • 如果锁依旧被锁定,且当前超过了自旋次数,则执行下面的逻辑;
  • 如果锁被解锁了,则也执行下面的逻辑;

下面的new表示当前代码执行完后锁的状态,有这么几种情况:

  • 锁还没被释放,锁必然处于锁定状态,new&mutexLocked==1;
  • 锁已经被释放,锁如果处于normal模式,当前goroutine必抢锁成功,所以new|=mutexLocked也很好理解;
  • 锁已经被释放,锁如果处于starvation模式,当前goroutine抢锁失败,入队等待,但是这个锁将直接递交给等待队列中的第一个waiter,不用这个waiter被唤醒后抢锁,所以其new|=mutexLocked没什么疑问了;

继续看waiter标志位相关的设置:检查old状态,如果仍是锁定或者饿死状态,则直接将new中设置mutexWaiter标记。干嘛用的,表示有waiter在等待锁释放啊。

继续看starvation标志位相关的设置:如果发现锁之前的状态就是饥饿模式了,并且没有被锁定,那么锁的最新状态还是饥饿模式(new|=mutexStarving岂不是多余?)

func (m *Mutex) lockSlow() {
	...
	old := m.state
	for {
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
        ...
    }
    ....
}

继续看,如果当前mutex已经有被唤醒的goroutine尝试抢锁,那么new里面mutexWoken应该为1,如果为0是一种不一致状态,报错。然后从new中清除这一标记位,也需mutexwoken代表的就是当前goroutine吧,一次唤醒一个嘛。

func (m *Mutex) lockSlow() {
	...
	old := m.state
	for {
		new := old
		...
        
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
        ...
     }
     ...
}

继续看,CAS更新下锁状态m.state,注意此时new里面设置了locked(starvation可能有也可能没有)。

继续检查,如果之前锁状态不是锁定状态、不是饥饿状态,那么现在肯定就是锁定成功了,退出循环结束加锁过程。

如果发现waitStartTime不为0,说明之前已经有几轮循环来尝试过获得锁了,现在要算一下当前这次加锁操作总共等待了多久了。

func (m *Mutex) lockSlow() {
	var waitStartTime int64
    ...
	old := m.state
	for {
		new := old
		...

		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
            ... 
        }
        ...
    }
    ...
}

现在来算一下当前goroutine加锁等待了多久了,这个时间很好算,直接拿当前时间减去第第一次开始的时间就算出来了,runtime_nanotime()-waitStartTime(),并且发现,如果这个时间超过阈值1ms,就会将starving设为true,意味着mutex将被设置为饥饿模式,当然如果以前就是饥饿模式,现在肯定也是饥饿模式了。

另外注意queueLifo的值,如果是新抢锁的goroutine,那么为false,调用runtime_Semacquire时会将该goroutine假如到队列的末尾排队,如果是之前唤醒过的goroutine,则会将其添加到队列的对首,如果锁变成了饥饿模式且被释放了,则直接交给对首的goroutine执行。

这个函数runtime_semacquiremutex,还挺关键的:

信号量semaroot好理解,它是地址addr处对应的信号量,本身内部维护了一个等待信号量的sudog(等待执行的g)列表,对mutex而言就是等待抢锁的goroutines/waiters列表。

mutex里面为什么要加一个字段sema,其实就是为了间接具备这样一种能力,维护一个&sema的信号量,维护一个因为抢锁而阻塞的goroutine列表,以方便在锁被释放的时候,再把它们唤醒。

看下这个函数是怎么实现的吧,它调用了semacquire1这个函数,这个函数内部就是获得&addr处对应的信号量,然后尝试对semaroot.lock加锁,这个锁是一个runtime.mutex,

我擦,我发现这个函数里面调用了semacquire1(…),这个函数里面调用了lockWithRank,lockWithRank调用了lock2(),lock2内部使用了Linux系统调用futex这个能够把线程挂起的重量级锁!

打脸,前面总结说sync.Mutex没有使用futex,哇擦!

m.sema这个变量表示的是一个信号量,但是这个信号量是用来通知啥的呢?这个信号量对应的有一个等待队列的,如果lifo为true,则表示将当前的goroutine对应的后续sudog放入队列的头部,这样方便饿死模式下直接将mutex的拥有权交给这个对首的waiter,避免其等锁等太久。

  • 如果拿到了这个信号量就立即返回了;
  • 如果拿不到这个信号量就要做后面的处理了;

可能有多个并发的goroutine来抢锁的情况,可能之前已经有没抢到阻塞的goroutine了,这里先找一个阻塞的goroutine,获得信号量地址对应的一个数据结构,记为semaroot,这个维护了mutex上因为抢锁失败而等待的goroutine(waiters)队列。

对这个semaroot.lock的加锁操作,用了自旋、CAS、futex,为啥用futex呢?这不相当于sync.Mutex间接用了futex嘛!那如果加锁失败不是直接阻塞线程了吗!

  • 这种情况下好的情况是几个线程自旋一下抢到sync.Mutex.sema,大概率会成功,但是锁竞争严重的时候还是会失败,怎么办总不能一直自旋不干活啊,注意了,这里调用了
func (m *Mutex) lockSlow() {
	...
	old := m.state
	for {
		new := old
		...

		if atomic.CompareAndSwapInt32(&m.state, old, new) {
            ...
			queueLifo := waitStartTime != 0
           	...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	...
}

sync.Mutex里面的锁实现,不使用futex,即当Lock失败时,不会导致调用的进程线程被阻塞,而只是将当前goroutine阻塞,go runtime scheduler仍然可以在当前线程上调度执行其他的goroutine,等锁被Unlock时,就有机会再唤醒之前Lock失败的goroutine执行。

另外,go sync.Mutex做了很多优化,大致总结一下。sync.Mutex有两种工作模式:normal mode 和 starvation mode,两种模式对执行Lock、Unlock的goroutine会产生不同的影响。

  • normal mode

    该模式下,waiters(goroutines)会按照加锁申请进入一个FIFO的队列,一个被唤醒的waiter不一定能够立即持有锁,它要和所有新的发起加锁请求的goroutines竞争。新到达的goroutines通常有一个优势——它们已经在CPU上运行了,并且有很多,所以一个刚被唤醒的waiter大概率会竞争锁失败。

    这种情况下,这个失败的waiter会被加入到这个FIFO队列的对首,如果一个waiter竞争锁超过1ms还没有成功,就会将mutex从normal mode切换为startvation mode。

  • starvation mode

    该模式下,当一个goroutine释放锁时,锁的拥有者立即从该goroutine转交给对首的waiter。新到达的goroutines不会尝试获得锁,尽管它能观察到锁好像被释放掉了。这种模式下,新到达的goroutines会追加到FIFO的队列的末尾。

当一个waiter收到一个mutex的拥有者权限时,它会检查,如果:1)它是这个锁竞争等待队列中的最后一个waiter;或者 2)它的加锁等待时间小于1ms,此时将把mutex从starvation mode切换为normal mode。

mutex.Unlock()

fastpath

先尝试CAS去掉加锁标志位,其实返回的是锁的新状态,如果之前状态是locked,现在unlocked去掉了这个标志位,如果新状态state==0,表示没什么其他要处理的了,直接返回就可以了,反之,则说明锁可能被设置了其他的状态,如前面提到的锁的normal、starvation mode,还需要进入slowpath进一步处理。

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

slowpath

锁的状态不只是有持有、未持有这几种,那看来这里是要处理其他几种锁状态的情况了。

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

继续看源码,首先做个检查,当前goroutine调用Unlock之前是否有持有这把锁,很好比较,只要检查 (new+mutexLocked) & mutexLocked 下便知道。

QA:如果当前goroutine没有持有过锁,前面fastpath中却去掉了锁标志位,走到这里检查发现之前没有持有锁,这是很严重的问题,直接throw了,也没什么善后的,直接退出啦,程序员自己抓紧改bug吧,继续跑也会造成bug!

然后这里面主要是这个函数runtime_Semrelease:

  • 如果是正常模式,这个函数从对头取出一个sudog(等待锁的goroutine),然后将其丢入runq等待被调度;
  • 如果是饥饿模式,这个函数从头取出一个sudog(等待锁的goroutine),然后将其丢入runq,并立即让出CPU,相当于让这个sudog尽可能快地被执行到。
func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}