源码分析go channel以及相关的操作
go version: 1.17
本文从源码层面分析channel是如何创建、发送、接收、关闭的。
找到源码位置
package main func main() { ch := make(chan int) ch <- 1 <-ch select { case ch <- 1: default: } select { case <-ch: default: } close(ch) } 复制代码
查看汇编代码:
go tool compile -S -l -N main.go
输出为(省略了不必要的代码):
(...) 0x0021 00033 (main.go:5) CALL runtime.makechan(SB) 0x0026 00038 (main.go:5) MOVQ AX, "".ch+32(SP) 0x002b 00043 (main.go:6) LEAQ ""..stmp_0(SB), BX 0x0032 00050 (main.go:6) PCDATA $1, $1 0x0032 00050 (main.go:6) CALL runtime.chansend1(SB) 0x0037 00055 (main.go:7) MOVQ "".ch+32(SP), AX 0x003c 00060 (main.go:7) XORL BX, BX 0x003e 00062 (main.go:7) NOP 0x0040 00064 (main.go:7) CALL runtime.chanrecv1(SB) (...) 0x0062 00098 (main.go:10) CALL runtime.selectnbsend(SB) (...) 0x0080 00128 (main.go:15) CALL runtime.selectnbrecv(SB) (...) 0x009a 00154 (main.go:18) CALL runtime.closechan(SB) (...) 复制代码
可以得出:
makechan
: channel的创建chansend1
: channel的阻塞
发送chanrecv1
: channel的阻塞
接收selectnbsend
: channel的无阻塞
发送selectnbrecv
: channel的无阻塞
接收closechan
: channel的关闭
channel结构
type hchan struct { qcount uint // 目前循环队列里数据个数 dataqsiz uint // 这个循环队列的大小 buf unsafe.Pointer // 为指针,指向一个大小固定的数组,用来存放channel的数据 elemsize uint16 // channel中元素的大小 closed uint32 // channel是否关闭,1为关闭 elemtype *_type // channel中元素的类型 sendx uint // 发送数据的索引 recvx uint // 接收数据的索引 recvq waitq // 等待接收的队列,里面放的是goroutne sendq waitq // 等待发送的队列,里面放的是goroutne // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex // 此channel的互斥锁 } 复制代码
sendq
和recvq
的数据结构如下所示:
type waitq struct { first *sudog last *sudog } 复制代码
创建channel
const ( maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) ) func makechan(t *chantype, size int) *hchan { elem := t.elem (...) // 这里判断elem.size * size是否越界。返回值:需要申请的空间大小以及是否越界 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { // 如果无缓存,则不需要创建buf case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不包含指针,则给hchan和buf分配一块公用的空间,并且buf紧挨着hchan c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 元素包含指针,hchan和buf各自单独分配空间 c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c } 复制代码
hchanSize
:
意思就是按照maxAlign
倍数对齐,大于等于size(hchan)
,需要申请的最小空间 比如size(hchan)为5字节,maxAlign为8字节,转换成公式为(5 + uintptr( (-5) & (8 - 1)))=8
,只需要申请8字节,用来存放hchan
总结流程
发送数据
阻塞发送
func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) } 复制代码
无阻塞发送
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) } 复制代码
可以看出来这两个函数都是调用chansend
,只不过block
参数一个为true
,一个为false
而已.
所以直接分析chansend
即可:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } (...) } 复制代码
逻辑如下
如果是channel空的
对于非阻塞,直接返回false
对于阻塞,会调用gopark挂起,不会返回
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { (...) if !block && c.closed == 0 && full(c) { return false } (...) } 复制代码
func full(c *hchan) bool { // c.dataqsiz is immutable (never written after the channel is created) // so it is safe to read at any time during channel operation. if c.dataqsiz == 0 { // Assumes that a pointer read is relaxed-atomic. return c.recvq.first == nil } // Assumes that a uint read is relaxed-atomic. return c.qcount == c.dataqsiz } 复制代码
逻辑如下
对于非阻塞,并且channel没有关闭
满足以下两种情况会直接返回false
没有缓冲区,并且当前没有接受者
有缓冲区,并且满了
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { (...) // 加锁 lock(&c.lock) // 若关闭了,则解锁并且panic if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 从recvq里取一个接收者 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). // 若有接收者,则直接向接收者发送数据,不经过buf send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } (...) } 复制代码
看下send
函数
// send processes a send operation on an empty channel c. // The value ep sent by the sender is copied to the receiver sg. // The receiver is then woken up to go on its merry way. // Channel c must be empty and locked. send unlocks c with unlockf. // sg must already be dequeued from c. // ep must be non-nil and point to the heap or the caller's stack. func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // c为channel // sg为接收者 // ep为要发送数据的地址 (...) // sg.elem指向接收者存数据的地址 if sg.elem != nil { // 把要发送的数据copy到接收者存数据的地方 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒接收者的goroutine goready(gp, skip+1) } 复制代码
逻辑如下
recvq
中有接收者
则直接把数据拷贝到接收者存数据的地方
唤醒接收者的goroutine.
继续往下看:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { (...) // 如果缓冲区没满,则将发送的数据copy至缓冲区 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. // 找到缓冲区要放数据的位置 qp := chanbuf(c, c.sendx) (...) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } (...) } 复制代码
逻辑如下
如果缓冲区没满
则先获得缓冲区中要放数据的位置
将要发送的数据拷贝到缓冲区
更新sendx
channel中元素数量+1
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { (...) // 缓冲区满了,非阻塞直接返回false if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() // 新建一个sudog对象 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. // 这里elem指向要发送数据的地址 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 将sudog放入sendq队列中 c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 进入等待状态 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) (...) } 复制代码
逻辑如下
缓冲区满了
非阻塞直接返回false
阻塞的新建sudog,并放入sendq
总结流程
接收数据
阻塞接收
// entry points for <- c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } 复制代码
无阻塞接收
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) { return chanrecv(c, elem, false) } 复制代码
可以看到,最终都是调用chanrecv
,所以直接分析chanrecv
。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { (...) // channel为nil if c == nil { // 非阻塞直接返回 if !block { return } // 阻塞就等待 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. // 非阻塞并且 && (channel无缓冲|| channel无数据) if !block && empty(c) { // channel已关闭,返回false,false if atomic.Load(&c.closed) == 0 { return } // The channel is irreversibly closed. Re-check whether the channel has any pending data // to receive, which could have arrived between the empty and closed checks above. // Sequential consistency is also required here, when racing with such a send. if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } // channel未关闭,返回true,false return true, false } } (...) } 复制代码
逻辑如下
channel为nil
a. 非阻塞直接返回 b. 阻塞则等待非阻塞并且 && (channel无缓冲|| channel无数据)
a. channel已关闭,返回false,false
b. channel未关闭,返回true,false
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { (...) // 上锁 lock(&c.lock) // 再次判断channel已关闭,并且channel中无数据,则直接返回false,false if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // sendq里有接收者 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } (...) } 复制代码
// recv processes a receive operation on a full channel c. // There are 2 parts: // 1) The value sent by the sender sg is put into the channel // and the sender is woken up to go on its merry way. // 2) The value received by the receiver (the current G) is // written to ep. // For synchronous channels, both values are the same. // For asynchronous channels, the receiver gets its data from // the channel buffer and the sender's data is put in the // channel buffer. // Channel c must be full and locked. recv unlocks c with unlockf. // sg must already be dequeued from c. // A non-nil ep must point to the heap or the caller's stack. func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 无缓冲区 if c.dataqsiz == 0 { (...) if ep != nil { // copy data from sender // 直接从发送者copy数据到ep recvDirect(c.elemtype, sg, ep) } } else { // 有缓冲区 qp := chanbuf(c, c.recvx) (...) // 因为既然sendq不为空,所以缓冲区里一定有数据 // 从缓冲期copy数据到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送者到数据copy到缓冲区 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒sender的goroutine goready(gp, skip+1) } 复制代码
逻辑如下
再次判断channel已关闭,并且channel中无数据,则直接返回false,false
sendq里有等待的sender
a. channel无缓冲区,直接将sender的数据copy到ep b. channel有缓冲区,将缓冲区到数据copy到ep,再将sender的数据copy到缓冲区唤起sender到goroutine
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { (...) // 缓冲区有数据 if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) (...) // 直接将缓冲区数据copy到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 清空当前缓冲区刚刚读数据的地方 typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } (...) } 复制代码
逻辑如下
缓冲区有数据
直接将缓冲区数据copy到ep
清空当前缓冲区刚刚读数据的地方
更新c.recvx
c.qcount--
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { (...) // 缓从区满了,无阻塞channel直接返回 if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) (...) } 复制代码
逻辑如下
缓冲区满了,无阻塞channel直接返回
阻塞channel的话
a. 新建sudog b. 将sudog放入recvq c. 调用gopark,等待
总结流程
关闭channel
func closechan(c *hchan) { // channel为nil,panic if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) // channel已关闭,panic if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } (...) // close设置为1 c.closed = 1 var glist gList // release all readers // 将所有接收者放入glist for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) // 将所有发送者放入glist for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 唤醒所有接收者以及发送者的goroutine,但是唤醒发送者会导致panic for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
作者:zxx
链接:https://juejin.cn/post/7017723784002175006