阅读 108

源码分析go channel以及相关的操作

go version: 1.17

本文从源码层面分析channel是如何创建、发送、接收、关闭的。

找到源码位置

package main func main() { ch := make(chan int) ch <- 1 <-ch select { case ch <- 1: default: } select { case <-ch: default: } close(ch) } 复制代码

查看汇编代码:
go tool compile -S -l -N main.go  
输出为(省略了不必要的代码):

(...) 0x0021 00033 (main.go:5)        CALL    runtime.makechan(SB) 0x0026 00038 (main.go:5)        MOVQ    AX, "".ch+32(SP) 0x002b 00043 (main.go:6)        LEAQ    ""..stmp_0(SB), BX 0x0032 00050 (main.go:6)        PCDATA  $1, $1 0x0032 00050 (main.go:6)        CALL    runtime.chansend1(SB) 0x0037 00055 (main.go:7)        MOVQ    "".ch+32(SP), AX 0x003c 00060 (main.go:7)        XORL    BX, BX 0x003e 00062 (main.go:7)        NOP 0x0040 00064 (main.go:7)        CALL    runtime.chanrecv1(SB) (...) 0x0062 00098 (main.go:10)       CALL    runtime.selectnbsend(SB) (...) 0x0080 00128 (main.go:15)       CALL    runtime.selectnbrecv(SB) (...) 0x009a 00154 (main.go:18)       CALL    runtime.closechan(SB) (...) 复制代码

可以得出:

  • makechan : channel的创建

  • chansend1: channel的阻塞发送

  • chanrecv1: channel的阻塞接收

  • selectnbsend: channel的无阻塞发送

  • selectnbrecv: channel的无阻塞接收

  • closechan: channel的关闭

channel结构

type hchan struct {    qcount   uint           // 目前循环队列里数据个数    dataqsiz uint           // 这个循环队列的大小    buf      unsafe.Pointer // 为指针,指向一个大小固定的数组,用来存放channel的数据    elemsize uint16         // channel中元素的大小    closed   uint32         // channel是否关闭,1为关闭    elemtype *_type // channel中元素的类型    sendx    uint   // 发送数据的索引      recvx    uint   // 接收数据的索引    recvq    waitq  // 等待接收的队列,里面放的是goroutne    sendq    waitq  // 等待发送的队列,里面放的是goroutne    // lock protects all fields in hchan, as well as several    // fields in sudogs blocked on this channel.    //    // Do not change another G's status while holding this lock    // (in particular, do not ready a G), as this can deadlock    // with stack shrinking.    lock mutex // 此channel的互斥锁 } 复制代码

sendqrecvq的数据结构如下所示:

type waitq struct {    first *sudog    last  *sudog } 复制代码

image.png

创建channel

const (    maxAlign  = 8    hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) ) func makechan(t *chantype, size int) *hchan {    elem := t.elem    (...)    // 这里判断elem.size * size是否越界。返回值:需要申请的空间大小以及是否越界    mem, overflow := math.MulUintptr(elem.size, uintptr(size))    if overflow || mem > maxAlloc-hchanSize || size < 0 {       panic(plainError("makechan: size out of range"))    }    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.    // buf points into the same allocation, elemtype is persistent.    // SudoG's are referenced from their owning thread so they can't be collected.    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.    var c *hchan    switch {    // 如果无缓存,则不需要创建buf    case mem == 0:       // Queue or element size is zero.       c = (*hchan)(mallocgc(hchanSize, nil, true))       // Race detector uses this location for synchronization.       c.buf = c.raceaddr()    case elem.ptrdata == 0:       // 元素不包含指针,则给hchan和buf分配一块公用的空间,并且buf紧挨着hchan       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))       c.buf = add(unsafe.Pointer(c), hchanSize)    default:       // 元素包含指针,hchan和buf各自单独分配空间       c = new(hchan)       c.buf = mallocgc(mem, elem, true)    }    c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)    lockInit(&c.lock, lockRankHchan)    if debugChan {       print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")    }    return c } 复制代码

hchanSize:
意思就是按照maxAlign倍数对齐,大于等于size(hchan),需要申请的最小空间 比如size(hchan)为5字节,maxAlign为8字节,转换成公式为(5 + uintptr( (-5) & (8 - 1)))=8,只需要申请8字节,用来存放hchan

总结流程

image.png

发送数据

阻塞发送

func chansend1(c *hchan, elem unsafe.Pointer) {    chansend(c, elem, true, getcallerpc()) } 复制代码

无阻塞发送

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {    return chansend(c, elem, false, getcallerpc()) } 复制代码

可以看出来这两个函数都是调用chansend,只不过block参数一个为true,一个为false而已.
所以直接分析chansend即可:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    if c == nil {       if !block {          return false       }       gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)       throw("unreachable")    } (...) } 复制代码

逻辑如下
如果是channel空的

  1. 对于非阻塞,直接返回false

  2. 对于阻塞,会调用gopark挂起,不会返回

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    (...)    if !block && c.closed == 0 && full(c) {       return false    }    (...) } 复制代码

func full(c *hchan) bool {    // c.dataqsiz is immutable (never written after the channel is created)    // so it is safe to read at any time during channel operation.    if c.dataqsiz == 0 {       // Assumes that a pointer read is relaxed-atomic.       return c.recvq.first == nil    }    // Assumes that a uint read is relaxed-atomic.    return c.qcount == c.dataqsiz } 复制代码

逻辑如下
对于非阻塞,并且channel没有关闭  满足以下两种情况会直接返回false

  1. 没有缓冲区,并且当前没有接受者

  2. 有缓冲区,并且满了

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {   (...)   // 加锁   lock(&c.lock)   // 若关闭了,则解锁并且panic   if c.closed != 0 {      unlock(&c.lock)      panic(plainError("send on closed channel"))   }   // 从recvq里取一个接收者   if sg := c.recvq.dequeue(); sg != nil {      // Found a waiting receiver. We pass the value we want to send      // directly to the receiver, bypassing the channel buffer (if any).      // 若有接收者,则直接向接收者发送数据,不经过buf      send(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true   }   (...) } 复制代码

看下send函数

// send processes a send operation on an empty channel c. // The value ep sent by the sender is copied to the receiver sg. // The receiver is then woken up to go on its merry way. // Channel c must be empty and locked.  send unlocks c with unlockf. // sg must already be dequeued from c. // ep must be non-nil and point to the heap or the caller's stack. func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    // c为channel    // sg为接收者    // ep为要发送数据的地址    (...)    // sg.elem指向接收者存数据的地址    if sg.elem != nil {       // 把要发送的数据copy到接收者存数据的地方       sendDirect(c.elemtype, sg, ep)       sg.elem = nil    }    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    sg.success = true    if sg.releasetime != 0 {       sg.releasetime = cputicks()    }    // 唤醒接收者的goroutine    goready(gp, skip+1) } 复制代码

逻辑如下
recvq中有接收者

  1. 则直接把数据拷贝到接收者存数据的地方

  2. 唤醒接收者的goroutine.

继续往下看:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    (...)    // 如果缓冲区没满,则将发送的数据copy至缓冲区    if c.qcount < c.dataqsiz {       // Space is available in the channel buffer. Enqueue the element to send.       // 找到缓冲区要放数据的位置       qp := chanbuf(c, c.sendx)       (...)       typedmemmove(c.elemtype, qp, ep)       c.sendx++       if c.sendx == c.dataqsiz {          c.sendx = 0       }       c.qcount++       unlock(&c.lock)       return true    }    (...) } 复制代码

逻辑如下
如果缓冲区没满

  1. 则先获得缓冲区中要放数据的位置

  2. 将要发送的数据拷贝到缓冲区

  3. 更新sendx

  4. channel中元素数量+1

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    (...)    // 缓冲区满了,非阻塞直接返回false    if !block {       unlock(&c.lock)       return false    }    // Block on the channel. Some receiver will complete our operation for us.    gp := getg()    // 新建一个sudog对象    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {       mysg.releasetime = -1    }    // No stack splits between assigning elem and enqueuing mysg    // on gp.waiting where copystack can find it.    // 这里elem指向要发送数据的地址    mysg.elem = ep    mysg.waitlink = nil    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.waiting = mysg    gp.param = nil    // 将sudog放入sendq队列中     c.sendq.enqueue(mysg)    // Signal to anyone trying to shrink our stack that we're about    // to park on a channel. The window between when this G's status    // changes and when we set gp.activeStackChans is not safe for    // stack shrinking.    atomic.Store8(&gp.parkingOnChan, 1)    // 进入等待状态    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)    (...) } 复制代码

逻辑如下
缓冲区满了

  1. 非阻塞直接返回false

  2. 阻塞的新建sudog,并放入sendq

总结流程

image.png

接收数据

阻塞接收

// entry points for <- c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) {    chanrecv(c, elem, true) } 复制代码

无阻塞接收

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {    return chanrecv(c, elem, false) } 复制代码

可以看到,最终都是调用chanrecv,所以直接分析chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {    (...)    // channel为nil    if c == nil {       // 非阻塞直接返回       if !block {          return       }       // 阻塞就等待       gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)       throw("unreachable")    }    // Fast path: check for failed non-blocking operation without acquiring the lock.    // 非阻塞并且 && (channel无缓冲|| channel无数据)    if !block && empty(c) {       // channel已关闭,返回false,false       if atomic.Load(&c.closed) == 0 {          return       }       // The channel is irreversibly closed. Re-check whether the channel has any pending data       // to receive, which could have arrived between the empty and closed checks above.       // Sequential consistency is also required here, when racing with such a send.       if empty(c) {          // The channel is irreversibly closed and empty.          if raceenabled {             raceacquire(c.raceaddr())          }          if ep != nil {             typedmemclr(c.elemtype, ep)          }           // channel未关闭,返回true,false          return true, false       }    }    (...) } 复制代码

逻辑如下

  1. channel为nil
    a. 非阻塞直接返回 b. 阻塞则等待

  2. 非阻塞并且 && (channel无缓冲|| channel无数据)
    a. channel已关闭,返回false,false
    b. channel未关闭,返回true,false

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {     (...)     //  上锁     lock(&c.lock)     // 再次判断channel已关闭,并且channel中无数据,则直接返回false,false     if c.closed != 0 && c.qcount == 0 {        if raceenabled {           raceacquire(c.raceaddr())        }        unlock(&c.lock)        if ep != nil {           typedmemclr(c.elemtype, ep)        }        return true, false     }     // sendq里有接收者     if sg := c.sendq.dequeue(); sg != nil {        // Found a waiting sender. If buffer is size 0, receive value        // directly from sender. Otherwise, receive from head of queue        // and add sender's value to the tail of the queue (both map to        // the same buffer slot because the queue is full).        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true, true     }     (...) } 复制代码

// recv processes a receive operation on a full channel c. // There are 2 parts: // 1) The value sent by the sender sg is put into the channel //    and the sender is woken up to go on its merry way. // 2) The value received by the receiver (the current G) is //    written to ep. // For synchronous channels, both values are the same. // For asynchronous channels, the receiver gets its data from // the channel buffer and the sender's data is put in the // channel buffer. // Channel c must be full and locked. recv unlocks c with unlockf. // sg must already be dequeued from c. // A non-nil ep must point to the heap or the caller's stack. func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    // 无缓冲区    if c.dataqsiz == 0 {       (...)       if ep != nil {          // copy data from sender          // 直接从发送者copy数据到ep          recvDirect(c.elemtype, sg, ep)       }    } else {       // 有缓冲区       qp := chanbuf(c, c.recvx)       (...)       // 因为既然sendq不为空,所以缓冲区里一定有数据       // 从缓冲期copy数据到ep       if ep != nil {          typedmemmove(c.elemtype, ep, qp)       }       // 将发送者到数据copy到缓冲区       typedmemmove(c.elemtype, qp, sg.elem)       c.recvx++       if c.recvx == c.dataqsiz {          c.recvx = 0       }       c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz    }    sg.elem = nil    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    sg.success = true    if sg.releasetime != 0 {       sg.releasetime = cputicks()    }    // 唤醒sender的goroutine    goready(gp, skip+1) } 复制代码

逻辑如下

  1. 再次判断channel已关闭,并且channel中无数据,则直接返回false,false

  2. sendq里有等待的sender
    a. channel无缓冲区,直接将sender的数据copy到ep b. channel有缓冲区,将缓冲区到数据copy到ep,再将sender的数据copy到缓冲区

  3. 唤起sender到goroutine

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {     (...)     // 缓冲区有数据     if c.qcount > 0 {        // Receive directly from queue        qp := chanbuf(c, c.recvx)        (...)        // 直接将缓冲区数据copy到ep        if ep != nil {           typedmemmove(c.elemtype, ep, qp)        }        // 清空当前缓冲区刚刚读数据的地方        typedmemclr(c.elemtype, qp)        c.recvx++        if c.recvx == c.dataqsiz {           c.recvx = 0        }        c.qcount--        unlock(&c.lock)        return true, true     }     (...) } 复制代码

逻辑如下
缓冲区有数据

  1. 直接将缓冲区数据copy到ep

  2. 清空当前缓冲区刚刚读数据的地方

  3. 更新c.recvx

  4. c.qcount--

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {     (...)     // 缓从区满了,无阻塞channel直接返回     if !block {        unlock(&c.lock)        return false, false     }     // no sender available: block on this channel.     gp := getg()     mysg := acquireSudog()     mysg.releasetime = 0     if t0 != 0 {        mysg.releasetime = -1     }     // No stack splits between assigning elem and enqueuing mysg     // on gp.waiting where copystack can find it.     mysg.elem = ep     mysg.waitlink = nil     gp.waiting = mysg     mysg.g = gp     mysg.isSelect = false     mysg.c = c     gp.param = nil     c.recvq.enqueue(mysg)     // Signal to anyone trying to shrink our stack that we're about     // to park on a channel. The window between when this G's status     // changes and when we set gp.activeStackChans is not safe for     // stack shrinking.     atomic.Store8(&gp.parkingOnChan, 1)     gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)     (...) } 复制代码

逻辑如下

  1. 缓冲区满了,无阻塞channel直接返回

  2. 阻塞channel的话
    a. 新建sudog b. 将sudog放入recvq c. 调用gopark,等待

总结流程

image.png

关闭channel

func closechan(c *hchan) {    // channel为nil,panic    if c == nil {       panic(plainError("close of nil channel"))    }    lock(&c.lock)    // channel已关闭,panic    if c.closed != 0 {       unlock(&c.lock)       panic(plainError("close of closed channel"))    }    (...)    // close设置为1    c.closed = 1    var glist gList    // release all readers    // 将所有接收者放入glist    for {       sg := c.recvq.dequeue()       if sg == nil {          break       }       if sg.elem != nil {          typedmemclr(c.elemtype, sg.elem)          sg.elem = nil       }       if sg.releasetime != 0 {          sg.releasetime = cputicks()       }       gp := sg.g       gp.param = unsafe.Pointer(sg)       sg.success = false       if raceenabled {          raceacquireg(gp, c.raceaddr())       }       glist.push(gp)    }    // release all writers (they will panic)     // 将所有发送者放入glist    for {       sg := c.sendq.dequeue()       if sg == nil {          break       }       sg.elem = nil       if sg.releasetime != 0 {          sg.releasetime = cputicks()       }       gp := sg.g       gp.param = unsafe.Pointer(sg)       sg.success = false       if raceenabled {          raceacquireg(gp, c.raceaddr())       }       glist.push(gp)    }    unlock(&c.lock)    // Ready all Gs now that we've dropped the channel lock.    // 唤醒所有接收者以及发送者的goroutine,但是唤醒发送者会导致panic    for !glist.empty() {       gp := glist.pop()       gp.schedlink = 0       goready(gp, 3)    } }


作者:zxx
链接:https://juejin.cn/post/7017723784002175006

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐