WaitGroup 是开发过程中经常使用的并发控制技术,用来在程序中控制等待一组 goroutine 结束。
实现原理
数据结构
WaitGroup 的数据结构包括了一个 noCopy 的辅助字段,一个 state1 记录 WaitGroup 状态的数组:
- noCopy 的辅助字段;
- state1,一个具有复合意义的字段,包含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。
type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉 vet 工具违反了复制使用的规则
noCopy noCopy
// 前 64bit(8bytes) 的值分成两段,高 32bit 是计数值,低 32bit 是 waiter 的计数
// 另外 32bit 是用作信号量的
// 因为 64bit 值的原子操作需要 64bit 对齐,但是 32bit 编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
// 总之,会找到对齐的那 64bit 作为 state,其余的 32bit 做信号量
state1 [3]uint32
}// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是 64bit 对齐的,数组前两个元素做 state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是 32bit 对齐的,数组后两个元素用来做 state,它可以用来做 64bit 的原子操作,第一个元素 32bit 用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。
文章图片
noCopy:辅助 vet 检查方法 Add & Done Add 方法主要操作的是 state 的计数部分,去除 race 检查和异常检查的代码后,它的实现如下:
noCopy 字段的作用是指示 vet 工具在做检查的时候,这个数据结构不能做值复制使用。更严谨地说,是不能在第一次使用之后复制使用。
vet 会对实现 Locker 接口的数据类型做静态检查,一旦代码中有复制使用这种数据类型的情况,就会发出警告。但是,WaitGroup 不满足 Locker 接口,这时就可以通过给 WaitGroup 添加一个 noCopy 字段来实现 Locker 接口。而且因为 noCopy 字段是未输出类型,所以 WaitGroup 不会暴露 Lock/Unlock 方法。
如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高 32bit 是计数值 v,所以把 delta 左移 32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter countif v > 0 || w == 0 {
return
}// 如果计数值 v 为 0 并且 waiter 的数量 w 不为 0,那么 state 的值就是 waiter 的数量
// 将waiter的数量设置为 0,因为计数值 v 也是 0,所以它们俩的组合 *statep 直接设置为 0 即可。此时需要并唤醒所有的 waiter
*statep = 0
for ;
w != 0;
w-- {
runtime_Semrelease(semap, false, 0)
}
}// Done 方法实际就是计数器减 1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。
其主干实现代码如下:
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter 的数量
if v == 0 {
// 如果计数值为 0, 调用这个方法的 goroutine 不必再等待,继续执行它后面的逻辑即可
return
}
// 否则把 waiter 数量加 1。期间可能有并发调用 Wait 的情况,增加可能会失败,所以最外层使用了一个 for 循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}
常见错误
计数器设置为负值 WaitGroup 的计数器的值必须大于等于 0。我们在更改这个计数值的时候,WaitGroup 会先做检查,如果计数值被设置为负数,就会导致 panic。
一般情况下,有两种方法会导致计数器设置为负数:
- 调用 Add 的时候传递一个负数。如果你能保证当前的计数器加上这个负数后还是大于等于 0 的话,也没有问题,否则就会导致 panic。
- 调用 Done 方法的次数过多,超过了 WaitGroup 的计数值。
前一个 Wait 还没结束就重用 WaitGroup 只要 WaitGroup 的计数值恢复到零值的状态,那么它就可以被看作是新创建的 WaitGroup,被重复使用。但是,如果我们在 WaitGroup 的计数值还没有恢复到零值的时候就重用,就会导致程序 panic。我们看一个例子,初始设置 WaitGroup 的计数值为 1,启动一个 goroutine 先调用 Done 方法,接着就调用 Add 方法,Add 方法有可能和主 goroutine 并发执行。
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done() // 计数器减 1
wg.Add(1) // 计数值加 1
}()
wg.Wait() // 主 goroutine 等待,有可能和第 7 行并发执行
}
在这个例子中,第 6 行虽然让 WaitGroup 的计数恢复到 0,但是因为第 9 行有个 waiter 在等待,如果等待 Wait 的 goroutine,刚被唤醒就和 Add 调用(第 7 行)有并发执行的冲突,所以就会出现 panic。
【【Go进阶—并发编程】WaitGroup】WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic。
推荐阅读
- 【第三十二期】春招 Golang实习面经 七牛
- Golang|Golang 小数操作之判断几位小数点与四舍五入
- Golang|Golang []int []string 互转与判断字符是否在数组中
- Go 语言社区新提案 arena,可优化内存分配
- golang中的单元测试
- 【第三十一期】360后台开发实习面经 - 两轮技术面
- Leetcode专题[字符串]-剑指 Offer 05-替换空格
- Leetcode专题[字符串]-151-翻转字符串里的单词
- Leetcode专题[字符串]-541-反转字符串II