Reading the tunny source code
A goroutine pool: tunny
Code: https://github.com/Jeffail/tunny
Directory layout:
tunny-master
├── LICENSE
├── README.md
├── go.mod
├── tunny.go
├── tunny_logo.png
├── tunny_test.go
└── worker.go
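Before digging into the source, here is a minimal usage sketch of the library (the worker count and payload are arbitrary choices of mine):

package main

import (
    "fmt"

    "github.com/Jeffail/tunny"
)

func main() {
    // A pool of 4 workers, each running the supplied closure.
    pool := tunny.NewFunc(4, func(payload interface{}) interface{} {
        n := payload.(int)
        return n * n // the "work": square the input
    })
    defer pool.Close()

    // Process blocks until a worker is free and the job has finished.
    fmt.Println(pool.Process(10)) // 100
}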
There are two main source files: tunny.go holds the pool logic and worker.go holds the worker (consumer) logic. Start with the Pool struct in tunny.go:

type Pool struct {
    queuedJobs int64 // number of jobs currently queued in the pool

    ctor    func() Worker    // constructs the user-defined Worker that processes a payload and produces a result
    workers []*workerWrapper // the set of running workers, i.e. the current pool size
    reqChan chan workRequest // idle workers announce themselves by sending a workRequest on this channel

    workerMut sync.Mutex // guards workers while the pool is resized
}
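These fields are wired together by the pool constructor. Paraphrased from tunny.go (roughly from memory; see the source for the exact version):

func New(n int, ctor func() Worker) *Pool {
    p := &Pool{
        ctor:    ctor,
        reqChan: make(chan workRequest),
    }
    p.SetSize(n) // SetSize (shown further down) creates the initial workers
    return p
}

New is exported; NewFunc wraps it so that a plain func(interface{}) interface{} can be passed instead of a full Worker implementation.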
Next, worker.go defines two types:

type workRequest struct {
    // jobChan is used to send the payload to this worker.
    jobChan chan<- interface{}

    // retChan is used to read the result from this worker.
    retChan <-chan interface{}

    // interruptFunc can be called to cancel a running job. When called it is no
    // longer necessary to read from retChan.
    interruptFunc func()
}

type workerWrapper struct {
    worker        Worker        // the user-implemented Worker whose Process method does the actual work
    interruptChan chan struct{} // signals that the running job should be cancelled

    // reqChan is NOT owned by this type, it is used to send requests for work.
    reqChan chan<- workRequest // the same channel as Pool.reqChan; an idle worker notifies the pool through it

    // closeChan can be closed in order to cleanly shutdown this worker.
    closeChan chan struct{}

    // closedChan is closed by the run() goroutine when it exits.
    closedChan chan struct{}
}
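Note the directional channel types in workRequest and workerWrapper: chan<- is send-only and <-chan is receive-only, and a bidirectional channel converts to either direction on assignment. A tiny standalone illustration (my own, not tunny code):

package main

import "fmt"

func main() {
    ch := make(chan interface{}) // bidirectional

    var sendOnly chan<- interface{} = ch // how the pool sees jobChan: it can only send payloads
    var recvOnly <-chan interface{} = ch // how the pool sees retChan: it can only receive results

    go func() { sendOnly <- "payload" }()
    fmt.Println(<-recvOnly) // payload
}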
Other functions in worker.go:

func newWorkerWrapper(
    reqChan chan<- workRequest,
    worker Worker,
) *workerWrapper {
    w := workerWrapper{
        worker:        worker,
        interruptChan: make(chan struct{}),
        reqChan:       reqChan,
        closeChan:     make(chan struct{}),
        closedChan:    make(chan struct{}),
    }

    go w.run()

    return &w
}
newWorkerWrapper creates a workerWrapper. Its parameters are: 1) reqChan, the same channel as the Pool's reqChan, through which this workerWrapper signals that it is idle; 2) worker, the interface whose Process method is implemented by the user and performs the actual work.
newWorkerWrapper also starts the run method in a new goroutine. This method carries most of the detail:

func (w *workerWrapper) run() {
    jobChan, retChan := make(chan interface{}), make(chan interface{})
    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()

    for {
        // NOTE: Blocking here will prevent the worker from closing down.
        w.worker.BlockUntilReady()
        select {
        case w.reqChan <- workRequest{
            jobChan:       jobChan,
            retChan:       retChan,
            interruptFunc: w.interrupt,
        }:
            select {
            case payload := <-jobChan:
                result := w.worker.Process(payload)
                select {
                case retChan <- result:
                case <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            case _, _ = <-w.interruptChan:
                w.interruptChan = make(chan struct{})
            }
        case <-w.closeChan:
            return
        }
    }
}
It first creates two channels, jobChan and retChan: jobChan carries the payload to be processed and retChan carries the result of processing it. The deferred function tears the worker down, so when the pool stops this worker, run returns and the cleanup runs. The for loop then executes the following logic over and over:

case w.reqChan <- workRequest{
    jobChan:       jobChan,
    retChan:       retChan,
    interruptFunc: w.interrupt,
}:
    select {
    case payload := <-jobChan:
        result := w.worker.Process(payload)
        select {
        case retChan <- result:
        case <-w.interruptChan:
            w.interruptChan = make(chan struct{})
        }
    case _, _ = <-w.interruptChan:
        w.interruptChan = make(chan struct{})
    }
The worker sends a workRequest on reqChan, advertising that it is idle, and then enters the body of that case:

select {
case payload := <-jobChan:
    result := w.worker.Process(payload)
    select {
    case retChan <- result:
    case <-w.interruptChan:
        w.interruptChan = make(chan struct{})
    }
case _, _ = <-w.interruptChan:
    w.interruptChan = make(chan struct{})
}
Here the worker blocks until something is pushed into jobChan (that happens on the Pool side). payload := <-jobChan receives the value and the worker continues with:

    result := w.worker.Process(payload)
    select {
    case retChan <- result:

The worker processes the payload and puts the result on retChan, where the Pool is blocked waiting to read it and return it to the caller. The run method only makes sense alongside the Pool, so look at the Pool's Process method next:

func (p *Pool) Process(payload interface{}) interface{} {
    atomic.AddInt64(&p.queuedJobs, 1)

    // Receive an idle worker's request; this pairs with
    // `case w.reqChan <- workRequest{...}` in the worker's run loop.
    request, open := <-p.reqChan
    if !open {
        panic(ErrPoolNotRunning)
    }

    // Hand the caller's payload to the request's jobChan; this pairs with
    // `case payload := <-jobChan` in the worker's run loop.
    request.jobChan <- payload

    // Read the result; this pairs with
    // `case retChan <- result:` in the worker's run loop.
    payload, open = <-request.retChan
    if !open {
        panic(ErrWorkerClosed)
    }

    atomic.AddInt64(&p.queuedJobs, -1)

    return payload
}
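To make the pairing concrete, here is a reduced standalone sketch of the same handshake. The names (request, worker, process) are mine, not tunny's; it keeps only the reqChan/jobChan/retChan rendezvous and drops interruption and shutdown:

package main

import "fmt"

// request mirrors tunny's workRequest: an idle worker hands the pool a way in
// (jobChan) and a way out (retChan).
type request struct {
    jobChan chan<- int
    retChan <-chan int
}

func worker(reqChan chan<- request) {
    jobChan, retChan := make(chan int), make(chan int)
    for {
        // Announce idleness by sending our private channels to the pool.
        reqChan <- request{jobChan: jobChan, retChan: retChan}
        payload := <-jobChan // block until the pool hands us a job
        retChan <- payload * payload
    }
}

func process(reqChan chan request, payload int) int {
    req := <-reqChan       // take an idle worker
    req.jobChan <- payload // give it the job
    return <-req.retChan   // block until the result is ready
}

func main() {
    reqChan := make(chan request)
    for i := 0; i < 3; i++ {
        go worker(reqChan)
    }
    fmt.Println(process(reqChan, 10)) // 100
}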
This pairing of run and Process is the core of how data moves between the pool and its workers. Beyond that there are a few other features, for example resizing the pool. Note that if the current number of workers is smaller than the requested size, workers must be added, otherwise the surplus must be removed:

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock() // take the lock
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers) // current number of workers
    if lWorkers == n {
        return
    }

    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }

    // The next two loops shut down the surplus workers through stop and join,
    // analysed below; this two-phase shutdown is a point worth learning from.

    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }

    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }

    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}
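Continuing the usage sketch from the beginning, resizing can be done at runtime (the sizes here are arbitrary):

package main

import "github.com/Jeffail/tunny"

func main() {
    pool := tunny.NewFunc(4, func(p interface{}) interface{} { return p })
    defer pool.Close()

    pool.SetSize(8) // grow: four more workers are created via p.ctor()
    pool.SetSize(2) // shrink: workers 2..7 are stopped, joined, then dropped from the slice
}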
Now look at the stop and join methods; read them together with the run method shown above.

/*
Calling stop closes closeChan. The matching branch in run is:

    case <-w.closeChan:
        return

and before returning, run executes its deferred cleanup:

    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()
*/
func (w *workerWrapper) stop() {
    close(w.closeChan)
}

/*
join blocks until the deferred cleanup above has finished, i.e. until closedChan is closed.
*/
func (w *workerWrapper) join() {
    <-w.closedChan
}
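The closeChan/closedChan pair (one channel to request shutdown, one to acknowledge it) is a reusable pattern on its own. A minimal standalone sketch, with names of my own choosing:

package main

import (
    "fmt"
    "time"
)

type task struct {
    closeChan  chan struct{} // closed by the owner to request shutdown
    closedChan chan struct{} // closed by the goroutine once it has fully exited
}

func newTask() *task {
    t := &task{
        closeChan:  make(chan struct{}),
        closedChan: make(chan struct{}),
    }
    go t.run()
    return t
}

func (t *task) run() {
    defer close(t.closedChan) // acknowledge shutdown after cleanup is done
    for {
        select {
        case <-t.closeChan:
            return
        default:
            time.Sleep(10 * time.Millisecond) // pretend to do some work
        }
    }
}

func (t *task) stop() { close(t.closeChan) } // asynchronous: just request shutdown
func (t *task) join() { <-t.closedChan }     // synchronous: wait for the acknowledgement

func main() {
    t := newTask()
    t.stop()
    t.join()
    fmt.Println("task fully stopped")
}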
In addition, the source defines the worker as an interface, Worker, and ships closureWorker as one implementation:

type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

//------------------------------------------------------------------------------

// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
    processor func(interface{}) interface{} // set to the user-supplied function when the struct is created
}

func (w *closureWorker) Process(payload interface{}) interface{} {
    return w.processor(payload) // delegates to the wrapped function rather than doing the work itself, which reads nicely
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}
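closureWorker is what tunny.NewFunc builds for you; with tunny.New you can supply your own Worker instead. A hypothetical sketch (prefixWorker and its field are invented here for illustration):

package main

import (
    "fmt"
    "strings"

    "github.com/Jeffail/tunny"
)

// prefixWorker is a hypothetical Worker that holds per-worker state (a prefix)
// and satisfies the tunny.Worker interface.
type prefixWorker struct {
    prefix string
}

func (w *prefixWorker) Process(payload interface{}) interface{} {
    return w.prefix + strings.ToUpper(payload.(string))
}
func (w *prefixWorker) BlockUntilReady() {} // always ready
func (w *prefixWorker) Interrupt()       {} // nothing to unblock
func (w *prefixWorker) Terminate()       {} // nothing to clean up

func main() {
    // tunny.New calls the constructor once per worker, so each worker
    // gets its own prefixWorker instance.
    pool := tunny.New(4, func() tunny.Worker {
        return &prefixWorker{prefix: ">> "}
    })
    defer pool.Close()

    fmt.Println(pool.Process("hello")) // >> HELLO
}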
Points worth learning from:
- The pool and its workers interact through the same reqChan, and each side blocks until the other has completed its step.
- Worker shutdown is coordinated with two channels: one to request it and one to confirm it.
- Each worker is given its own Worker instance via ctor, behind an interface.
- Programming to an interface: Worker is declared as an interface rather than a concrete type.