Reading the tunny Source Code

Goroutine pool — tunny. Code: https://github.com/Jeffail/tunny

Directory layout:

tunny-master
├── LICENSE
├── README.md
├── go.mod
├── tunny.go
├── tunny_logo.png
├── tunny_test.go
└── worker.go

There are two main source files: tunny.go holds the pool logic and worker.go holds the worker (consumer) logic. Start with the Pool struct in tunny.go:
type Pool struct {
	queuedJobs int64 // number of jobs currently queued in the pool

	ctor    func() Worker    // constructs the user-defined worker that turns a payload into a result
	workers []*workerWrapper // the set of currently running workers, i.e. the pool size
	reqChan chan workRequest // idle workers announce themselves on this channel

	workerMut sync.Mutex
}
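For context on how these fields get populated, the New constructor in tunny.go looks roughly like this (a paraphrase; details may differ slightly from the upstream version):

// New constructs a Pool with n workers; ctor produces one Worker per worker goroutine.
func New(n int, ctor func() Worker) *Pool {
	p := &Pool{
		ctor:    ctor,
		reqChan: make(chan workRequest),
	}
	p.SetSize(n) // creates n workerWrappers (SetSize is shown further down)
	return p
}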

Next, worker.go defines two main types:
type workRequest struct {
	// jobChan is used to send the payload to this worker.
	jobChan chan<- interface{}

	// retChan is used to read the result from this worker.
	retChan <-chan interface{}

	// interruptFunc can be called to cancel a running job. When called it is no
	// longer necessary to read from retChan.
	interruptFunc func()
}

type workerWrapper struct {
	worker        Worker // the Worker whose Process method is implemented by the user
	interruptChan chan struct{}

	// reqChan is NOT owned by this type, it is used to send requests for work.
	// It is the same channel as the Pool's reqChan: when this worker is idle it
	// sends a workRequest on it to notify the pool.
	reqChan chan<- workRequest

	// closeChan can be closed in order to cleanly shutdown this worker.
	closeChan chan struct{}

	// closedChan is closed by the run() goroutine when it exits, marking the
	// worker as fully shut down.
	closedChan chan struct{}
}

Now look at the other functions in worker.go:
func newWorkerWrapper(
	reqChan chan<- workRequest,
	worker Worker,
) *workerWrapper {
	w := workerWrapper{
		worker:        worker,
		interruptChan: make(chan struct{}),
		reqChan:       reqChan,
		closeChan:     make(chan struct{}),
		closedChan:    make(chan struct{}),
	}

	go w.run()

	return &w
}

The function above constructs a workerWrapper. Its input parameters:
1) reqChan: the same channel as the pool's reqChan, used to signal that this workerWrapper is currently idle;
2) worker: the interface wrapping the job to run; its Process method is implemented by the user.
The constructor also starts the run method in its own goroutine. That method holds most of the detail:
func (w *workerWrapper) run() {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	defer func() {
		w.worker.Terminate()
		close(retChan)
		close(w.closedChan)
	}()

	for {
		// NOTE: Blocking here will prevent the worker from closing down.
		w.worker.BlockUntilReady()
		select {
		case w.reqChan <- workRequest{
			jobChan:       jobChan,
			retChan:       retChan,
			interruptFunc: w.interrupt,
		}:
			select {
			case payload := <-jobChan:
				result := w.worker.Process(payload)
				select {
				case retChan <- result:
				case <-w.interruptChan:
					w.interruptChan = make(chan struct{})
				}
			case _, _ = <-w.interruptChan:
				w.interruptChan = make(chan struct{})
			}
		case <-w.closeChan:
			return
		}
	}
}

First, two channels are created: jobChan carries the payload to be processed, and retChan carries the result of processing it. A deferred function then cleans the worker up, so that when the pool stops this worker the goroutine exits cleanly. After that, an endless for loop drives the main logic:
case w.reqChan <- workRequest{
	jobChan:       jobChan,
	retChan:       retChan,
	interruptFunc: w.interrupt,
}:
	select {
	case payload := <-jobChan:
		result := w.worker.Process(payload)
		select {
		case retChan <- result:
		case <-w.interruptChan:
			w.interruptChan = make(chan struct{})
		}
	case _, _ = <-w.interruptChan:
		w.interruptChan = make(chan struct{})
	}

The worker first offers a workRequest on reqChan; once the pool receives it, execution enters the body of that case:
select {
case payload := <-jobChan:
	result := w.worker.Process(payload)
	select {
	case retChan <- result:
	case <-w.interruptChan:
		w.interruptChan = make(chan struct{})
	}
case _, _ = <-w.interruptChan:
	w.interruptChan = make(chan struct{})
}

Here the worker blocks until something is pushed into jobChan (that push happens on the pool side); payload := <-jobChan then receives the value and execution continues with:
result := w.worker.Process(payload)
select {
case retChan <- result:

The worker processes the payload and puts the result on retChan; the pool side blocks until it receives that result and then returns it to the caller.
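This pool-to-worker rendezvous can be boiled down to a standalone sketch. All names here are hypothetical; the snippet only illustrates the pattern tunny relies on:

package main

import "fmt"

// request mimics tunny's workRequest: a worker hands its own channels to the dispatcher.
type request struct {
	jobChan chan<- interface{}
	retChan <-chan interface{}
}

func main() {
	reqChan := make(chan request) // shared between dispatcher and worker, like Pool.reqChan

	// Worker side: announce readiness, receive a payload, send back a result.
	go func() {
		jobChan, retChan := make(chan interface{}), make(chan interface{})
		for {
			reqChan <- request{jobChan: jobChan, retChan: retChan} // blocks until the dispatcher is ready
			payload := <-jobChan
			retChan <- fmt.Sprintf("processed %v", payload)
		}
	}()

	// Dispatcher side (what Pool.Process does): grab an idle worker, hand it the job, wait for the result.
	req := <-reqChan
	req.jobChan <- 42
	fmt.Println(<-req.retChan) // prints: processed 42
}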
The run method only makes full sense together with the pool, so next look at the pool's Process method:
func (p *Pool) Process(payload interface{}) interface{} {
	atomic.AddInt64(&p.queuedJobs, 1)

	// Take an idle workRequest from the channel. This pairs with the worker's
	// run loop:
	//     case w.reqChan <- workRequest{
	//         jobChan:       jobChan,
	//         retChan:       retChan,
	//         interruptFunc: w.interrupt,
	//     }:
	request, open := <-p.reqChan
	if !open {
		panic(ErrPoolNotRunning)
	}

	// Hand the caller's payload to the request's jobChan. This pairs with:
	//     case payload := <-jobChan:
	//         result := w.worker.Process(payload)
	request.jobChan <- payload

	// Read the result from the request. This pairs with:
	//     case retChan <- result:
	payload, open = <-request.retChan
	if !open {
		panic(ErrWorkerClosed)
	}

	atomic.AddInt64(&p.queuedJobs, -1)
	return payload
}
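From the caller's side all of this is hidden behind Process. A minimal usage sketch (the closure and the numbers are made up; NewFunc, Process and Close are tunny's public API):

package main

import (
	"fmt"

	"github.com/Jeffail/tunny"
)

func main() {
	// A pool of 4 workers; NewFunc wraps the closure in a closureWorker (shown later).
	pool := tunny.NewFunc(4, func(payload interface{}) interface{} {
		n := payload.(int)
		return n * n
	})
	defer pool.Close()

	// Process blocks until an idle worker picks the job up and hands back a result.
	fmt.Println(pool.Process(10)) // 100
}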

The logic above is the core of how data flows between the pool and its workers. On top of that there are a few extra features, such as setting the pool size: if the current number of workers is smaller than the requested size, new workers are added; otherwise the surplus ones are removed. The code:
func (p *Pool) SetSize(n int) {
	p.workerMut.Lock() // take the lock
	defer p.workerMut.Unlock()

	lWorkers := len(p.workers) // current number of workers
	if lWorkers == n {
		return
	}

	// Add extra workers if N > len(workers)
	for i := lWorkers; i < n; i++ {
		p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
	}

	// The two loops below shut down the surplus workers via stop and join,
	// analysed further down; this is the part I find most worth learning from.

	// Asynchronously stop all workers > N
	for i := n; i < lWorkers; i++ {
		p.workers[i].stop()
	}

	// Synchronously wait for all workers > N to stop
	for i := n; i < lWorkers; i++ {
		p.workers[i].join()
	}

	// Remove stopped workers from slice
	p.workers = p.workers[:n]
}
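From the caller's side, resizing is just a method call; a small sketch (the sizes are arbitrary):

package main

import "github.com/Jeffail/tunny"

func main() {
	pool := tunny.NewFunc(4, func(p interface{}) interface{} { return p })
	defer pool.Close()

	pool.SetSize(10) // grow: 6 new workerWrappers are appended and begin running
	pool.SetSize(2)  // shrink: 8 surplus workers are stopped, joined, then sliced off
}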

Next, the stop and join methods, which are best read together with run:
// Calling stop closes closeChan. In run this triggers:
//     case <-w.closeChan:
//         return
// and before returning, run's deferred function executes:
//     defer func() {
//         w.worker.Terminate()
//         close(retChan)
//         close(w.closedChan)
//     }()
func (w *workerWrapper) stop() {
	close(w.closeChan)
}

// join blocks until the deferred function above has run to completion,
// i.e. until closedChan has been closed.
func (w *workerWrapper) join() {
	<-w.closedChan
}

(run itself is the same function listed earlier.)
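The closeChan/closedChan pair is a reusable pattern for cleanly shutting down a goroutine. A minimal standalone sketch (type and function names here are hypothetical, not taken from tunny):

package main

import "fmt"

type worker struct {
	closeChan  chan struct{} // closed by the owner to request shutdown (tunny: stop)
	closedChan chan struct{} // closed by the goroutine once it has fully exited (tunny: join waits on it)
}

func (w *worker) run() {
	defer close(w.closedChan) // "cleanup finished" signal for join
	<-w.closeChan             // in tunny this is one case of a select that also offers work on reqChan
	fmt.Println("worker: shutting down")
}

func (w *worker) stop() { close(w.closeChan) } // asynchronous: only requests shutdown
func (w *worker) join() { <-w.closedChan }     // synchronous: blocks until run has returned

func main() {
	w := &worker{closeChan: make(chan struct{}), closedChan: make(chan struct{})}
	go w.run()
	w.stop()
	w.join()
	fmt.Println("worker fully stopped")
}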

Beyond that, the source defines the worker through an interface, Worker, and ships a closureWorker implementation:
type Worker interface {
	// Process will synchronously perform a job and return the result.
	Process(interface{}) interface{}

	// BlockUntilReady is called before each job is processed and must block the
	// calling goroutine until the Worker is ready to process the next job.
	BlockUntilReady()

	// Interrupt is called when a job is cancelled. The worker is responsible
	// for unblocking the Process implementation.
	Interrupt()

	// Terminate is called when a Worker is removed from the processing pool
	// and is responsible for cleaning up any held resources.
	Terminate()
}

//------------------------------------------------------------------------------

// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
	processor func(interface{}) interface{} // assigned the user's function when the struct is created
}

func (w *closureWorker) Process(payload interface{}) interface{} {
	return w.processor(payload) // delegates to the wrapped function rather than processing directly, which I find elegant
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}
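closureWorker is wired up by the NewFunc constructor, which passes the user's function in as processor. Roughly (paraphrased from tunny.go; details may differ slightly from upstream):

func NewFunc(n int, f func(interface{}) interface{}) *Pool {
	return New(n, func() Worker {
		return &closureWorker{
			processor: f, // the user's function becomes this worker's processor
		}
	})
}

This delegation keeps closureWorker trivially small while letting the pool treat every worker uniformly through the Worker interface.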

Points worth taking away:
  1. The pool and its workers interact through a single shared reqChan, with each side blocking until the other has finished its step.
  2. Worker shutdown is coordinated with two channels (closeChan and closedChan).
  3. The ctor function gives every worker its own Worker instance.
  4. Programming against an interface: the worker is declared as the Worker interface.
Feel free to get in touch to discuss: WeChat 13161411563
