互斥锁的底层实现

参考解析

题目来源: 陌陌

答案:

互斥锁是并发程序中对共享资源进行访问控制的主要手段,Mutex是go语言提供的简单易用的互斥锁。Mutex的结构很简单,暴露的方法也只有2个,一个加锁 一个解锁。那么我们每天用的Mutex互斥锁是如何实现的呢?

  1. type Mutex struct {
  2. // 状态码
  3. state int32
  4. // 信号量,用于向处于 Gwaitting 的 G 发送信号
  5. sema uint32
  6. }
  7. const(
  8. // 值=1 表示是否锁住 1=锁 0=未锁
  9. mutexLocked = 1 << iota // mutex is locked
  10. // 值=2 表示是否被唤醒 1=唤醒 0=未唤醒
  11. mutexWoken
  12. // 是否为饥渴模式(等待超过1秒则为饥渴模式)
  13. mutexStarving
  14. // 右移3位,为等待的数量
  15. mutexWaiterShift = iota
  16. // 饥饿模式的时间
  17. starvationThresholdNs = 1e6
  18. )

加锁操作

  1. func (m *Mutex) Lock() {
  2. // 利用atomic包中的cas操作判断是否上锁
  3. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  4. // 判断是否启用了race检测
  5. if race.Enabled {
  6. race.Acquire(unsafe.Pointer(m))
  7. }
  8. // m.state = 1 直接返回 其他goroutine调用lock会发现已经被上锁
  9. return
  10. }
  11. // 等待时间
  12. var waitStartTime int64
  13. // 饥饿模式标志位
  14. starving := false
  15. // 唤醒标志位
  16. awoke := false
  17. // 自旋迭代的次数
  18. iter := 0
  19. // 保存 mutex 当前状态
  20. old := m.state
  21. // 循环
  22. for {
  23. // 判断 如果不是饥饿模式并且是否能够执行自旋函数(判断自旋次数)
  24. // old&(0001|0100) == 0001 ==> old&0101
  25. // 当old为0001为非饥饿模式 0001 == 0001 true 当old为0101饥饿模式 0101 == 0001 false
  26. // runtime_canSpin 判断自旋少于4次,并且是多核机器上并且GOMAXPROCS>1
  27. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
  28. // 判断条件:
  29. // 未被唤醒 && 等待数量不为0 && 使用CAS设置状态为已唤醒
  30. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
  31. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
  32. // 设置激活为true
  33. awoke = true
  34. }
  35. // 自旋函数 自旋次数+1
  36. runtime_doSpin()
  37. iter++
  38. old = m.state
  39. continue
  40. }
  41. // 如果不能执行自旋函数 记录一个new状态 然后判断改变new 最终使用CAS替换尝试设置state属性
  42. new := old
  43. // 当前的mutex.state处于正常模式,则将new的锁位设置为1
  44. if old&mutexStarving == 0 {
  45. new |= mutexLocked
  46. }
  47. // 如果当前锁锁状态为锁定状态或者处于饥饿模式,则将等待的线程数量+1
  48. if old&(mutexLocked|mutexStarving) != 0 {
  49. new += 1 << mutexWaiterShift
  50. }
  51. // 如果starving变量为true并且处于锁定状态,则new的饥饿状态位打开
  52. if starving && old&mutexLocked != 0 {
  53. new |= mutexStarving
  54. }
  55. // 对于状态的验证
  56. if awoke {
  57. // The goroutine has been woken from sleep,
  58. // so we need to reset the flag in either case.
  59. if new&mutexWoken == 0 {
  60. throw("sync: inconsistent mutex state")
  61. }
  62. new &^= mutexWoken
  63. }
  64. // new已经判断设置完,如果mutex的state没有变动过的话 则替换成new
  65. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  66. // 如果未被锁定并且并不是出于饥饿状态 退出循环 goroutine获取到锁
  67. if old&(mutexLocked|mutexStarving) == 0 {
  68. break // locked the mutex with CAS
  69. }
  70. // 如果当前的 goroutine 之前已经在排队了,就排到队列的前面。
  71. queueLifo := waitStartTime != 0
  72. if waitStartTime == 0 {
  73. waitStartTime = runtime_nanotime()
  74. }
  75. // 进入休眠状态,等待信号唤醒后重新开始循环 如果queueLifo为true,则将等待goroutine插入到队列的前面
  76. runtime_SemacquireMutex(&m.sema, queueLifo)
  77. // 计算等待时间 确定 mutex 当前所处模式
  78. // 此时这个goroutine已经被唤醒
  79. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  80. old = m.state
  81. // 判断被唤醒的goroutine是否为饥饿状态
  82. if old&mutexStarving != 0 {
  83. // If this goroutine was woken and mutex is in starvation mode,
  84. // ownership was handed off to us but mutex is in somewhat
  85. // inconsistent state: mutexLocked is not set and we are still
  86. // accounted as waiter. Fix that.
  87. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
  88. throw("sync: inconsistent mutex state")
  89. }
  90. delta := int32(mutexLocked - 1<<mutexWaiterShift)
  91. // 当不是饥饿状态或者等待数只有一个,则退出饥饿模式
  92. if !starving || old>>mutexWaiterShift == 1 {
  93. delta -= mutexStarving
  94. }
  95. atomic.AddInt32(&m.state, delta)
  96. break
  97. }
  98. // 如果不是饥饿模式 让新到来的 goroutine 先获取锁,继续循环
  99. awoke = true
  100. iter = 0
  101. } else {
  102. // 如果CAS替换未能成功 则继续循环
  103. old = m.state
  104. }
  105. }
  106. if race.Enabled {
  107. race.Acquire(unsafe.Pointer(m))
  108. }
  109. }

解锁

  1. func (m *Mutex) Unlock() {
  2. if race.Enabled {
  3. _ = m.state
  4. race.Release(unsafe.Pointer(m))
  5. }
  6. // 利用原子操作 设置state锁位置为0
  7. new := atomic.AddInt32(&m.state, -mutexLocked)
  8. // 判断状态,给未加锁的mutex解锁,抛出错误
  9. if (new+mutexLocked)&mutexLocked == 0 {
  10. throw("sync: unlock of unlocked mutex")
  11. }
  12. // 判断是否为饥饿模式
  13. if new&mutexStarving == 0 {
  14. // 正常状态
  15. old := new
  16. for {
  17. // 如果等待的goroutine为零 || 已经被锁定、唤醒、或者已经变成饥饿状态
  18. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
  19. return
  20. }
  21. // 更新new的值,减去等待数量
  22. new = (old - 1<<mutexWaiterShift) | mutexWoken
  23. // 使用CAS 替换旧值
  24. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  25. // 如果替换成功 则恢复挂起的goroutine.r如果为 true表明将唤醒第一个阻塞的goroutine
  26. runtime_Semrelease(&m.sema, false)
  27. return
  28. }
  29. old = m.state
  30. }
  31. } else {
  32. // 恢复挂起的goroutine.r如果为 true表明将唤醒第一个阻塞的goroutine
  33. runtime_Semrelease(&m.sema, true)
  34. }
  35. }