Golang中channel的原理解读(推荐)

数据结构 【Golang中channel的原理解读(推荐)】channel的数据结构在$GOROOT/src/runtime/chan.go文件下:

type hchan struct {qcountuint// 当前队列中剩余元素个数dataqsiz uint// 环形队列长度,即可以存放的元素个数bufunsafe.Pointer // 环形队列指针elemsize uint16// 每个元素的大小closeduint32// 标记是否关闭elemtype *_type// 元素类型sendxuint// 队列下标,指向元素写入时存放到队列中的位置recvxuint// 队列下标,指向元素从队列中读出的位置recvqwaitq// 等待读消息的groutine队列sendqwaitq// 等待写消息的groutine队列lockmutex// 互斥锁}

chan内部实现了一个环形队列作为缓冲区,队列的长度在创建chan时指定:
Golang中channel的原理解读(推荐)
文章图片

等待队列(recvq/sendq)使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog结构:
type waitq struct {first *sudoglast*sudog}type sudog struct {g*gnext*sudogprev*sudogelemunsafe.Pointer // data element (may point to stack)acquiretimeint64releasetimeint64ticketuint32isSelectboolparent*sudog // semaRoot binary treewaitlink*sudog // g.waiting list or semaRootwaittail*sudog // semaRootc*hchan // channel}

创建channel 通常使用make(channel string, 0)的方式创建无缓存的channel,使用make(channel string, 10)创建有缓存的channel。
源码:
func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}var c *hchanswitch {case mem == 0:// 如果当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间;c = (*hchan)(mallocgc(hchanSize, nil, true))c.buf = c.raceaddr()case elem.ptrdata =https://www.it610.com/article/= 0:// 如果当前 Channel 中存储的类型不是指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间;c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default://单独为 runtime.hchan 和缓冲区分配内存;c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)// 在函数的最后会统一更新elemsize、elemtype 和 dataqsiz 几个字段; if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c}

channel读写 写
  1. 当有新数据来时,首先判断recvq中是否有groutine存在,如果recvq不为空,则说明缓冲区为空,或者没有缓冲区,因为如果缓冲区有数据会被recvq里面的groutine消费。此时从recvq中拿出一个groutine并绑定数据,唤醒该groutine执行任务,这个过程跳过了将数据写入缓冲区的过程。
  2. 如果缓冲区有数据并有空余位置,将数据放入缓冲区。
  3. 如果缓冲区有数据但没有空余位置,当前groutine绑定数据并放入sendx,进入睡眠,等待被唤醒。
Golang中channel的原理解读(推荐)
文章图片

源码:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {.....lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 如果Channel 没有被关闭并且已经有处于读等待的 Goroutine,// 那么从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 如果recvq为空且缓冲区中还有剩余空间if c.qcount < c.dataqsiz {// 计算出下一个可以存储数据的位置,qp := chanbuf(c, c.sendx)// raceenabled: 是否启用数据竞争检测,在编译时指定,默认为falseif raceenabled {// 发出数据竞争警告raceacquire(qp)racerelease(qp)}// 将发送的数据拷贝到缓冲区中,产生内存拷贝typedmemmove(c.elemtype, qp, ep)// 增加 sendx 索引c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}// 增加计数器c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// 将channel数据绑定到当前groutine并使groutine休眠// 获取发送数据使用的 Goroutinegp := getg()// 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,// 例如发送的 Channel、是否在 select 中和待发送数据的内存地址等mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// 将刚刚创建并初始化的 mysg 加入发送等待队列,并设置到当前 Goroutine的waiting上,// 表示 Goroutine 正在等待该sudog准备就绪mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)// 休眠groutinegopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// 保证传入的数据不被GCKeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true}


  1. 如果sendx不为空且缓冲区不为空,从缓冲区头部读出数据并在当前G执行任务,在sendx中拿出一个G,将其数据写入缓冲区尾部并唤醒该G。
  2. 如果sendx不为空且缓冲区为空,直接从sendx中拿出一个G,将G中数据取出并唤醒该G。
  3. 如果sendx为空且缓冲区不为空,则从缓冲区头部拿出一个数据。
  4. 如果sendx为空且缓冲区为空,将该G放入recvq,进入休眠,等待被唤醒。
Golang中channel的原理解读(推荐)
文章图片

源码:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // block:这次接收是否阻塞if debugChan {print("chanrecv: chan=", c, "\n")}if c == nil {if !block {return}// 从一个空 Channel 接收数据时会直接让出处理器的使用权gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if !block && empty(c) {// 如果channel为空并且未关闭,直接返回if atomic.Load(&c.closed) == 0 {return}if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep != nil {// 手动标记清楚对象typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)//如果channel为空,并且已关闭,说明对象不可达if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {// 手动标记清除typedmemclr(c.elemtype, ep)}return true, false}// 如果sendq不为空,直接消费,避免sendq --> queue --> recvx的过程if sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// 当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 // recvx 的索引位置中取出数据进行处理if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}// 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove将缓冲区中的数据拷贝到内存中if ep != nil {typedmemmove(c.elemtype, ep, qp)}// 使用 runtime.typedmemclr清除队列中的数据并完成收尾工作typedmemclr(c.elemtype, qp)c.recvx++// recvx位置归零if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount-- // 计数减一unlock(&c.lock) return true, true}if !block {unlock(&c.lock)return false, false}// 当 sendq不为空 并且缓冲区中也不存在任何数据时,阻塞并休眠当前groutinegp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed}

到此这篇关于Golang中channel的原理解读的文章就介绍到这了,更多相关Golang channel原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    推荐阅读