用go实现一个协程池,大概用什么实现

题目来源:网易

答案:

定义一个task 的结构体 标示具体要执行的任务格式

  1. type Job func([]interface{})type taskWork struct {
  2. Run Job
  3. startBool bool
  4. params []interface{}
  5. }

定义一个worker 池,控制协程相关信息

  1. type WorkPool struct {
  2. taskPool chan taskWork
  3. workNum int
  4. maxNum int
  5. stopTopic bool
  6. //考虑后期 作为冗余队列使用
  7. taskQue chan taskWork
  8. }

实现协程池相关启动,停止,扩容策略,缩减策略,备用队列启用等 逻辑

  1. //得到一个线程池并返回 句柄
  2. func (p *WorkPool) InitPool() {
  3. *p = WorkPool{workNum: workerNumDefault,
  4. maxNum: workerNumMax, stopTopic: false,
  5. taskPool: make(chan taskWork, workerNumDefault*2), taskQue: nil}
  6. (p).start()
  7. go (p).workerRemoveConf()
  8. }
  9. //开始work
  10. func (p *WorkPool) start() {
  11. for i := 0; i < workerNumDefault; i++ {
  12. p.workInit(i)
  13. fmt.Println("start pool task:", i)
  14. }
  15. }
  16. //初始化 work池 后期应该考虑如何 自动 增减协程数,以达到最优
  17. func (p *WorkPool) workInit(id int) {
  18. go func(idNum int) {
  19. //var i int = 0
  20. for {
  21. select {
  22. case task := <-p.taskPool:
  23. if task.startBool == true && task.Run != nil {
  24. //fmt.Print("this is pool ", idNum, "---")
  25. task.Run(task.params)
  26. }
  27. //单个结束任务
  28. if task.startBool == false {
  29. //fmt.Print("this is pool -- ", idNum, "---")
  30. return
  31. }
  32. //防止从channal 中读取数据超时
  33. case <-time.After(time.Millisecond * 1000):
  34. //fmt.Println("time out init")
  35. if p.stopTopic == true && len(p.taskPool) == 0 {
  36. fmt.Println("topic=", p.stopTopic)
  37. //work数递减
  38. p.workNum--
  39. return
  40. }
  41. //从备用队列读取数据
  42. case queTask := <-p.taskQue:
  43. if queTask.startBool == true && queTask.Run != nil {
  44. //fmt.Print("this is que ", idNum, "---")
  45. queTask.Run(queTask.params)
  46. }
  47. }
  48. }
  49. }(id)
  50. }
  51. //停止一个workPool
  52. func (p *WorkPool) Stop() {
  53. p.stopTopic = true
  54. }
  55. //普通运行实例,非自动扩充
  56. func (p *WorkPool) Run(funcJob Job, params ...interface{}) {
  57. p.taskPool <- taskWork{funcJob, true, params}
  58. }
  59. //用select 去做 实现 自动扩充 协程个数 启用备用队列等特性
  60. func (p *WorkPool) RunAuto(funcJob Job, params ...interface{}) {
  61. task := taskWork{funcJob, true, params}
  62. select {
  63. //正常写入
  64. case p.taskPool <- task:
  65. //写入超时 说明队列满了 写入备用队列
  66. case <-time.After(time.Millisecond * 1000):
  67. p.taskQueInit()
  68. p.workerAddConf()
  69. //task 入备用队列
  70. p.taskQue <- task
  71. }
  72. }
  73. //自动初始化备用队列
  74. func (p *WorkPool) taskQueInit() {
  75. //扩充队列
  76. if p.taskQue == nil {
  77. p.taskQue = make(chan taskWork, p.maxNum*2)
  78. }
  79. }
  80. //自动扩充协程 简单的自动扩充策略
  81. func (p *WorkPool) workerAddConf() {
  82. //说明需要扩充进程 协程数量小于 1000 协程数量成倍增长
  83. if p.workNum < 1000 {
  84. p.workerAdd(p.workNum)
  85. } else if p.workNum < p.maxNum {
  86. tmpNum := p.maxNum - p.workNum
  87. tmpNum = tmpNum / 10
  88. if tmpNum == 0 {
  89. tmpNum = 1
  90. }
  91. p.workerAdd(1)
  92. }
  93. }
  94. //自动缩减协程 实现比较粗糙,可以考虑后续精细实现一些策略
  95. func (p *WorkPool) workerRemoveConf() {
  96. for {
  97. select {
  98. case <-time.After(time.Millisecond * 1000 * 600):
  99. if p.workNum > workerNumDefault && len(p.taskPool) == 0 && len(p.taskQue) == 0 {
  100. rmNum := (p.workNum - workerNumDefault) / 5
  101. if rmNum == 0 {
  102. rmNum = 1
  103. }
  104. p.workerRemove(rmNum)
  105. }
  106. }
  107. }
  108. }
  109. func (p *WorkPool) workerAdd(num int) {
  110. for i := 0; i < num; i++ {
  111. p.workNum++
  112. p.workInit(p.workNum)
  113. }
  114. }
  115. func (p *WorkPool) workerRemove(num int) {
  116. for i := 0; i < num; i++ {
  117. task := taskWork{startBool: false}
  118. p.taskPool <- task
  119. p.workNum--
  120. }
  121. }