看过管道的底层源码吗

题目来源: 答案:

  • Channel原理
    channel的底层是一个hchan的结构体 基于锁实现的
  1. type hchan struct {
  2. // chan 里元素数量
  3. qcount uint
  4. // chan 底层循环数组的长度
  5. dataqsiz uint
  6. // 指向底层循环数组的指针
  7. // 只针对有缓冲的 channel
  8. buf unsafe.Pointer
  9. // chan 中元素大小
  10. elemsize uint16
  11. // chan 是否被关闭的标志
  12. closed uint32
  13. // chan 中元素类型
  14. elemtype *_type // element type
  15. // 已发送元素在循环数组中的索引
  16. sendx uint // send index
  17. // 已接收元素在循环数组中的索引
  18. recvx uint // receive index
  19. // 等待接收的 goroutine 队列
  20. recvq waitq // list of recv waiters
  21. // 等待发送的 goroutine 队列
  22. sendq waitq // list of send waiters
  23. // 保护 hchan 中所有字段
  24. lock mutex
  25. }
  • Channel 发送
  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  3. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
  1. func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  2. // 如果 channel 是 nil
  3. if c == nil {
  4. // 不能阻塞,直接返回 false,表示未发送成功
  5. if !block {
  6. return false
  7. }
  8. // 当前 goroutine 被挂起
  9. gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
  10. throw("unreachable")
  11. }
  12. // 省略 debug 相关……
  13. // 对于不阻塞的 send,快速检测失败场景
  14. //
  15. // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
  16. // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
  17. // 2. channel 是缓冲型的,但循环数组已经装满了元素
  18. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
  19. (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
  20. return false
  21. }
  22. var t0 int64
  23. if blockprofilerate > 0 {
  24. t0 = cputicks()
  25. }
  26. // 锁住 channel,并发安全
  27. lock(&c.lock)
  28. // 如果 channel 关闭了
  29. if c.closed != 0 {
  30. // 解锁
  31. unlock(&c.lock)
  32. // 直接 panic
  33. panic(plainError("send on closed channel"))
  34. }
  35. // 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
  36. if sg := c.recvq.dequeue(); sg != nil {
  37. send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  38. return true
  39. }
  40. // 对于缓冲型的 channel,如果还有缓冲空间
  41. if c.qcount < c.dataqsiz {
  42. // qp 指向 buf 的 sendx 位置
  43. qp := chanbuf(c, c.sendx)
  44. // ……
  45. // 将数据从 ep 处拷贝到 qp
  46. typedmemmove(c.elemtype, qp, ep)
  47. // 发送游标值加 1
  48. c.sendx++
  49. // 如果发送游标值等于容量值,游标值归 0
  50. if c.sendx == c.dataqsiz {
  51. c.sendx = 0
  52. }
  53. // 缓冲区的元素数量加一
  54. c.qcount++
  55. // 解锁
  56. unlock(&c.lock)
  57. return true
  58. }
  59. // 如果不需要阻塞,则直接返回错误
  60. if !block {
  61. unlock(&c.lock)
  62. return false
  63. }
  64. // channel 满了,发送方会被阻塞。接下来会构造一个 sudog
  65. // 获取当前 goroutine 的指针
  66. gp := getg()
  67. mysg := acquireSudog()
  68. mysg.releasetime = 0
  69. if t0 != 0 {
  70. mysg.releasetime = -1
  71. }
  72. mysg.elem = ep
  73. mysg.waitlink = nil
  74. mysg.g = gp
  75. mysg.selectdone = nil
  76. mysg.c = c
  77. gp.waiting = mysg
  78. gp.param = nil
  79. // 当前 goroutine 进入发送等待队列
  80. c.sendq.enqueue(mysg)
  81. // 当前 goroutine 被挂起
  82. goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
  83. // 从这里开始被唤醒了(channel 有机会可以发送了)
  84. if mysg != gp.waiting {
  85. throw("G waiting list is corrupted")
  86. }
  87. gp.waiting = nil
  88. if gp.param == nil {
  89. if c.closed == 0 {
  90. throw("chansend: spurious wakeup")
  91. }
  92. // 被唤醒后,channel 关闭了。坑爹啊,panic
  93. panic(plainError("send on closed channel"))
  94. }
  95. gp.param = nil
  96. if mysg.releasetime > 0 {
  97. blockevent(mysg.releasetime-t0, 2)
  98. }
  99. // 去掉 mysg 上绑定的 channel
  100. mysg.c = nil
  101. releaseSudog(mysg)
  102. return true
  103. }
  • Channel 接收
  1. 如果Channel为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
  2. 如果Channel已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
  3. 如果Channel的sendq队列中存在挂起的Goroutine,会将recvx索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
  4. 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
  5. 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;
  1. func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  2. // 省略 debug 内容 …………
  3. // 如果是一个 nil 的 channel
  4. if c == nil {
  5. // 如果不阻塞,直接返回 (false, false)
  6. if !block {
  7. return
  8. }
  9. // 否则,接收一个 nil 的 channel,goroutine 挂起
  10. gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
  11. // 不会执行到这里
  12. throw("unreachable")
  13. }
  14. // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
  15. // 当我们观察到 channel 没准备好接收:
  16. // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
  17. // 2. 缓冲型,但 buf 里没有元素
  18. // 之后,又观察到 closed == 0,即 channel 未关闭。
  19. // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
  20. // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
  21. if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
  22. c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
  23. atomic.Load(&c.closed) == 0 {
  24. return
  25. }
  26. var t0 int64
  27. if blockprofilerate > 0 {
  28. t0 = cputicks()
  29. }
  30. // 加锁
  31. lock(&c.lock)
  32. // channel 已关闭,并且循环数组 buf 里没有元素
  33. // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
  34. // 也就是说即使是关闭状态,但在缓冲型的 channel,
  35. // buf 里有元素的情况下还能接收到元素
  36. if c.closed != 0 && c.qcount == 0 {
  37. if raceenabled {
  38. raceacquire(unsafe.Pointer(c))
  39. }
  40. // 解锁
  41. unlock(&c.lock)
  42. if ep != nil {
  43. // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
  44. // 那么接收的值将是一个该类型的零值
  45. // typedmemclr 根据类型清理相应地址的内存
  46. typedmemclr(c.elemtype, ep)
  47. }
  48. // 从一个已关闭的 channel 接收,selected 会返回true
  49. return true, false
  50. }
  51. // 等待发送队列里有 goroutine 存在,说明 buf 是满的
  52. // 这有可能是:
  53. // 1. 非缓冲型的 channel
  54. // 2. 缓冲型的 channel,但 buf 满了
  55. // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
  56. // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
  57. if sg := c.sendq.dequeue(); sg != nil {
  58. // Found a waiting sender. If buffer is size 0, receive value
  59. // directly from sender. Otherwise, receive from head of queue
  60. // and add sender's value to the tail of the queue (both map to
  61. // the same buffer slot because the queue is full).
  62. recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
  63. return true, true
  64. }
  65. // 缓冲型,buf 里有元素,可以正常接收
  66. if c.qcount > 0 {
  67. // 直接从循环数组里找到要接收的元素
  68. qp := chanbuf(c, c.recvx)
  69. // …………
  70. // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
  71. if ep != nil {
  72. typedmemmove(c.elemtype, ep, qp)
  73. }
  74. // 清理掉循环数组里相应位置的值
  75. typedmemclr(c.elemtype, qp)
  76. // 接收游标向前移动
  77. c.recvx++
  78. // 接收游标归零
  79. if c.recvx == c.dataqsiz {
  80. c.recvx = 0
  81. }
  82. // buf 数组里的元素个数减 1
  83. c.qcount--
  84. // 解锁
  85. unlock(&c.lock)
  86. return true, true
  87. }
  88. if !block {
  89. // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
  90. unlock(&c.lock)
  91. return false, false
  92. }
  93. // 接下来就是要被阻塞的情况了
  94. // 构造一个 sudog
  95. gp := getg()
  96. mysg := acquireSudog()
  97. mysg.releasetime = 0
  98. if t0 != 0 {
  99. mysg.releasetime = -1
  100. }
  101. // 待接收数据的地址保存下来
  102. mysg.elem = ep
  103. mysg.waitlink = nil
  104. gp.waiting = mysg
  105. mysg.g = gp
  106. mysg.selectdone = nil
  107. mysg.c = c
  108. gp.param = nil
  109. // 进入channel 的等待接收队列
  110. c.recvq.enqueue(mysg)
  111. // 将当前 goroutine 挂起
  112. goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
  113. // 被唤醒了,接着从这里继续执行一些扫尾工作
  114. if mysg != gp.waiting {
  115. throw("G waiting list is corrupted")
  116. }
  117. gp.waiting = nil
  118. if mysg.releasetime > 0 {
  119. blockevent(mysg.releasetime-t0, 2)
  120. }
  121. closed := gp.param == nil
  122. gp.param = nil
  123. mysg.c = nil
  124. releaseSudog(mysg)
  125. return true, !closed
  126. }