Go编程源码分析channel

0. 前言

在go中经常谈到的一句话是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。
在Goroutine之间通过channel传递数据,作为Go语言的核心数据结构和Goroutine之间的通信方式,channel是支撑Go语言高性能并发编程模型的重要结构。
在这里插入图片描述
channel在运行时的内部表示是runtime.hchan,该结构体中包含了用于保护成员变量的互斥锁,从某种程度上说,channel是一个用于同步和通信的有锁队列。hchan结构体源码:


type hchan struct {
    qcount    uint        // 循环列表元素个数
    dataqsiz  uint        // 循环队列的大小
    buf      unsafe.Pointer  // 循环队列的指针
    elemsize  uint16      // chan中元素的大小
    closed    uint32      // 是否已close
    elemtype  *_type      // chan中元素类型
    sendx    uint        // chan的发送操作处理到的位置
    recvx    uint        // chan的接收操作处理到的位置
    recvq    waitq        // 等待接收数据的Goroutine列表
    sendq    waitq        // 等待发送数据的Goroutine列表
    
    lock    mutex        // 互斥锁
}

type waitq struct {        // 双向链表
    first  *sudog
    last  *sudog
}

waitq中连接的是一个sudog双向链表,保存的是等待中的Goroutine。
在这里插入图片描述

1 创建channel

使用make关键字来创建channel,**make(chan int, 3)**会调用到runtime.makechan函数


const (
  maxAlign  = 8
  hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)

func makechan(t *chantype, size int) *hchan {
  elem := t.elem
    
    // 计算需要分配的buf空间大小
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }

  var c *hchan
  switch {
  case mem == 0:
    // chan的大小或者elem的大小为0,不需要创建buf
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // Race detector uses this location for synchronization.
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // elem不含指针,分配一块连续的内存给hchan数据结构和buf
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // elem包含指针,单独分配buf
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

    // 更新hchan的elemsize、elemtype、dataqsiz字段
  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)
    
  return c
}

上述代码根据channel中收发元素的类型和缓冲区的大小初始化runtime.hchan和缓冲区:

  • 若缓冲区所需大小为0,就只会为hchan分配一段内存;
  • 若缓冲区所需大小不为0且elem不包含指针,会为hchan和buf分配一块连续的内存;
  • 若缓冲区所需大小不为0且elem包含指针,会单独为hchan和buf分配内存。

2 发送数据到channel

发送数据到channel,ch<-i会调用到runtime.chansend函数中,该函数包含了发送数据的全部逻辑:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
    // 对于非阻塞的发送,直接返回
    if !block {
      return false
    }
    // 对于阻塞的通道,将goroutine挂起
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
    // 加锁
  lock(&c.lock)
    // channel已关闭,panic
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }
    ...
}

block表示当前的发送操作是否是阻塞调用。如果channel为空,对于非阻塞的发送,直接返回false,对于阻塞的发送,将goroutine挂起,并且永远不会返回。对channel加锁,防止多个线程并发修改数据,如果channel已关闭,报错并中止程序。

runtime.chansend函数的执行过程可以分为以下三个部分:

  • 当存在等待的接收者时,通过runtime.send直接将数据发送给阻塞的接收者;
  • 当缓冲区存在空余空间时,将发送的数据写入缓冲区;
  • 当不存在缓冲区或缓冲区已满时,等待其他Goroutine从channel接收数据。

2.1 直接发送

如果目标channel没有被关闭且recvq队列中已经有处于读等待的Goroutine,那么runtime.chansend会从接收队列 recvq中取出最先陷入等待的Goroutine并直接向它发送数据,注意,由于有接收者在等待,所以如果有缓冲区,那么缓冲区一定是空的:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    // 从recvq中取出一个接收者
  if sg := c.recvq.dequeue(); sg != nil { 
    // 如果接收者存在,直接向该接收者发送数据,绕过buf
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }
    ...
}

直接发送会调用runtime.send函数:


func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ...
  if sg.elem != nil {
    // 直接把要发送的数据copy到接收者的栈空间
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  // 设置对应的goroutine为可运行状态
  goready(gp, skip+1)
}

sendDirect方法调用memmove进行数据的内存拷贝。goready方法将等待接收数据的Goroutine标记成可运行状态(Grunnable)并把该Goroutine发到发送方所在的处理器的runnext上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方。注意,只是放到了runnext中,并没有立刻执行该Goroutine。

2.2 发送到缓冲区

如果缓冲区未满,则将数据写入缓冲区:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  // 如果缓冲区没有满,直接将要发送的数据复制到缓冲区
  if c.qcount < c.dataqsiz {
    // 找到buf要填充数据的索引位置
    qp := chanbuf(c, c.sendx)
    ...
    // 将数据拷贝到buf中
    typedmemmove(c.elemtype, qp, ep)
    // 数据索引前移,如果到了末尾,又从0开始
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    // 元素个数加1,释放锁并返回
    c.qcount++
    unlock(&c.lock)
    return true
  }
  ...
}

找到缓冲区要填充数据的索引位置,调用typedmemmove方法将数据拷贝到缓冲区中,然后重新设值sendx偏移量。

2.3 阻塞发送

当channel没有接收者能够处理数据时,向channel发送数据会被下游阻塞,使用select关键字可以向channel非阻塞地发送消息:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  // 缓冲区没有空间了,对于非阻塞调用直接返回
  if !block {
    unlock(&c.lock)
    return false
  }
  // 创建sudog对象
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
  // 将sudog对象入队
  c.sendq.enqueue(mysg)
  // 进入等待状态
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  ...
}

对于非阻塞的调用会直接返回,对于阻塞的调用会创建sudog对象并将sudog对象加入发送等待队列。调用gopark将当前Goroutine转入waiting状态。调用gopark之后,在使用者看来向该channel发送数据的代码语句会被阻塞。

在这里插入图片描述
注意,发送数据的过程中包含几个会触发Goroutine调度的时机:

  • 注意,发送数据的过程中包含几个会触发Goroutine调度的时机:
  • 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入channel的sendq队列并调用gopark触发Goroutine的调度让出处理器的使用权。

3 从chan接收数据

从channel获取数据最终调用到runtime.chanrecv函数:


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
        // 如果c为空且是非阻塞调用,直接返回
    if !block {
      return
    }
        // 阻塞调用直接等待
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
  ···
  lock(&c.lock)
    // 如果c已经关闭,并且c中没有数据,返回
  if c.closed != 0 && c.qcount == 0 {
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)
    }
    return true, false
  }
    ···
}

当从一个空channel接收数据时,直接调用gopark让出处理器使用权。如果当前channel已被关闭且缓冲区中没有数据,直接返回。

runtime.chanrecv函数的具体执行过程可以分为以下三个部分:

  • 当存在等待的发送者时,通过runtime.recv从阻塞的发送者或者缓冲区中获取数据;
  • 当缓冲区存在数据时,从channel的缓冲区中接收数据;
  • 当缓冲区中不存在数据时,等待其他Goroutine向channel发送数据。

3.1 直接接收

当channel的sendq队列中包含处于发送等待状态的Goroutine时,调用runtime.recv直接从这个发送者那里提取数据。注意,由于有发送者在等待,所以如果有缓冲区,那么缓冲区一定是满的。


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
  // 从发送者队列获取数据
  if sg := c.sendq.dequeue(); sg != nil { 
    // 发送者队列不为空,直接从发送者那里提取数据
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  } 
  ...
}

主要看一下runtime.recv的实现:


func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果是无缓冲区chan
  if c.dataqsiz == 0 {
    if ep != nil {
            // 直接从发送者拷贝数据
      recvDirect(c.elemtype, sg, ep)
    }
    // 有缓冲区chan
  } else {
        // 获取buf的存放数据指针
    qp := chanbuf(c, c.recvx)
        // 直接从缓冲区拷贝数据给接收者
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
        // 从发送者拷贝数据到缓冲区
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  gp := sg.g
  gp.param = unsafe.Pointer(sg)
    // 设置对应的goroutine为可运行状态
  goready(gp, skip+1)
}

该函数会根据缓冲区的大小分别处理不同的情况:

  • 如果channel不存在缓冲区:直接从发送者那里提取数据。
  • 如果channel存在缓冲区:将缓冲区中的数据拷贝到接收方的内存地址;将发送者数据拷贝到缓冲区,并唤醒发送者。
    无论发生哪种情况,运行时都会调用goready将等待发送数据的Goroutine标记成可运行状态(Grunnable)并将当前处理器的runnext设置成发送数据的Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。

3.2 从缓冲区接收

如果channel缓冲区中有数据且发送者队列中没有等待发送的Goroutine时,直接从缓冲区中recvx的索引位置取出数据:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    // 如果缓冲区中有数据
  if c.qcount > 0 {
    qp := chanbuf(c, c.recvx)
        // 从缓冲区复制数据到ep
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
        // 接收数据的指针前移
    c.recvx++
        // 环形队列,如果到了末尾,再从0开始
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
        // 缓冲区中现存数据减一
    c.qcount--
        unlock(&c.lock)
    return true, true
  }
  ...
}

3.3 阻塞接收

当channel的发送队列中不存在等待的Goroutine并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会被阻塞,使用 select 关键字可以非阻塞地接收消息:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
  // 非阻塞,直接返回
  if !block {
    unlock(&c.lock)
    return false, false
  } 
  // 创建sudog
  gp := getg()
  mysg := acquireSudog()
  ···
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
  // 将sudog添加到等待接收队列中
  c.recvq.enqueue(mysg)
  // 阻塞Goroutine,等待被唤醒
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
  ...
}

如果是非阻塞调用,直接返回。阻塞调用会将当前Goroutine封装成sudog,然后将sudog添加到等待接收队列中,调用gopark让出处理器的使用权并等待调度器的调度。

注意,接收数据的过程中包含几个会触发Goroutine调度的时机:

  1. 当channel为空时
  2. 当channel的缓冲区中不存在数据并且sendq中也不存在等待的发送者时

4 关闭chan

关闭通道会调用到runtime.closechan方法:


func closechan(c *hchan) {
    // 校验逻辑
    ...
    lock(&c.lock)
    // 设置chan已关闭
  c.closed = 1
  var glist gList
    // 获取所有接收者
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    gp := sg.g
    gp.param = nil
    glist.push(gp)
  }
  // 获取所有发送者
  for {
    sg := c.sendq.dequeue()
    ...
  }
    unlock(&c.lock)
    // 唤醒所有glist中的goroutine
  for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
  }
}

将recvq和sendq两个队列中的Goroutine加入到gList中,并清除所有sudog上未被处理的元素。最后将所有glist中的Goroutine加入调度队列,等待被唤醒。注意,发送者在被唤醒之后会panic。

5 总结

在这里插入图片描述


版权声明:本文为apple_56973763原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。