数据结构

channel内部的数据结构表示如下:

type hchan struct {
	qcount   uint // channel中元素的个数
	dataqsiz uint // channel中循环队列的长度
	buf      unsafe.Pointer // channel缓冲数据指针
	elemsize uint16 // 收发的元素大小
	closed   uint32
	elemtype *_type // 收发的元素类型
	sendx    uint // channel的发送操作处理到缓冲位置
	recvx    uint // channel的接收操作处理到缓冲位置
	recvq    waitq // 当前因为缓冲区不足而阻塞的Gorontine列表
	sendq    waitq

	lock mutex
}

阻塞的 GoRoutine 等待队列使用双向链表表示,链表中所有的元素是 runtime.sudog 结构:

type waitq struct {
	first *sudog
	last  *sudog
}

该结构中存储了两个分别指向前后 runtime.sudog 的指针构成的链表。

创建channel

Go 语言中所有的 channel 都使用 make 关键字进行创建,并转换为 runtime.makechan 或者 runtime.makechan64 的调用,后者用于处理缓冲区大于 $2^{32}$ 的情况。

// 根据收发元素的类型和缓冲区的大小进行初始化
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
    
	mem, _ := math.MulUintptr(elem.size, uintptr(size))

	var c *hchan
	switch {
	case mem == 0: // 不存在缓冲区
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.kind&kindNoPointers != 0:
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	return c
}

上面的代码就是根据收发元素的类型和缓冲区大小初始化 channel 的代码:

  • 如果当前的channel不存在缓冲区,那么就只为 runtime.hchan 分配内存。
  • 如果存储的不是指针类型,那么为channel和底层数组分配一段连续的空间,并将缓冲区指针 hchan.buf 指向底层数组。
  • 默认情况下分别为 runtime.hchan 和缓冲区分配内存。

完成内存的分配工作之后对 hchan 中的几个字段进行更新。

发送数据

当向channel发送数据时,我们使用 ch<-i 语句,该语句最终会转换为对 runtime.chansend() 的调用,并传入 channel 和需要发送的数据。

下面来逐步分析发送逻辑:

// block为true表示当前的发送操作是阻塞的
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	lock(&c.lock) // 为当前的channel加锁

	if c.closed != 0 { // 检查是否是在一个关闭的channel上发送数据
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

在进入发送逻辑之前首先需要为当前的 channel 加锁,防止多个协程并发修改数据。然后检查是否是在已经关闭的channl上发送数据,如果是,那么就解锁,并触发宕机。

直接发送

当存在等待的接受者时,通过 runtime.send 直接将数据发送给阻塞的接收者。具体来说,runtime.chansend 函数会直接从阻塞的接收队列 recvq 中取最先陷入等待的goroutine,并直接向其发送数据。

// 如果有因为等待接收数据阻塞的gorountine
if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

runtime.send 函数的代码如下:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	goready(gp, skip+1)
}

该函数遵循下面的逻辑:

  1. 调用 runtime.sendDirect 将发送的数据 x = <-c 直接拷贝到表达式中变量 x 所在的内存地址上。
  2. 调用 runtime.goready 将等待接收数据的Goroutine标记为可运行状态 **Grunnable**,并把该Goroutine放到发送方所在的处理器的 runnext 上等待执行 (程序并没有立即执行该Goroutine),该处理器在下一次调度时会立即唤醒数据的接收方。

缓冲区

如果创建的 channel 包含缓冲区,并且 channel 中的数据没有装满,那么会执行下面的逻辑:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
    // 缓冲区未满
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++

        //buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	...
}

在上面的逻辑中首先通过 runtime.chanbuf 计算下一个可以存储数据的位置,即索引 sendx,然后通过 runtime.typedmemmove 将发送数据拷贝到缓冲区 (索引 send 所指的位置),并增加 sendxqcount 的值,最后解锁。

阻塞发送

如果当前的 channel 没有缓冲区时,向其发送数据会造成阻塞。向 channel 阻塞地发送数据会执行下面的逻辑:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
    // 如果是非阻塞的发送就解锁,退出
	if !block {
		unlock(&c.lock)
		return false
	}


	gp := getg()
	mysg := acquireSudog() // 获取 runtime.sudog 结构
	mysg.elem = ep
	mysg.g = gp
	mysg.c = c
	gp.waiting = mysg
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

	gp.waiting = nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

阻塞发送的逻辑如下:

  1. 调用 runtime.gettg 获取发送数据使用的goroutine;
  2. 通过 runtime.acquireSudog 获取 sudog 结构, 被设置阻塞发送关的信息;
  3. 将创建的 sudog 加入发送等待队列, 并设置到发送数据的goroutine的waiting上;表示该goroutine正在等待该 sudog 准备就绪;
  4. 调用 runtime.goparkkunlock 将当前的goroutine陷入睡眠等待唤醒;
  5. 被调度器唤醒之后将一些属性设置为 nil,并释放 sudog 最后返回true 表示成功发送了数据。

小结

我们在这里可以简单梳理和总结一下使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:

  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine。
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上。
  3. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据。

发送数据的过程中包含几个会触发 Goroutine 调度的时机:

  • 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度;
  • 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权;

接收数据

Go语言中使用两种方式接收数据,分别是 i<-chi, ok <- ch。两种方式最终都会转换成为对 runtime.chanrecv 的调用。

下面逐步分析接收逻辑:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

如果我们从一个空的 channel 中接收数据,那么会直接调用 runtime.gopark 让出处理器的使用权。如果当前的 channel 已经被关闭,并且缓冲区中不包含任何数据,那么会清除 ep 指针中的数据,并立即返回。

直接接收

if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

当 channel 的等待队列中包含处于等待状态的 gorountine 时,该函数直接取出队头的 goroutine,类似于发送的逻辑,只是调用的是 runtime.recv

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 { // 没有缓冲区
		if ep != nil {
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		qp := chanbuf(c, c.recvx) // 接收的内存地址
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	gp := sg.g
	gp.param = unsafe.Pointer(sg)
	goready(gp, skip+1)
}

runtime.recv函数根据缓冲区的大小分别处理:

  • 如果channel不存在缓冲区
  1. 直接调用runtime.recvDirect将channel发送队列中Goroutine存储的elem数据拷贝到目标内存地址中.
  • 如果channel存在缓冲区
  1. 将队列中的数据拷贝到接收方的内存地址;
  2. 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方

两种情况中,都会调用runtime.goready将当前处理器的runnext设置称发送数据的goroutine, 并在调度器下一次调度时将阻塞的发送方唤醒.

缓冲区

当channel的缓冲区已经包含数据时,从中取数据会直接从缓冲区中的recvx 索引的位置中取数据进行处理。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp) // 清除队列中的数据
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		return true, true
	}
	...
}

如果接收数据的内存地址不为空,那么就使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中,清除队列中的数据,并递增 recv, 递减qcount,并释放持有的channel锁。

阻塞接收

当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if !block {
		unlock(&c.lock)
		return false, false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	gp.waiting = mysg
	mysg.g = gp
	mysg.c = c
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	releaseSudog(mysg)
	return true, !closed
}

在正常的接收场景中,我们会使用 runtime.sudog 将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。

完成入队之后,上述代码还会调用 runtime.goparkunlock 立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。

小结

我们梳理一下从 Channel 中接收数据时可能会发生的五种情况:

  1. 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
  2. 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
  3. 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区(语义上保证FIFO);
  4. 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据; 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;

我们总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:

  • 当 Channel 为空时;
  • 当缓冲区中不存在数据并且也不存在数据的发送者时;

关闭channel

channel的关闭会转换为runtime.closechan的调用.

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

首先,如果关闭的是一个空指针或者是一个已经被关闭的channel会直接触发宕机。

处理完上诉的异常逻辑之后,就可以执行关闭channel的逻辑,关键代码如下:

c.closed = 1

	var glist gList
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		gp := sg.g
		gp.param = nil
		glist.push(gp)
	}

	for {
		sg := c.sendq.dequeue()
		...
	}
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

下面代码的主要工作就是将 recvqsendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素。

该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。