Go 有哪些并发同步原语?

Go是一门以并发编程见长的语言,它提供了一系列的同步原语方便开发者使用

原子操作

Mutex、RWMutex 等并发原语的底层实现是通过 atomic 包中的一些原子操作来实现的,原子操作是最基础的并发原语

562.Go 有哪些并发同步原语? - 图1

  1. package main
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. )
  6. var opts int64 = 0
  7. func main() {
  8. add(&opts, 3)
  9. load(&opts)
  10. compareAndSwap(&opts, 3, 4)
  11. swap(&opts, 5)
  12. store(&opts, 6)
  13. }
  14. func add(addr *int64, delta int64) {
  15. atomic.AddInt64(addr, delta) //加操作
  16. fmt.Println("add opts: ", *addr)
  17. }
  18. func load(addr *int64) {
  19. fmt.Println("load opts: ", atomic.LoadInt64(&opts))
  20. }
  21. func compareAndSwap(addr *int64, oldValue int64, newValue int64) {
  22. if atomic.CompareAndSwapInt64(addr, oldValue, newValue) {
  23. fmt.Println("cas opts: ", *addr)
  24. return
  25. }
  26. }
  27. func swap(addr *int64, newValue int64) {
  28. atomic.SwapInt64(addr, newValue)
  29. fmt.Println("swap opts: ", *addr)
  30. }
  31. func store(addr *int64, newValue int64) {
  32. atomic.StoreInt64(addr, newValue)
  33. fmt.Println("store opts: ", *addr)
  34. }

Channel

channel 管道,高级同步原语,goroutine之间通信的桥梁

使用场景:消息队列、数据传递、信号通知、任务编排、锁

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. c := make(chan struct{}, 1)
  8. for i := 0; i < 10; i++ {
  9. go func() {
  10. c <- struct{}{}
  11. time.Sleep(1 * time.Second)
  12. fmt.Println("通过ch访问临界区")
  13. <-c
  14. }()
  15. }
  16. for {
  17. }
  18. }

基本并发原语

Go 语言在 sync包中提供了用于同步的一些基本原语,这些基本原语提供了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级更高的 Channel 实现同步。

常见的并发原语如下:sync.Mutexsync.RWMutexsync.WaitGroupsync.Condsync.Oncesync.Poolsync.Context

sync.Mutex

sync.Mutex (互斥锁) 可以限制对临界资源的访问,保证只有一个 goroutine 访问共享资源

使用场景:大量读写,比如多个 goroutine 并发更新同一个资源,像计数器

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. // 封装好的计数器
  8. var counter Counter
  9. var wg sync.WaitGroup
  10. var gNum = 1000
  11. wg.Add(gNum)
  12. // 启动10个goroutine
  13. for i := 0; i < gNum; i++ {
  14. go func() {
  15. defer wg.Done()
  16. counter.Incr() // 受到锁保护的方法
  17. }()
  18. }
  19. wg.Wait()
  20. fmt.Println(counter.Count())
  21. }
  22. // 线程安全的计数器类型
  23. type Counter struct {
  24. mu sync.Mutex
  25. count uint64
  26. }
  27. // 加1的方法,内部使用互斥锁保护
  28. func (c *Counter) Incr() {
  29. c.mu.Lock()
  30. c.count++
  31. c.mu.Unlock()
  32. }
  33. // 得到计数器的值,也需要锁保护
  34. func (c *Counter) Count() uint64 {
  35. c.mu.Lock()
  36. defer c.mu.Unlock()
  37. return c.count
  38. }

sync.RWMutex

sync.RWMutex (读写锁) 可以限制对临界资源的访问,保证只有一个 goroutine 写共享资源,可以有多个goroutine 读共享资源

使用场景:大量并发读,少量并发写,有强烈的性能要求

package main import ( "fmt" "sync" "time" ) func main() { // 封装好的计数器 var counter Counter var gNum = 1000 // 启动10个goroutine for i := 0; i < gNum; i++ { go func() { counter.Count() // 受到锁保护的方法 }() } for { // 一个writer counter.Incr() // 计数器写操作 fmt.Println("incr") time.Sleep(time.Second) } } // 线程安全的计数器类型 type Counter struct { mu sync.RWMutex count uint64 } // 加1的方法,内部使用互斥锁保护 func (c *Counter) Incr() { c.mu.Lock() c.count++ c.mu.Unlock() } // 得到计数器的值,也需要锁保护 func (c *Counter) Count() uint64 { c.mu.RLock() defer c.mu.RUnlock() return c.count }

sync.WaitGroup

sync.WaitGroup 可以等待一组 Goroutine 的返回

使用场景:并发等待,任务编排,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求

requests := []*Request{...} wg := &sync.WaitGroup{} wg.Add(len(requests)) for _, request := range requests { go func(r *Request) { defer wg.Done() // res, err := service.call(r) }(request) } wg.Wait()

sync.Cond

sync.Cond 可以让一组的 Goroutine 都在满足特定条件时被唤醒

使用场景:利用等待 / 通知机制实现阻塞或者唤醒

package main import ( "fmt" "sync" "sync/atomic" "time" ) var status int64 func main() { c := sync.NewCond(&sync.Mutex{}) for i := 0; i < 10; i++ { go listen(c) } time.Sleep(1 * time.Second) go broadcast(c) time.Sleep(1 * time.Second) } func broadcast(c *sync.Cond) { c.L.Lock() atomic.StoreInt64(&status, 1) c.Signal() c.L.Unlock() } func listen(c *sync.Cond) { c.L.Lock() for atomic.LoadInt64(&status) != 1 { c.Wait() } fmt.Println("listen") c.L.Unlock() }

sync.Once

sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次

使用场景:常常用于单例对象的初始化场景

package main import ( "fmt" "sync" ) func main() { o := &sync.Once{} for i := 0; i < 10; i++ { o.Do(func() { fmt.Println("only once") }) } }

sync.Pool

sync.Pool可以将暂时将不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能(频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺)

使用场景:对象池化, TCP连接池、数据库连接池、Worker Pool

package main import ( "fmt" "sync" ) func main() { pool := sync.Pool{ New: func() interface{} { return 0 }, } for i := 0; i < 10; i++ { v := pool.Get().(int) fmt.Println(v) // 取出来的值是put进去的,对象复用;如果是新建对象,则取出来的值为0 pool.Put(i) } }

sync.Map

sync.Map 线程安全的map

使用场景:map 并发读写

package main import ( "fmt" "sync" ) func main() { var scene sync.Map // 将键值对保存到sync.Map scene.Store("1", 1) scene.Store("2", 2) scene.Store("3", 3) // 从sync.Map中根据键取值 fmt.Println(scene.Load("1")) // 根据键删除对应的键值对 scene.Delete("1") // 遍历所有sync.Map中的键值对 scene.Range(func(k, v interface{}) bool { fmt.Println("iterate:", k, v) return true }) }

sync.Context

sync.Context 可以进行上下文信息传递、提供超时和取消机制、控制子 goroutine 的执行

使用场景:取消一个goroutine的执行

package main import ( "context" "fmt" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) go func() { defer func() { fmt.Println("goroutine exit") }() for { select { case <-ctx.Done(): fmt.Println("receive cancel signal!") return default: fmt.Println("default") time.Sleep(time.Second) } } }() time.Sleep(time.Second) cancel() time.Sleep(2 * time.Second) }

扩展并发原语

ErrGroup

errgroup 可以在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能

使用场景:只要一个 goroutine 出错我们就不再等其他 goroutine 了,减少资源浪费,并且返回错误

  1. package main
  2. import (
  3. "fmt"
  4. "net/http"
  5. "golang.org/x/sync/errgroup"
  6. )
  7. func main() {
  8. var g errgroup.Group
  9. var urls = []string{
  10. "http://www.baidu.com/",
  11. "https://www.sina.com.cn/",
  12. }
  13. for i := range urls {
  14. url := urls[i]
  15. g.Go(func() error {
  16. resp, err := http.Get(url)
  17. if err == nil {
  18. resp.Body.Close()
  19. }
  20. return err
  21. })
  22. }
  23. err := g.Wait()
  24. if err == nil {
  25. fmt.Println("Successfully fetched all URLs.")
  26. } else {
  27. fmt.Println("fetched error:", err.Error())
  28. }
  29. }

Semaphore

Semaphore带权重的信号量,控制多个goroutine同时访问资源

使用场景:控制 goroutine 的阻塞和唤醒

package main import ( "context" "fmt" "log" "runtime" "time" "golang.org/x/sync/semaphore" ) var ( maxWorkers = runtime.GOMAXPROCS(0) sema = semaphore.NewWeighted(int64(maxWorkers)) //信号量 task = make([]int, maxWorkers*4) // 任务数,是worker的四 ) func main() { ctx := context.Background() for i := range task { // 如果没有worker可用,会阻塞在这里,直到某个worker被释放 if err := sema.Acquire(ctx, 1); err != nil { break } // 启动worker goroutine go func(i int) { defer sema.Release(1) time.Sleep(100 * time.Millisecond) // 模拟一个耗时操作 task[i] = i + 1 }(i) } // 请求所有的worker,这样能确保前面的worker都执行完 if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil { log.Printf("获取所有的worker失败: %v", err) } fmt.Println(maxWorkers, task) }

SingleFlight

用于抑制对下游的重复请求

使用场景:访问缓存、数据库等场景,缓存过期时只有一个请求去更新数据库

package main import ( "fmt" "sync" "sync/atomic" "time" "golang.org/x/sync/singleflight" ) // 模拟从数据库读取 func getArticle(id int) (article string, err error) { // 假设这里会对数据库进行调用, 模拟不同并发下耗时不同 atomic.AddInt32(&count, 1) time.Sleep(time.Duration(count) * time.Millisecond) return fmt.Sprintf("article: %d", id), nil } // 模拟优先读缓存,缓存不存在读取数据库,并且只有一个请求读取数据库,其它请求等待 func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) { v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) { return getArticle(id) }) return v.(string), err } var count int32 func main() { time.AfterFunc(1*time.Second, func() { atomic.AddInt32(&count, -count) }) var ( wg sync.WaitGroup now = time.Now() n = 1000 sg = &singleflight.Group{} ) for i := 0; i < n; i++ { wg.Add(1) go func() { res, _ := singleflightGetArticle(sg, 1) // res, _ := getArticle(1) if res != "article: 1" { panic("err") } wg.Done() }() } wg.Wait() fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now)) }