syncpool的实现原理
**题目序号:**361
题目来源:
**频次:**1
答案1:(趁醉独饮痛)
Pool是什么:
Go标准库中提供的一个通用的Pool数据结构,可以使用它创建池化的对象。sync.Pool数据类型的对象用来保存一组可独立访问的临时对象,注意它是临时的,也就是说sync.Pool中保存的对象会在将来的某个时间从sync.Pool中移除掉,如果也没有被其他对象引用的话,该对象会被垃圾回收掉。
Go是一个自带GC的语言,使用起来没有心智负担,我们想使用一个对象就创建一个对象,想使用多个就创建多个,不用关心对象的释放问题。因为有GC程序会帮助我们自动将不再使用的对象的内存回收掉。但是在使用Go开发高性能的应用程序的时候,需要考虑GC执行垃圾回收给我们带来的影响。在GC执行垃圾回收的过程中需要STW(stop-the-world)时间,如果创建有大量的对象,GC程序在检查这些对象是不是垃圾的时候需要花费很多时间。进而对业务功能造成性能影响。
优点:
- 减轻GC负担
- 减少新对象的申请 复用对象,避免每次使用对象都要创建对象
- sync.Pool 本身就是线程安全的,多个 goroutine 可以并发地调用它的方法存取对象;
- sync.Pool 不可在使用之后再复制使用(struct 中有 Nocopy 静态检查是否复制使用)
三个方法:
**New:**当调用 Pool 的 Get方法从池中获取元素,
1. 没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新的元素。
2. 没有设置 New 字段,没有更多的空闲元素可返回时,Get 方法将返回 nil,表明当前没有可用的元素。
3. **Get:从Pool 移除一个元素,这个元素会从 Pool 中移除,返回给调用者,返回值可能是 nil**
Put:将一个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如果 Put 一个 nil 值,Pool 就会忽略这个值
原理:
(sync.Pool结构图)
1.13< 版本之前的两个问题:
- 每次 GC 都会回收创建的对象。
- 元素过多会导致 SWT 耗时变长
- 缓存元素都被回收后,会导致Get 命中率下降,不得不创建心的对象
- 底层实现使用了 Mutex,对这个锁并发请求竞争激烈的时候,会导致性能的下降。
- Go 对 Pool 的优化就是避免使用锁,同时将加锁的 queue 改成 lock-free 的 queue 的实现,给即将移除的元素再多一次“复活”的机会。
Pool 最重要的两个字段 (local 和 victim)
- local:
local是一个poolLocal数组的指针,localSize代表这个数组的大小,它的大小是goroutine中P的数量。每个P都有一个ID,这个ID的值对应着poolLoca数组的下标索引。所以poolLocal数组大小最大为runtime.GOMAXPROCS(0)。victim对应local,它也是一个poolLocal数组的指针,victimSize是victim数组的大小。每次垃圾回收的时候,Pool会把victim中的对象清理掉,然后把local的数据赋值给victim,并把local设置为nil,victim像一个中转站,里面的数据可以被捡回来重新使用,也可能被GC回收掉。
- Victim:
victim 中的元素如果被 Get 取走,那么这个元素就很幸运,因为它又“活”过来了。但是,如果这个时候 Get 的并发不是很大,元素没有被 Get 取走,那么就会被移除掉,因为没有别人引用它的话,就会被垃圾回收掉。
type Pool struct {
noCopy noCopy
// local是一个指向[p]poolLocal的指针,它指向的是一个数组,此数组中的元素是poolLocal类型
// 数组的大小是固定的,数量与p的数量是相同的,也是每个p对应数组中的1个元素
local unsafe.Pointer
// local数组的大小
localSize uintptr
// victim类型与local是一样的,可以理解为local的中转站,过渡存储local用的
victim unsafe.Pointer
// victim数组的大小
victimSize uintptr
// New方法,返回的空接口类型,该字段也是Pool唯一暴露给外界的字段
// New方法可以赋值一个能够产生值的函数,在调用Get方法的时候可以用
// New方法来产生一个value,如果没有给New赋值,调用Get时将返回nil.
New func() interface{}
}
poolLocal:
poolLocal是sync.Pool中local或victim指向数组中的元素类型,里面有一个pad数组填充,使得poolLocal的大小构成128字节的整数倍,是为了防止在cache line上分配多个poolLocalInternal从而造成伪共享。
type poolLocal struct {
poolLocalInternal
// pad是填充用的,防止伪共享,让整个poolLocal构成128字节的整数倍
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
poolLocalInternal:
它包含private和shared两个字段,private是一个私有对象,只能被拥有它的P使用,而一个P同时只能执行一个G, 所以G访问private不需要加锁,不存在并发冲突。shared是一个双端队列,它既能被与之绑定的P访问和修改,也能被其他的P访问和修改,绑定的P可以从队头加入和出队元素,其他的P只能从队尾出队元素
// poolLocal的内部结构,Pool中的poolLocal是对poolLocalInternal的简单包装
type poolLocalInternal struct {
// private是一个私有对象,从名字可以看出来,它只能被拥有它的P使用
// Pool中也说明了,每个P有1个poolLocal,也即有一个poolLocalInternal
// 这里的private只能被他属于的P使用
private interface{} // Can be used only by the respective P.
// shared是一个双端队列,它既能被与之绑定的P访问和修改,也能被其他的P访问和修改
// 绑定的P可以从对头入队加和出队元素,其他的P只能从队尾出队元素。
shared poolChain
}
poolChain:
poolChain是一个双向链表的结构体头,它保存着双向链表的头节点和尾节点指针,链表中节点元素类型是poolChainElt。这里需要注意一下,按照我们常规数据结构的理解,head指针指向的位置在链表最左边,tail指针指向的位置在最右边,但是这里刚好是相反的,这里的head指针指向链表的最右边,tail指向最左边。每添加一个元素的时候,添加在head节点的下一个节点,然后将head指向刚添加的节点。
// poolChain是一个双向链表的结构体头,它保存着双向链表的头节点和尾节点指针,poolChainElt
// 是链表中每个节点的类型结构,该结构类型中有2个字段,headTail uint64和 vals []eface,
// headTail作用后面有描述,vals就是存储元素的切片。vals的大小在每个poolChainElt是不同的,
// 相邻的2个直接大小存在着2倍的关系。
type poolChain struct {
// head只会被1个生产者调用,即Put操作,只在头部添加元素,不需要强同步
head *poolChainElt
// tail被消费者调用,从尾部获取元素,可能有多个消费者同时获取,所以操作必须是原子的
tail *poolChainElt
}
poolChainElt:
双向链表中的节点结构,它有一个存储数据的poolDequeue队列,next和prev分别是指向前后节点的指针。poolDequeue中vals字段存放元素,vals是一个固定大小的切片,相邻的两个poolChainElt中的vals大小存在2倍关系,第一个poolChainElt中vals大小是8,可以结合上面pool结构图看,第二个poolChainElt中的size是16,最大值为2^30。
type poolChainElt struct {
// 存数据的队列,是一个固定大小的切片
poolDequeue
// next和prev是双向链表的指针,next被生产者写,消费者读,它的值只会从
// nil变成非nil, prev被消费者写,生产者读,它的值只会从非nil变成nil
next, prev *poolChainElt
}
poolDequeue:
poolDequeue是一个无锁的、固定大小的,单个生产者、多个消费者的队列。poolDequeue无锁实现是Pool的精髓,巧妙采用CAS去掉了1.13版本之前的有锁实现。它有headTail和vals两个字段,headTail占8个字节,高4字节表示的是head在vals中的下标位置,低4字节表示的是tail在vals中的下标位置。生产者每生产1个数据,head+1,消费者每取走队列中1个数据,tail+1.当tail与head相等的时候,说明队列中没有元素了,当tail+len(vals)=head的时候,说明队列满了。
// poolDequeue是一个无锁的、固定大小的、单个生产者、多个消费者的队列
// 生产者只能从队头向队列添加数据或者从队头取走数据,单个或多个消费者可以
// 可用队尾取走数据
type poolDequeue struct {
// headTail占8个字节,分为两部分,高位4字节和低位4字节,高4字节表示的
// 32位int数据描述的是head下标,低4字节表示的是32位int数据描述的是tail
// 下标,这里的下标说的都是下面vals数组的下标,vals[tail,head)下标范围
// 内是有数据的,可以被消费者消费。生产者每生产1个数据,head会+1, 消费者
// 每取走队列中1个数据,tail+1, 当tail赶上head的时候,说明队列中没有数
// 据了,头部索引存储在最高有效位中,因此我们可以原子地添加它,溢出并不会产生
// 别的影响。
headTail uint64
// vals是一个存储数据的环形队列,数据可以是任何类型,因为定义的类型是interface{}
// 它的大小必须是2整数次幂。vals[i].typ为nil,表示该下标位置的没有放数据,typ不是
// nil表示该位置已放有数据。在tail还未设置为i+1和vals[i].typ为非nil前,vals中的
// 位置i视为还在占用状态。当前消费者取走数据或是生产者读取走数据后,vals[i]将自动被
// 它们设置为nil.
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
Get实现原理:
Get先从本地 local 的private中获取元素,此private只能被当前的P访问,一个P同时只能运行一个G, 所以直接访问l.privated不需要加锁。如果local.private中没有元素了,尝试从local.shared队列的头部弹出一个元素,如果local.shared中也没有元素了,则从其他P对应的shared中偷取一个,如果其他P对应的shared中也没有元素了,再检查victim中是否有元素,如果还是没有,设置了New方法,将调用New方法产生一个。如果没有设置New方法,将返回nil。
// Get方法从Pool中返回一个元素,并将它从Pool中移除,调用者不要假定放入(Put方法)Pool中元素和从里面
// 取(Get)出来的元素有任何关系,如果Pool中没有缓存的元素了并且P.New没有赋值将会返回nil.否则调用New
// 方法产生一个对象返回
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
// 把当前的goroutine固定在当前的P上,返回当前的P上的*poolLocal
l, pid := p.pin()
// 先从本地local的private字段获取元素,此private只能被当前的P访问
// 其他P是不能访问的,一个P同一时间只能有1个goroutine在运行,所以
// 直接访问l.private并不需要加锁
x := l.private
l.private = nil
// private中没有元素
if x == nil {
// 尝试从当前的本地local.shared中出队一个元素,x是从队头弹出的。
x, _ = l.shared.popHead()
if x == nil {
// 如果从local.shared也没有拿到,则从其他P中的l.shared中偷1个
// 如果shared中也没有,产生从victim中获取
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 走到这里说明,private, local.shared, 其他P中的local.shared, 本地local.victim,
// 其他P中的local.victim都没有缓存了,如果设置New函数,调用New函数创建一个元素返回
if x == nil && p.New != nil {
x = p.New()
}
// 走到这里说明没有设置New函数,返回nil
retur
pin方法会调用 runtime_procPin 方法获取当前运行的G绑定到当前的P上,禁止抢占,返回关联P的pid. 然后原子操作取出p.localSize值与Pid进行比较,正常情况下pid是不会比localSize 大,如果不在 ocalSize 范围内,说明程序修改过P的大小,会走到pinSlow处理逻辑。
pin方法会调用runtime_procPin方法获取当前运行的G绑定到当前的P上,禁止抢占,返回关联P的pid. 然后原子操作取出p.localSize值与Pid进行比较,正常情况下pid是不会比localSize大,如果不在localSize范围内,说明程序修改过P的大小,会走到pinSlow处理逻辑。
// 将当前的G定在与它关联的P上,禁止抢占,返回关联P对应的poolLocal对象和P的id
// 调用者必须执行runtime_procUnpin操作当结束对pool操作的时候
func (p *Pool) pin() (*poolLocal, int) {
// 获取运行当前G,与之关联的P的id
pid := runtime_procPin()
// 检查localSize是不是比pid大,正常情况下p.localSize=max(pid)+1
// 因为pid是P的id,如果P的数量为n,则pid的范围为[0,n), 如果pid的值不在
// localSize范围内,说明程序修改过P的数量,这里就会走到pinSlow逻辑
// pinSlow逻辑主要是重新分配p的local和localSize
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
pinSlow会再次检查pid是否在localSize范围内,如果不在,将会重新分配poolLocal数组给local, 初始化localSize大小,并将p加入到全局的allPools切片中,allPools维护了程序中所有的sync.Pool对象。
func (p *Pool) pinSlow() (*poolLocal, int) {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// 已经加了allPoolsMu全局锁,这里可以直接读取localSize和local,不存在data race
s := p.localSize
l := p.local
// 再次判断pid的值是否在正常的范围内,刚才前面解除了禁止抢占,等现在再拿到pid的时候
// 可能不是之前的pid了
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 将p加入到全局pool allPools中
if p.local == nil {
allPools = append(allPools, p)
}
// GOMAXPROCS传0返回值是当前可以并发运行的CPU数
size := runtime.GOMAXPROCS(0)
// 创建一个新的poolLocal切片,并复制给当前的p
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
// 返回当前的P对应的localPool和pid的值
return &local[pid], pid
}
// l存储的是local的起始地址,local是一个poolLocal数组,所以根据起始位置+poolLocal的偏移量
// 可以获取到第i个索引中的poolLocal值
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
分析Get中的popHead操作,popHead从poolChain中获取一个数据,优先从头节点head指向的环形队列中获取,如果没有获取到,再从head前一个节点指向的环形队列中获取,直到查询完所有的队列都没有,返回失败。整个处理过程是不断的从链表的head节点向尾节点遍历,对遍历到的每个节点执行popHead, 这个节点类型是poolDequeue, poolDequeue的popHead。
// popHead从poolChain获取一个数据,优先从头部的环形队列中获取,
// 如果没有获取到,在从head前一个节点指向的环形队列中获取,直到
// 查询完所有的队列都没有,返回获取失败false
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
// 从head节点开始查找
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// 继续查找前一个队列,只要d非空即没有查到tail位置,一直
// 查找,获取到了数据就提前返回
d = loadPoolChainElt(&d.prev)
}
// 走到这里表示没有获取数据,返回失败
return nil, false
}
// popHead从队列的头部返回一个元素,如果队列为空,第2个返回值将返回false, 注意该函数
// 只能被单个生产者调用,多个生产者即G同时调用,会引发数据竞争
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 下标tail和head相等了,说明tail追赶上了head,即将vals中的元素取完了
if tail == head {
// Queue is empty.
return nil, false
}
head--
// 构建一个新的uint64,head是-1后的值
ptrs2 := d.pack(head, tail)
// 进一步比较headTail和ptrs,如果相同说明没有修改它,将其设置为ptrs2
// 这时可以放心大胆的对head进行操作,获取里面的元素了,如果这段时间headTail
// 有人修改,继续进行下一轮循环处理
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 这里可以放心大胆的读取vals中head下标处的元素了
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 这里判断slot中的val是不是dequeueNil型,因为在pushHead中有肯能会放入,作占位用
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 将slot填充为nil值,因为popHead和pushHead不会同时被调用,所以这里可以放心大胆对其进行
// 操作,还有一点是,前面已经重新设置了head值了,popTail已经不可能访问到slot这里,也不用担心
// popTail与popHead在这里的冲突问题
*slot = eface{}
return val, true
}
如果shared中的popHead方法也没有获取到值,将会调下面的getSlow方法。先尝试从其他的P对应的poolLocal中偷一个元素,尝试的顺序是从当前pid+1个索引位置开始的,会对sync.local检查一圈。对每个poolLocal检查的是shared中的元素,看尾部有没有元素,如果有从尾部弹出一个元素。如果检查完所有的poolLocal中的shared都没有找到,接下来将在当前本地poolLocal中的victim的private查找,如果还是没有,就检查它的victim的shared是否有,最后检查其他P对应的poolLocal中victim中的shared是否有数据。
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
// 尝试从其他P中偷一个元素,尝试P的顺序是从当前pid+1个索引位置开始对应的
// poolLocal中的shared开始查找是否有元素,在shared中的查询方式是从
// 它的尾部弹出一个元素,如果shared队列中没有,继续尝试下一个位置,直到
// 循环检查一圈都没有,走victim中检查
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
// 下面尝试从受害中缓存victim中查找是否元素,查找的位置是从pid索引位置开始的poolLocal
// 产生从它的shared尾部弹出一个元素,如果有就返回,如果没有就尝试下一个位置的poolLocal
locals = p.victim
l := indexLocal(locals, pid)
// 检查victim.private是否有元素,有就返回
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 这里将p.victimSize设置为0,是为了减少后序调用getSlow,不用再一次遍历
// victim数组了,直接返回nil, 加快处理速度
atomic.StoreUintptr(&p.victimSize, 0)
// 尝试了所有的local.shared和victim.private, victim.shared都没有找到,返回nil
return nil
}
// popTail从poolChain中获取一个数据返回,第一个返回值表示返回的数据,第二个
// 返回值表示是否成功获取到了数据,如果没有获取到,将返回false
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
// d为nil,表示c.tail还未初始化,比如第一次执行Get操作的时候就会发生,这个时候
// 还没有Put数据进去,所以获取失败
if d == nil {
return nil, false
}
for {
// 需要注意这里先要进行loadPoolChainElt操作再执行d.popTail操作,
// 这两条语句不能颠倒顺序,一般来说,d可能短暂为空,如果在执行pop
// 且执行失败了,但是在执行前d2为非空。这种情况d永久会为空了,需要将
// d从poolChain中移除掉
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// 走完整个链表了,还是没有获取到数据,直接返回nil,false
return nil, false
}
// 当前tail执行的节点中的环形队列中已经没有数据可以获取了,尝试tail的
// next节点中获取,因为当前的tail节点永远不会再有数据,所以把它从poolChain
// 移除掉,下次再执行popTail的时候,就不用检查这个空队列了。
// 这里采用了CAS操作,因为有多个消费者可能并行执行popTail.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 将d从poolChain中移除,以便在gc的时候可以将d回收掉,
// 那为啥在上面的popHead中没有将d从poolChain中移除掉呢?
// 因为popHead中的d可以留着,在后序的pushHead中可以继续
// 使用,而popTail中的d永远不会再被使用了,因为没有pushTail操作
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
上面的popTail对d的处理做了优化,将d从poolChain中移除,以便在gc的时候可以将d回收掉,那为啥在上面的popHead中没有将d从poolChain中移除掉呢?因为popHead中的d可以留着,在后序的pushHead中可以继续使用,而popTail中d永远不会再被使用了,因为没有pushTail操作。
Put实现原理
Put操作处理逻辑比较简单,优先将元素放在本地的private,如果private字段已经有值了,会将元素添加到本地的shared队列中。
GC操作:
sync.Pool中的对象并不是一直保持不释放的。否则对象太多了,会引起内存溢出。Go中的GC对sync.Pool做了专门的处理。在pool.go的init函数里,向GC注册了清理机制,源码如下.poolCleanup 会在 STW 阶段被调用。主要是将 local 和 victim 作交换,那么不至于GC 把所有的 Pool 都清空了,而是需要两个 GC 周期才会被释放
// 注册pool清理回调,在gc的时候,会调用poolCleanup
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
// 清理pool函数,对oldPools和allPools的操作都是没有加锁的,因为这里执行的时候
// 已经STW了,不存在竞争
func poolCleanup() {
// oldPools是一个全局变量,是一个保存*sync.Pool的切片,这些
// 切片元素都是等待被gc清理的,当执行完下面的循环之后,就被清理了
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// allPools是当前新产生的*sync.Pool元素集合,里面存储的元素跟前面的
// oldPools是不同的,当该轮gc执行的时候,会将allPools中所有的Pool中的
// local元素移动到victim受害者缓存中,并将local缓存清空
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 交换oldPools和allPools,处理的很精炼,为什么要这么做呢?
// 进程起来,运行过程中申请了sync.Pool创建的变量vp,然后执行gc,此时oldPools是空的
// allPools是非空的,里面装的是vp, 当该函数被调用的时候,将vp中的元素从local移动到victim
// 即在第一次gc的时候,vp中的元素没有被垃圾回收,接下来执行下面这个语句之后,allPools被清空
// oldPools里面保存的是vp, 当下次执行gc的时候,oldPools中的内容被清理掉,所以vp中元素生命
// 周期在两次gc之间都是存活的
oldPools, allPools = allPools, nil
}