图解Golang channel源码


| 阅读 |,阅读约 11 分钟
| 复制链接:

Overview

图解Golang channel源码

前言

先上一张channel布局图,channel的底层实际上并不复杂,没有用到很高深的知识,主要是围绕着一个环形队列和两个链表展开。相信你看完本篇文章一定能掌握channel的实现。

高清地址

hchan结构

channel简介

  • channel是一个类型管道,通过它可以在groutine之间发送和接收消息
  • go语言层面提供的groutine之间的通讯方式

在日常开发中,对于channel的使用应该不陌生了,但是了解了基本的使用后,你是否对它的底层实现很好奇呢,为什么它就能实现并发的groutine之间通信呢?带着这个好奇,让我们研究一下channel底层的源码实现吧!

channel使用

下面是channel的最简单的用法:

 1package main
 2import "fmt"
 3
 4func main() {
 5  c := make(chan int)
 6  go func() {
 7    // 发送数据到channel
 8    c <- 1
 9  }()
10  // 从channel取出数据
11  x := <- c
12  close(c)
13  fmt.Println(x)
14}

channel源码入口

channel使用的make、<- 等符号,在源码中没有对应的实现,而是通过编译器将相关符号翻译为底层实现。 使用以下命令将go源码翻译为汇编

1go tool compile -N -l -S main.go>hello.s

查看部分带有CALL指令的核心内容如下:

10x0043 00067 (main.go:42) CALL  runtime.makechan(SB)
20x006a 00106 (main.go:44) CALL  runtime.newproc(SB)
30x008b 00139 (main.go:47) CALL  runtime.chanrecv1(SB)
40x0032 00050 (main.go:45) CALL  runtime.chansend1(SB)
50x00a3 00163 (main.go:48) CALL  runtime.closechan(SB)

可以猜测对应关系:

  • make(chan int)对应:runtime.makechan函数
  • 协程创建:runtime.newproc函数
  • ch <- 1 写数据语句对应:runtime.chansend1函数
  • x := <- ch 读数据语句对应:runtime.chanrecv1函数
  • close(c) 关闭通道语句对应:runtime.closechan函数

相关源码只需要到runtime包下,全局搜索就可以找到在文件runtime/chan.go下

1func makechan(t *chantype, size int) *hchan {}
2func chansend1(c *hchan, elem unsafe.Pointer) {}
3func chanrecv1(c *hchan, elem unsafe.Pointer) {}
4func closechan(c *hchan) {}

源码分析

上述三个函数都用到一个hchan类型的参数,它就是channel的核心数据结构,我们先分析hchan

在IDE中下断点调试时,也能看出chan的内部数据结构

  • 位置:src/runtime/chan.go

chan数据结构

  • channel内部数据结构是固定长度的双向循环列表
  • 按顺利往里面写数据,写满之后又从0开始写
  • chan中的两个重要组件是bufwaitq,所有的行为和实现都是围绕着两个组件进行的

github上Go夜读提供的这个图片比较形象,直接引用过来。

 1type hchan struct {
 2  // 当前队列中总元素个数
 3  qcount   uint           // total data in the queue
 4  // 环形队列长度,即缓冲区大小(申明channel时指定的大小)
 5  dataqsiz uint           // size of the circular queue
 6  // 环形队列指针
 7  buf      unsafe.Pointer // points to an array of dataqsiz elements
 8  // buf中每个元素的大小
 9  elemsize uint16
10  // 当前通道是否处于关闭状态,创建通道时该字段为0,关闭时字段为1
11  closed   uint32
12  // 元素类型,用于传值过程的赋值
13  elemtype *_type // element type
14  // 环形缓冲区中已发送位置索引
15  sendx    uint   // send index
16  // 环形缓冲区中已接收位置索引
17  recvx    uint   // receive index
18  // 等待读消息的groutine队列
19  recvq    waitq  // list of recv waiters
20  // 等待写消息的groutine队列
21  sendq    waitq  // list of send waiters
22  // 互斥锁,为每个读写操作锁定通道(发送和接收必须互斥)
23  lock mutex
24}
25
26// 等待读写的队列数据结构,保证先进先出
27type waitq struct {
28  first *sudog
29  last  *sudog
30}

创建channel

概述:

创建channel时,可以往channel中放入不同类型的数据,不同类型数据占用的空间大小也是不一样的,这决定了hchan和hchan中的buf字段需要开辟多大的存储空间。在go的源码中对不同的情况做不同的处理。包括三种情况:

总体的原则是:总内存大小 = hchan需要的内存大小 + 元素需要的内存大小

  • 队列为空或元素大小为0:只需要开辟的内存空间为hchan本身的大小
  • 元素不是指针类型:需要开辟的内存空间=hchan本身大小+每个元素的大小*申请的队列长度
  • 元素是指针类型:这种情况下buf需要单独开辟空间,buf占用内存大小为每个元素的大小*申请的队列长度

输入:

  • chantype:channel的类型
  • size:channel大小

输出:

  • 创建好的hchan对象

核心流程:

  • 各种参数校验
  • 数据赋值
  • 创建缓冲区存储空间(区分元素为空、元素有指针、元素无指针三种情况)

高清地址

channel创建

 1// 对应的源码为 c := make(chan int, size)
 2// c := make(chan int) 这种情况下,size = 0
 3func makechan(t *chantype, size int) *hchan {
 4  elem := t.elem
 5
 6  // 总共需要的buff大小 = channel中创建的这种元素类型的大小(elem.size)* size
 7  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
 8
 9  var c *hchan
10  // 下面是为buf创建并分配存储空间
11  switch {
12  case mem == 0:
13    // size为0,或者每个元素占用的大小为0
14    // 这时为buf分配大小时,只需要分配hchan结构体本身占用的大小即可
15    // hchanSize是一个常量,表示空的hchan需要占用的字节大小
16    // hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
17    c = (*hchan)(mallocgc(hchanSize, nil, true))
18    // raceaddr内部实现为:return unsafe.Pointer(&c.buf)
19    c.buf = c.raceaddr()
20  case elem.ptrdata == 0:
21    // 如果队列中不存在指针,那么每个元素都需要被存储并占用空间,占用大小为前面乘法算出来的mem
22    // 同时还要加上hchan本身占用的空间大小,加起来就是整个hchan占用的空间大小
23    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
24    // 把buf指针指向空的hchan占用空间大小的末尾
25    c.buf = add(unsafe.Pointer(c), hchanSize)
26  default:
27    // Elements contain pointers.
28    // 如果chan中的元素是指针类型的数据,为buf单独开辟mem大小的空间,用来保存所有的数据
29    c = new(hchan)
30    c.buf = mallocgc(mem, elem, true)
31  }
32  // 设置chan的总大小
33  c.elemsize = uint16(elem.size)
34  // 元素类型
35  c.elemtype = elem
36  // 环形队列的大小,即用户创建时设置的大小
37  c.dataqsiz = uint(size)
38  return c
39}

发送数据到channel

概述:

发送数据到channel时,直观的理解是将数据放到chan的环形队列中,不过go做了一些优化:先判断是否有等待接收数据的groutine,如果有,直接将数据发给Groutine,唤醒groutine,就不放入队列中了。当然还有另外一种情况就是:队列如果满了,那就只能放到队列中等待,直到有数据被取走才能发送。

输入:

  • chan对象
  • 要发送的数据
  • 是否阻塞
  • 回调函数

输出:无

核心逻辑:

  1. 如果recvq不为空,从recvq中取出一个等待接收数据的Groutine,将数据发送给该Groutine
  2. 如果recvq为空,才将数据放入buf中
  3. 如果buf已满,则将要发送的数据和当前的Groutine打包成Sudog对象放入sendq,并将groutine置为等待状态

有等待接收数据的groutine

高清地址

channel发送-有receive

无等待接收数据的groutine,环形队列未满

高清地址

channel发送-无等待groutine-队列未满

无等待接收数据的groutine,环形队列已满

高清地址

channel发送-无等待groutine-队列已满

发送数据源码

  1// ep指向要发送数据的首地址
  2func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  3
  4  // 先上锁
  5  lock(&c.lock)
  6
  7  // 如果channel已经关闭,抛出错误
  8  // 下面这个错误经常会遇到,都是对channel使用不当报出来的
  9  if c.closed != 0 {
 10    unlock(&c.lock)
 11    panic(plainError("send on closed channel"))
 12  }
 13
 14  // 从接收队列中取出元素,如果取到数据,就将数据传过去
 15  if sg := c.recvq.dequeue(); sg != nil {
 16    // 调用send方法,将值传过去
 17    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
 18    return true
 19  }
 20
 21  // 走到这里,说明没有等待接收数据的Groutine
 22  // 如果缓冲区没有满,直接将要发送的数据复制到缓冲区
 23  if c.qcount < c.dataqsiz {
 24    // c.sendx是已发送的索引位置,这个方法通过指针偏移找到索引位置
 25    // 相当于执行c.buf(c.sendx)
 26    qp := chanbuf(c, c.sendx)
 27    if raceenabled {
 28      raceacquire(qp)
 29      racerelease(qp)
 30    }
 31
 32    // 复制数据,内部调用了memmove,是用汇编实现的
 33    // 通知接收方数据给你了,将接收方协程由等待状态改成可运行状态,
 34    // 将当前协程加入协程队列,等待被调度
 35    typedmemmove(c.elemtype, qp, ep)
 36
 37    // 数据索引前移,如果到了末尾,又从0开始
 38    c.sendx++
 39    if c.sendx == c.dataqsiz {
 40      c.sendx = 0
 41    }
 42
 43    // 元素个数加1,释放锁并返回
 44    c.qcount++
 45    unlock(&c.lock)
 46    return true
 47  }
 48
 49  // 走到这里,说明缓冲区也写满了
 50  // 同步非阻塞的情况,直接返回
 51  if !block {
 52    unlock(&c.lock)
 53    return false
 54  }
 55
 56  // 以下为同步阻塞的情况
 57  // 此时会将当前的Groutine以及要发送的数据放入到sendq队列中,并且切换出该Groutine
 58  gp := getg()
 59  mysg := acquireSudog()
 60  mysg.releasetime = 0
 61  if t0 != 0 {
 62    mysg.releasetime = -1
 63  }
 64  // No stack splits between assigning elem and enqueuing mysg
 65  // on gp.waiting where copystack can find it.
 66  mysg.elem = ep
 67  mysg.waitlink = nil
 68  mysg.g = gp
 69  mysg.isSelect = false
 70  mysg.c = c
 71  gp.waiting = mysg
 72  gp.param = nil
 73
 74  // 将Groutine放入sendq队列
 75  c.sendq.enqueue(mysg)
 76
 77  // Groutine转入 waiting 状态,gopark是调度相关的代码
 78  // 在用户看来,向channel发送数据的代码语句会阻塞
 79  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
 80  KeepAlive(ep)
 81
 82  // G被唤醒
 83  if mysg != gp.waiting {
 84    throw("G waiting list is corrupted")
 85  }
 86  gp.waiting = nil
 87  gp.activeStackChans = false
 88  if gp.param == nil {
 89    if c.closed == 0 {
 90      throw("chansend: spurious wakeup")
 91    }
 92    panic(plainError("send on closed channel"))
 93  }
 94  gp.param = nil
 95  if mysg.releasetime > 0 {
 96    blockevent(mysg.releasetime-t0, 2)
 97  }
 98  mysg.c = nil
 99
100  // G被唤醒,状态改成可执行状态,从这里开始继续执行
101  releaseSudog(mysg)
102  return true
103}

send函数

 1// 要发送的数据ep,被拷贝到接收者sg中,之后sg被唤醒继续执行
 2func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 3
 4  // 拷贝数据
 5  if sg.elem != nil {
 6    sendDirect(c.elemtype, sg, ep)
 7    sg.elem = nil
 8  }
 9  gp := sg.g
10  unlockf()
11  gp.param = unsafe.Pointer(sg)
12  if sg.releasetime != 0 {
13    sg.releasetime = cputicks()
14  }
15  // 放入调度队列,等待被调度
16  goready(gp, skip+1)
17}

读取数据

概述:

从channel读取数据的流程和发送的类似,基本是发送操作的逆操作。

从channel读取数据时,不是直接去环形队列中去数据,而是先判断是否有等待发送数据的groutine,如果有,直接将groutine出队列,取出数据返回,并唤醒groutine。如果没有等待发送数据的groutine,再从环形队列中取数据。

输入:

  • chan对象
  • 接收数据的指针
  • 是否阻塞

输出:是否接收成功

核心逻辑:

  1. 如果有等待发送数据的groutine,从sendq中取出一个等待发送数据的Groutine,取出数据
  2. 如果没有等待的groutine,且环形队列中有数据,从队列中取出数据
  3. 如果没有等待的groutine,且环形队列中也没有数据,则阻塞该Groutine,并将groutine打包为sudogo加入到recevq等待队列中

sendq中有等待的groutine

高清地址

发送数据-有等待的groutine

sendq中无等待的groutine,队列不为空

高清地址

发送数据-无等待的groutine-队列不为空

sendq中无等待的groutine,队列为空

高清地址

发送数据-无等待的groutine-队列为空

读取数据源码

 1func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 2
 3  // 上锁
 4  lock(&c.lock)
 5  // 优先从发送队列中取数据,如果有等待发送数据的groutine,直接从发送数据的协程中取出数据
 6  if sg := c.sendq.dequeue(); sg != nil {
 7    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
 8    return true, true
 9  }
10
11  // chan环形队列中如果有有数据
12  if c.qcount > 0 {
13    // 从接收数据的索引出取出数据
14    // 等价于 c.buf[c.recvx]
15    qp := chanbuf(c, c.recvx)
16    if raceenabled {
17      raceacquire(qp)
18      racerelease(qp)
19    }
20    // 将数据拷贝到接收数据的协程
21    if ep != nil {
22      typedmemmove(c.elemtype, ep, qp)
23    }
24    typedmemclr(c.elemtype, qp)
25    // 接收数据的索引前移
26    c.recvx++
27    // 环形队列,如果到了末尾,再从0开始
28    if c.recvx == c.dataqsiz {
29      c.recvx = 0
30    }
31    // 发送数据的索引移动位置
32    c.qcount--
33    unlock(&c.lock)
34    return true, true
35  }
36
37  // 同步非阻塞,协程直接返回
38  if !block {
39    unlock(&c.lock)
40    return false, false
41  }
42
43  // 同步阻塞
44  // 如果代码走到这,说明没有任何数据可以获取到,阻塞住协程,并加入channel的接收队列中
45  gp := getg()
46  mysg := acquireSudog()
47  mysg.releasetime = 0
48  if t0 != 0 {
49    mysg.releasetime = -1
50  }
51  // No stack splits between assigning elem and enqueuing mysg
52  // on gp.waiting where copystack can find it.
53  mysg.elem = ep
54  mysg.waitlink = nil
55  gp.waiting = mysg
56  mysg.g = gp
57  mysg.isSelect = false
58  mysg.c = c
59  gp.param = nil
60
61  // 添加到接收队列中
62  c.recvq.enqueue(mysg)
63  // 调度
64  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
65
66  // someone woke us up
67  if mysg != gp.waiting {
68    throw("G waiting list is corrupted")
69  }
70  gp.waiting = nil
71  gp.activeStackChans = false
72  if mysg.releasetime > 0 {
73    blockevent(mysg.releasetime-t0, 2)
74  }
75  closed := gp.param == nil
76  gp.param = nil
77  mysg.c = nil
78
79  // G被唤醒,从这里继续执行
80  releaseSudog(mysg)
81  return true, !closed
82}

关闭channel

输入:channel

输出:无

核心流程:

  • 设置关闭状态
  • 唤醒所有等待读取chanel的协程
  • 所有等待写入channel的协程,抛出异常
 1func closechan(c *hchan) {
 2  // channel为空,抛出异常
 3  if c == nil {
 4    panic(plainError("close of nil channel"))
 5  }
 6
 7  // 上锁
 8  lock(&c.lock)
 9
10  // 如果channel已经被关闭,抛出异常
11  if c.closed != 0 {
12    unlock(&c.lock)
13    panic(plainError("close of closed channel"))
14  }
15
16  // 设置关闭状态的值
17  c.closed = 1
18
19  // 申明一个存放g的list,把所有的groutine放进来
20  // 目的是尽快释放锁,因为队列中可能还有数据需要处理,可能用到锁
21  var glist gList
22
23  // release all readers
24  // 唤醒所有等待读取chanel数据的协程
25  for {
26    sg := c.recvq.dequeue()
27    // 等待队列处理完毕,退出
28    if sg == nil {
29      break
30    }
31    if sg.elem != nil {
32      typedmemclr(c.elemtype, sg.elem)
33      sg.elem = nil
34    }
35    if sg.releasetime != 0 {
36      sg.releasetime = cputicks()
37    }
38    gp := sg.g
39    gp.param = nil
40    if raceenabled {
41      raceacquireg(gp, c.raceaddr())
42    }
43    // 加入临时队列
44    glist.push(gp)
45  }
46
47  // release all writers (they will panic)
48  // 处理所有要发送数据的协程,抛出异常
49  for {
50    sg := c.sendq.dequeue()
51    if sg == nil {
52      break
53    }
54    sg.elem = nil
55    if sg.releasetime != 0 {
56      sg.releasetime = cputicks()
57    }
58    gp := sg.g
59    gp.param = nil
60    if raceenabled {
61      raceacquireg(gp, c.raceaddr())
62    }
63    // 加入临时队列
64    glist.push(gp)
65  }
66  unlock(&c.lock)
67
68  // Ready all Gs now that we've dropped the channel lock.
69  // 处理临时队列中所有的groutine
70  for !glist.empty() {
71    gp := glist.pop()
72    gp.schedlink = 0
73
74    // 放入调度队列,等待被调度
75    goready(gp, 3)
76  }
77}

总结

初次使用channel时,感觉很复杂也很神奇,带着这份好奇去研究底层的源码实现,看完之后才发现,它其实没有那么复杂,底层实现逻辑很清晰。本文通过图文并茂的方式整理了底层的逻辑,包括创建channel,发送数据,接收数据等。当然,里面还涉及到调度等知识,后面专门再整理一篇文章加以分析。

参考