数据结构
channel内部的数据结构表示如下:
type hchan struct {
qcount uint // channel中元素的个数
dataqsiz uint // channel中循环队列的长度
buf unsafe.Pointer // channel缓冲数据指针
elemsize uint16 // 收发的元素大小
closed uint32
elemtype *_type // 收发的元素类型
sendx uint // channel的发送操作处理到缓冲位置
recvx uint // channel的接收操作处理到缓冲位置
recvq waitq // 当前因为缓冲区不足而阻塞的Gorontine列表
sendq waitq
lock mutex
}
阻塞的 GoRoutine 等待队列使用双向链表表示,链表中所有的元素是 runtime.sudog
结构:
type waitq struct {
first *sudog
last *sudog
}
该结构中存储了两个分别指向前后 runtime.sudog
的指针构成的链表。
创建channel
Go 语言中所有的 channel 都使用 make
关键字进行创建,并转换为 runtime.makechan
或者 runtime.makechan64
的调用,后者用于处理缓冲区大于 $2^{32}$ 的情况。
// 根据收发元素的类型和缓冲区的大小进行初始化
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0: // 不存在缓冲区
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
上面的代码就是根据收发元素的类型和缓冲区大小初始化 channel 的代码:
- 如果当前的channel不存在缓冲区,那么就只为
runtime.hchan
分配内存。 - 如果存储的不是指针类型,那么为channel和底层数组分配一段连续的空间,并将缓冲区指针
hchan.buf
指向底层数组。 - 默认情况下分别为
runtime.hchan
和缓冲区分配内存。
完成内存的分配工作之后对 hchan
中的几个字段进行更新。
发送数据
当向channel发送数据时,我们使用 ch<-i
语句,该语句最终会转换为对 runtime.chansend()
的调用,并传入 channel 和需要发送的数据。
下面来逐步分析发送逻辑:
// block为true表示当前的发送操作是阻塞的
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock) // 为当前的channel加锁
if c.closed != 0 { // 检查是否是在一个关闭的channel上发送数据
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
在进入发送逻辑之前首先需要为当前的 channel 加锁,防止多个协程并发修改数据。然后检查是否是在已经关闭的channl上发送数据,如果是,那么就解锁,并触发宕机。
直接发送
当存在等待的接受者时,通过 runtime.send
直接将数据发送给阻塞的接收者。具体来说,runtime.chansend
函数会直接从阻塞的接收队列 recvq
中取最先陷入等待的goroutine,并直接向其发送数据。
// 如果有因为等待接收数据阻塞的gorountine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
runtime.send
函数的代码如下:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
该函数遵循下面的逻辑:
- 调用
runtime.sendDirect
将发送的数据x = <-c
直接拷贝到表达式中变量x
所在的内存地址上。 - 调用
runtime.goready
将等待接收数据的Goroutine标记为可运行状态**Grunnable**
,并把该Goroutine放到发送方所在的处理器的runnext
上等待执行 (程序并没有立即执行该Goroutine),该处理器在下一次调度时会立即唤醒数据的接收方。
缓冲区
如果创建的 channel 包含缓冲区,并且 channel 中的数据没有装满,那么会执行下面的逻辑:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 缓冲区未满
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
//buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
在上面的逻辑中首先通过 runtime.chanbuf
计算下一个可以存储数据的位置,即索引 sendx
,然后通过 runtime.typedmemmove
将发送数据拷贝到缓冲区 (索引 send
所指的位置),并增加 sendx
和 qcount
的值,最后解锁。
阻塞发送
如果当前的 channel 没有缓冲区时,向其发送数据会造成阻塞。向 channel 阻塞地发送数据会执行下面的逻辑:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 如果是非阻塞的发送就解锁,退出
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog() // 获取 runtime.sudog 结构
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
gp.waiting = nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
阻塞发送的逻辑如下:
- 调用
runtime.gettg
获取发送数据使用的goroutine; - 通过
runtime.acquireSudog
获取sudog
结构, 被设置阻塞发送关的信息; - 将创建的
sudog
加入发送等待队列, 并设置到发送数据的goroutine的waiting上;表示该goroutine正在等待该sudog
准备就绪; - 调用
runtime.goparkkunlock
将当前的goroutine陷入睡眠等待唤醒; - 被调度器唤醒之后将一些属性设置为
nil
,并释放sudog
最后返回true
表示成功发送了数据。
小结
我们在这里可以简单梳理和总结一下使用 ch <- i
表达式向 Channel 发送数据时遇到的几种情况:
- 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine。
- 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上。
- 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据。
发送数据的过程中包含几个会触发 Goroutine 调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度;
- 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权;
接收数据
Go语言中使用两种方式接收数据,分别是 i<-ch
, i, ok <- ch
。两种方式最终都会转换成为对 runtime.chanrecv
的调用。
下面逐步分析接收逻辑:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
如果我们从一个空的 channel 中接收数据,那么会直接调用 runtime.gopark
让出处理器的使用权。如果当前的 channel 已经被关闭,并且缓冲区中不包含任何数据,那么会清除 ep
指针中的数据,并立即返回。
直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
当 channel 的等待队列中包含处于等待状态的 gorountine 时,该函数直接取出队头的 goroutine,类似于发送的逻辑,只是调用的是 runtime.recv
:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { // 没有缓冲区
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx) // 接收的内存地址
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
runtime.recv
函数根据缓冲区的大小分别处理:
- 如果channel不存在缓冲区
- 直接调用
runtime.recvDirect
将channel发送队列中Goroutine存储的elem数据拷贝到目标内存地址中.
- 如果channel存在缓冲区
- 将队列中的数据拷贝到接收方的内存地址;
- 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方
两种情况中,都会调用runtime.goready
将当前处理器的runnext
设置称发送数据的goroutine, 并在调度器下一次调度时将阻塞的发送方唤醒.
缓冲区
当channel的缓冲区已经包含数据时,从中取数据会直接从缓冲区中的recvx
索引的位置中取数据进行处理。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp) // 清除队列中的数据
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
return true, true
}
...
}
如果接收数据的内存地址不为空,那么就使用 runtime.typedmemmove
将缓冲区中的数据拷贝到内存中,清除队列中的数据,并递增 recv
, 递减qcount
,并释放持有的channel锁。
阻塞接收
当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
releaseSudog(mysg)
return true, !closed
}
在正常的接收场景中,我们会使用 runtime.sudog 将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。
完成入队之后,上述代码还会调用 runtime.goparkunlock 立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。
小结
我们梳理一下从 Channel 中接收数据时可能会发生的五种情况:
- 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
- 如果 Channel 已经关闭并且缓冲区没有任何数据,
runtime.chanrecv
会直接返回; - 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,会将
recvx
索引所在的数据拷贝到接收变量所在的内存空间上并将sendq
队列中 Goroutine 的数据拷贝到缓冲区(语义上保证FIFO); - 如果 Channel 的缓冲区中包含数据,那么直接读取
recvx
索引对应的数据; 在默认情况下会挂起当前的 Goroutine,将runtime.sudog
结构加入recvq
队列并陷入休眠等待调度器的唤醒;
我们总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时;
关闭channel
channel的关闭会转换为runtime.closechan
的调用.
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
首先,如果关闭的是一个空指针或者是一个已经被关闭的channel会直接触发宕机。
处理完上诉的异常逻辑之后,就可以执行关闭channel的逻辑,关键代码如下:
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
...
}
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
下面代码的主要工作就是将 recvq
和 sendq
两个队列中的数据加入到 Goroutine 列表 gList
中,与此同时该函数会清除所有 runtime.sudog
上未被处理的元素。
该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。