十一.|十一. Go并发编程--singleflight
一.前言
1.1 为什么需要Singleflight?
很多程序员可能还是第一次听说,本人第一次听说这个的时候以为翻译过来就是程序设计中被称为的是 "单例模式"。 google之后二者天壤之别。一般情况下我们在写一写对外的服务的时候都会有一层 cache 作为缓存,用来减少底层数据库的压力,但是在遇到例如 redis 抖动或者其他情况可能会导致大量的 cache miss 出现。
1.2 使用场景
如下图所示,可能存在来自桌面端和移动端的用户有 1000 的并发请求,他们都访问的获取文章列表的接口,获取前 20 条信息,如果这个时候我们服务直接去访问 redis 出现 cache miss 那么我们就会去请求 1000 次数据库,这时可能会给数据库带来较大的压力(这里的 1000 只是一个例子,实际上可能远大于这个值)导致我们的服务异常或者超时。
文章图片
这时候就可以使用
singleflight
库了,直译过来就是 单飞.这个库的主要作用就是: 将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。 如下图所示
文章图片
二. slingleFligh 库(使用教程) 2.1 函数签名
主要是一个
Group
结构体, 结构体包含三个方法,相见代码注释type Group
// Do 执行函数, 对同一个 key 多次调用的时候,在第一次调用没有执行完的时候
// 只会执行一次 fn 其他的调用会阻塞住等待这次调用返回
// v, err 是传入的 fn 的返回值
// shared 表示是否真正执行了 fn 返回的结果,还是返回的共享的结果
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) // DoChan 和 Do 类似,只是 DoChan 返回一个 channel,也就是同步与异步的区别
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result// Forget 用于通知 Group 删除某个 key 这样后面继续这个 key 的调用的时候就不会在阻塞等待了
func (g *Group) Forget(key string)
2.2 使用示例
先使用一个普通的例子,这时一个获取文章详情的函数,我们在函数里面使用一个 count 模拟不同并发下的耗时的不同,并发越多请求耗时越多。
func getArticle(id int) (article string, err error) {
// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
// 使用原子操作保证并发安全
atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(count) * time.Millisecond) return fmt.Sprintf("article: %d", id), nil
}
使用 singlefight 的时候就只需要 new(singleflight.Group) 然后调用以下相对应的
Do
方法func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
return getArticle(id)
}) return v.(string), err
}
3. 测试
写一个简单的测试代码,模拟启动1000个goroutine 去并发调用这两个方法
package mainimport (
"fmt"
"sync"
"sync/atomic"
"time" "golang.org/x/sync/singleflight"
)var count int32func getArticle(id int) (article string, err error) {
// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(count) * time.Millisecond) return fmt.Sprintf("article: %d", id), nil
}func singleFlightGetArticle(sg *singleflight.Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
return getArticle(id)
}) return v.(string), err
}func main() {
// 使用一个定时器,
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
}) var (
wgsync.WaitGroup
now = time.Now()
n= 1000
sg= &singleflight.Group{}
) for i := 0;
i < n;
i++ {
wg.Add(1)
go func() {
res, _ := singleFlightGetArticle(sg, 1)
// res, _ := getArticle(1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}()
} wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
调用 getArticle 方法的耗时,花费了 1s 多
go run demo.go
同时发起 1000 次请求,耗时: 1.0157721s
切换到singleflight方法测试发现花费20ms
go run demo.go
同时发起 1000 次请求,耗时: 21.1962ms
测试发现,使用singleflight这种方式的确在这种场景下能减少执行执行,提高效率。
3.实现原理
这是非内置库,仓库
golang.org/x/sync/singleflight
源码解析如下Group
是一个结构体
type Group struct {
mu sync.Mutex// protects m
mmap[string]*call // lazily initialized
}
Group
结构体由一个互斥锁
和一个 map
组成,可以看到注释 map 是懒加载的,所以 Group 只要声明就可以使用,不用进行额外的初始化零值就可以直接使用。call 保存了当前调用对应的信息,map 的键就是我们调用 Do 方法传入的 keycall 也是一个结构体,结构体如下
type call struct {
wg sync.WaitGroup // 函数的返回值,在 wg 返回前只会写入一次
val interface{}
err error // 使用调用了 Forgot 方法
forgotten bool// 统计调用次数以及返回的 channel
dupsint
chans []chan<- Result
}
Do
Do方法用来执行传入函数
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()// 前面提到的懒加载
if g.m == nil {
g.m = make(map[string]*call)
}// 会先去看 key 是否已经存在
if c, ok := g.m[key];
ok {
// 如果存在就会解锁
c.dups++
g.mu.Unlock()// 然后等待 WaitGroup 执行完毕,只要一执行完,所有的 wait 都会被唤醒
c.wg.Wait()// 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做
if e, ok := c.err.(*panicError);
ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}// 如果我们没有找到这个 key 就 new call
c := new(call)// 然后调用 waitgroup 这里只有第一次调用会 add 1,其他的都会调用 wait 阻塞掉
// 所以这要这次调用返回,所有阻塞的调用都会被唤醒
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()// 然后我们调用 doCall 去执行
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
doCall
这个方法种有个技巧值得学习,使用了两个 defer 巧妙的将 runtime 的错误和我们传入 function 的 panic 区别开来避免了由于传入的 function panic 导致的死锁。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false// 第一个 defer 检查 runtime 错误
defer func() {
...
}()// 使用一个匿名函数来执行
func() {
defer func() {
if !normalReturn {
// 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误
// 后面在上层重新 panic
if r := recover();
r != nil {
c.err = newPanicError(r)
}
}
}()c.val, c.err = fn()// 如果 fn 没有 panic 就会执行到这一步,如果 panic 了就不会执行到这一步
// 所以可以通过这个变量来判断是否 panic 了
normalReturn = true
}()// 如果 normalReturn 为 false 就表示,我们的 fn panic 了
// 如果执行到了这一步,也说明我们的 fnrecover 住了,不是直接 runtime exit
if !normalReturn {
recovered = true
}
}
第一个defer函数如下
defer func() {
// 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了
if !normalReturn && !recovered {
c.err = errGoexit
} c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()// 如果已经 forgot 过了,就不要重复删除这个 key 了
if !c.forgotten {
delete(g.m, key)
} if e, ok := c.err.(*panicError);
ok {
// 如果返回的是 panic 错误,为了避免 channel 死锁,我们需要确保这个 panic 无法被恢复
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// 已经准备退出了,也就不用做其他操作了
} else {
// 正常情况下向 channel 写入数据
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
DoChan
Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上就是,如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key];
ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock() go g.doCall(c, key, fn) return ch
}
Forget
forget 用于手动释放某个 key 下次调用就不会阻塞等待了
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key];
ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
三.注意事项 3.1 一个阻塞,全员等待
使用 singleflight 我们比较常见的是直接使用 Do 方法,但是这个极端情况下会导致整个程序 hang 住,如果我们的代码出点问题,有一个调用 hang 住了,那么会导致所有的请求都 hang 住
还是之前的例子, 加入一个 select 模拟阻塞
package mainimport (
"fmt"
"sync"
"sync/atomic"
"time" "golang.org/x/sync/singleflight"
)var count2 int32func getArticle2(id int) (article string, err error) {
// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
atomic.AddInt32(&count2, 1)
time.Sleep(time.Duration(count2) * time.Millisecond) return fmt.Sprintf("article: %d", id), nil
}func singleFlightGetArticle2(sg *singleflight.Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
// 模拟阻塞
select {}
return getArticle2(id)
}) return v.(string), err
}func main() {
// 使用一个定时器,
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count2, -count2)
}) var (
wgsync.WaitGroup
now = time.Now()
n= 1000
sg= &singleflight.Group{}
) for i := 0;
i < n;
i++ {
wg.Add(1)
go func() {
res, _ := singleFlightGetArticle2(sg, 1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}()
} wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
执行就会发现死锁
fatal error: all goroutines are asleep - deadlock!goroutine 1 [select (no cases)]:
这时候DoChan就能派上用场了,结合 select 做超时控制
func singleflightGetArticle(ctx context.Context, sg *singleflight.Group, id int) (string, error) {
result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) {
// 模拟出现问题,hang 住
select {}
return getArticle(id)
}) select {
case r := <-result:
return r.Val.(string), r.Err
case <-ctx.Done():
return "", ctx.Err()
}
}
调用的时候传入一个含 超时的 context 即可
func main() {
// 使用一个定时器,
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count2, -count2)
}) var (
wgsync.WaitGroup
now= time.Now()
n= 1000
sg= &singleflight.Group{}
// 超时控制
ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
) for i := 0;
i < n;
i++ {
wg.Add(1)
go func() {
res, _ := singleFlightGetArticle2(ctx, sg, 1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}() } wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
执行时就会返回超时错误.
? go run demo2.go
panic: context deadline exceeded
3.2 一个出错,全部出错
本身不是什么问题,因为singleflight就是这么设计的。 但是实际使用的时候,可能并不是想要这样的效果。 比如 如果一次调用要1s. 我们的数据库请求或者下游服务服务可以支撑10rps(即每秒可以支持10次的请求)的请求的时候会导致我们的错误阈值提高。 因为我们可以1s尝试10次,但是用了singleflight之后只能尝试一次。只要出错这段时间内的所有请求都会收到影响。 那这种情况该如何解决呢?
【十一.|十一. Go并发编程--singleflight】我们可以启动一个Goroutine定时forget一下,相当于将rps 从1rps提高到10rps
go func() {
time.Sleep(100 * time.Millisecond)
// logging
g.Forget(key)
}()
四. 使用场景 如开头场景所讲, singleflige 可以有效解决在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。
文章图片
使用singleflight能有效解决这个问题,限制对同一个键值对的多次重复请求,减少对下游的瞬时流量
文章图片
通过一段代码可以查看如何解决这种缓存击穿的问题
type service struct {
requestGroup singleflight.Group
}func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
// request.Hash就是传入的建,请求的哈希在业务上一般表示相同的请求,所以上述代码使用它作为请求的键
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
五. 总结
golang/sync/singleflight.Group.Do
和 golang/sync/singleflight.Group.DoChan
分别提供了同步和异步的调用方式,这让我们使用起来也更加灵活。当我们需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Group 来增加吞吐量和服务质量,不过在使用的过程中我们也需要注意以下的几个问题:
golang/sync/singleflight.Group.Do
和golang/sync/singleflight.Group.DoChan
一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过 Channel 接收函数的返回值;golang/sync/singleflight.Group.Forget
可以通知golang/sync/singleflight.Group
在持有的映射表中删除某个键,接下来对该键的调用就不会等待前面的函数返回了;- 一旦调用的函数返回了错误,所有在等待的 Goroutine 也都会接收到同样的错误;
- https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#singleflight
- https://lailin.xyz/post/go-training-week5-singleflight.html
- https://pkg.go.dev/golang.org/x/sync/singleflight
推荐阅读
- C语言学习|第十一届蓝桥杯省赛 大学B组 C/C++ 第一场
- python青少年编程比赛_第十一届蓝桥杯大赛青少年创意编程组比赛细则
- 推荐系统论文进阶|CTR预估 论文精读(十一)--Deep Interest Evolution Network(DIEN)
- 佳琪(三十一)
- 周一(十一)
- HTML基础--基本概念--跟着李南江学编程
- 七律·游首钢园(新韵)
- 第十一节课打卡|第十一节课打卡 杨小风
- 《将来的你,一定会感谢现在战胜烦恼的自己-------第四章/第十一节/用逆向思维解除烦恼》
- 9月回顾