掘金 后端 ( ) • 2024-07-02 15:25

什么是channel

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

channel提供了一种同步的机制,确保在数据发送和接收之间的正确顺序和时机。通过使用channel,我们可以避免在多个goroutine之间共享数据时出现的竞争条件和其他并发问题。

func main() {
    ch := make(chan string)
    go func() {
        ch <- "test"
    }()
    
    msg := <-ch
    fmt.Println(msg)
}

Channel 可以看做goroutine之间的桥梁:

whiteboard_exported_image (7).png

channel的特性

单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

Go语言中提供了单向通道来处理这种情况:

// 单向发送 out 通道
func cter(out chan <- int){
    for i := 0; i < 10; i++ {
        out <- i
    }
    close(out)
}

// 单向发送 out 通道, 单向接收 in 通道
func sqer(out chan <- int , in <- chan int){
    for i := range in{
        out <- i * i
    }
    close(out)
}

// 单向接收 in 通道
func prter(in <-chan int){
    for i := range in {
        fmt.Println(i)
    }
}

func main(){
    out := make(chan int)
    in := make(chan int)
    go cter(out)
    go sqer(out, in)
    prter(in)
}
  1. chan<- int 是一个只能发送的通道,可以发送但是不能接收;

  2. <-chan int 是一个只能接收的通道,可以接收但是不能发送。

在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。

无缓冲的通道(阻塞)

image

无缓冲的通道又被称为阻塞的通道。 我们看一下示例:

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行时会出现一下错误:

fatal error: all goroutines are asleep - deadlock!

为什么会出现deadlock错误呢?

因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有快递柜和代收点,快递员给你打电话必须要把这个物品送到你的手中,简单来说就是无缓冲的通道必须有接收才能发送。

上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?

一种方法是启用一个goroutine去接收值,例如:

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 启用goroutine从通道接收值
    ch <- 10
    fmt.Println("发送成功")
}

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

有缓冲的通道(非阻塞)

解决无缓冲通道(阻塞)死锁的问题,就是使用有缓冲的通道。

通过缓存的使用,可以尽量避免阻塞,提供应用的性能。

image

我们使用 make 函数在初始化的时候为其指定通道的容量(缓冲大小):

func main(){
    ch := make(chan int ,1)  // 创建一个容量为 1 的有缓冲区的通道
    ch <- 10
    fmt.Println("发送成功")
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。

Channel 底层结构

实现 Channel 的结构并不神秘,本质上就是一个 mutex 锁加上一个环状缓存、 一个发送方队列和一个接收方队列:

// src/runtime/chan.go
type hchan struct {
    qcount   uint           // 队列中的所有数据数
    dataqsiz uint           // 环形队列的大小
    buf      unsafe.Pointer // 指向大小为 dataqsiz 的数组
    elemsize uint16         // 元素大小
    closed   uint32         // 是否关闭
    elemtype *_type         // 元素类型
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // recv 等待列表,即( <-ch )
    sendq    waitq          // send 等待列表,即( ch<- )
    lock mutex
}

重点字段

  • buf 指向底层循环数组,只有缓冲型的 channel 才有;

  • sendx,recvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。

  • sendq,recvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。

  • waitq 是 sudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:

type waitq struct {
    first *sudog
    last  *sudog
}


type sudog struct {
     g *g             // 等待发送队列或者等待接收队列的goroutine
     next *sudog
     prev *sudog
    elem unsafe.Pointer // goroutine的栈(发送或者接收的数据的地址)
    // ...
}
            
  • lock 用来保证每个读 channel 或写 channel 的操作都是原子的。

例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :

image

Channel 的创建

Channel 的创建语句由编译器完成如下翻译工作:

make(chan type, n) => makechan(type, n)
或者
make(chan type, n) => makechan64(type, n)

Channel 并不严格支持 int64 大小的缓冲,当 make(chan type, n) 中 n 为 int64 类型时, 运行时的实现仅仅只是将其强转为 int,提供了对 int 转型是否成功的检查:

  // src/runtime/chan.go

func makechan64(t *chantype, size int64) *hchan {
    if int64(int(size)) != size {
            panic("makechan: size out of range")
    }

    return makechan(t, int(size))
}

而具体的 makechan 实现的本质是根据需要创建的元素大小, 对 mallocgc 进行封装, 因此,Channel 总是在堆上进行分配,它们会被垃圾回收器进行回收, 这也是为什么 Channel 不一定总是需要调用 close(ch) 进行显式地关闭。

// src/runtime/chan.go

// 将 hchan 的大小对齐
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&7)

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    ...

    // 检查确认 channel 的容量不会溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
            panic("makechan: size out of range")
    }

    var c *hchan
    switch {
    case mem == 0:
            // 队列或元素大小为零
            c = (*hchan)(mallocgc(hchanSize, nil, true))
            ...
    case elem.ptrdata == 0:
            // 元素不包含指针
            // 在一个调用中分配 hchan 和 buf
            c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
            c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
            // 元素包含指针
            c = new(hchan)
            c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    ...
    return c
}
              

创建 channel 的逻辑主要分为三大块:

  • 当前 channel 不存在缓冲区或者元素大小为 0 的情况下,就会调用 mallocgc 方法分配一段连续的内存空间。

  • 当前 channel 存储的类型不存指针时,就会分配一段大小为hchanSize+mem连续的内存空间。

  • 包含指针的情况下,会为channel 和 底层数组分别分配内存空间。

向channel发送数据

发送数据完成的是如下的翻译过程:

ch <- v => chansend1(ch, v)

而本质上它会去调用更为通用的 chansend:

// go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true)
}

向nil channel发送数据

func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    // 当向 nil channel 发送数据时,会调用 gopark
    // 而 gopark 会将当前的 Goroutine 休眠,从而发生死锁崩溃
    if c == nil {
        if !block {
                return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan)
        throw("unreachable")
    }

    ...
}
              

在这个部分中,我们可以看到,如果一个 Channel 为零值(比如没有初始化),这时候的发送操作会暂止当前的 Goroutine(gopark)。 而 gopark 会将当前的 Goroutine 休眠,从而发生死锁崩溃。

向关闭的channel发送数据

func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    ...
    lock(&c.lock)
    
    
    // 持有锁之前我们已经检查了锁的状态,
    // 但这个状态可能在持有锁之前、该检查之后发生变化,
    // 因此还需要再检查一次 channel 的状态
    if c.closed != 0 { // 不允许向已经 close 的 channel 发送数据
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
              
              

向关闭的channel发送数据会触发panic。

直接发送到阻塞的goroutine


// 1. channel 上有阻塞的接收方,直接发送
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) })
    return true
}
              

发送时如果recvq有阻塞的goroutine,则直接将数据复制到goroutine的栈上。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
    ...
    
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }

    gp := sg.g
    unlockf() // unlock(&c.lock)
    gp.param = unsafe.Pointer(sg)
    ...
    // 复始一个 Goroutine,放入调度队列等待被后续调度
    goready(gp) // 将 gp 作为下一个立即被执行的 Goroutine
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    ... // 为了确保发送的数据能够被立刻观察到,需要写屏障支持,执行写屏障,保证代码正确性
    memmove(dst, src, t.size) // 直接写入接收方的执行栈!
}
                         

sg.elem指向goroutine的栈空间,sendDirect函数直接将数据复制到接收方的栈上。goready作用是唤醒阻塞的goroutine。

发送到channel缓存中


    // 2. 判断 channel 中缓存是否有剩余空间
    if c.qcount < c.dataqsiz {
        // 有剩余空间,存入 c.buf
        qp := chanbuf(c, c.sendx)
        ...
        typedmemmove(c.elemtype, qp, ep) // 将要发送的数据拷贝到 buf 中
        c.sendx++
        
        if c.sendx == c.dataqsiz { // 如果 sendx 索引越界则设为 0
                c.sendx = 0
      
        }
        
        c.qcount++ // 完成存入,记录增加的数据,解锁
        unlock(&c.lock)
        return true
    }
    
    if !block {
        unlock(&c.lock)
        return false
    }

    ...
              

如果channel的buf中还有空间,那么会将数据发送到buf中。

阻塞添加到sendq


         // 3. 阻塞在 channel 上,等待接收方接收数据
        gp := getg()
        mysg := acquireSudog()
        ...
        c.sendq.enqueue(mysg)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock)) // 将当前的 g 从调度队列移出

        // 因为调度器在停止当前 g 的时候会记录运行现场,当恢复阻塞的发送操作时候,会从此处继续开始执行
        ...
        gp.waiting = nil
        gp.activeStackChans = false
        if gp.param == nil {
                if c.closed == 0 { // 正常唤醒状态,Goroutine 应该包含需要传递的参数,
                                   // 但如果没有唤醒时的参数,且 channel 没有被关闭,则为虚假唤醒
                        throw("chansend: spurious wakeup")
                }
                panic(plainError("send on closed channel"))
        }
        gp.param = nil
        ...
        mysg.c = nil // 取消与之前阻塞的 channel 的关联
        releaseSudog(mysg) // 从 sudog 中移除
        return true
}
              

如果发生阻塞则将当前goroutine添加到阻塞队列中,并将挂起goroutine。当goroutine唤起后还要判断channel是否被关闭,如果关闭后则panic。

发送数据流程图

向一个channel中写数据简单过程如下:

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;

  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;

  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;

简单流程图如下:

image

从channel接收数据

接收数据主要是完成以下翻译工作:

v <- ch      =>  chanrecv1(ch, v)
v, ok <- ch  =>  ok := chanrecv2(ch, v) 

他们的本质都是调用 chanrecv:

// go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

// go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)       
    return
}

接收nil channel

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    // nil channel,同 send,会导致两个 Goroutine 的死锁
    if c == nil {
        if !block {
                return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan)
        throw("unreachable")
    }
    // ...
              

接收 nil channel 会将当前G挂起,永远阻塞,造成死锁。

接收关闭的channel


     lock(&c.lock)

    // 1. channel 已经 close,且 channel 中没有数据,则直接返回
    if c.closed != 0 && c.qcount == 0 {
          
        ...
        unlock(&c.lock)
        if ep != nil {
                typedmemclr(c.elemtype, ep)
        }
        
        return true, false
    }
              

接收关闭的channel,并且缓存里面没有数据,这时只会接收到零值。

接收发送队列里的数据

    // 2. channel 上有阻塞的发送方,直接接收
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) })
        return true, true
    }
          

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        ...
        if ep != nil {
            // 直接从对方的栈进行拷贝
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 从缓存队列拷贝
        qp := chanbuf(c, c.recvx)
        ...
        // 从队列拷贝数据到接收方
        if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
        }
        // 从发送方拷贝数据到队列
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
                c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    ...
    goready(gp, skip+1)
}
                
  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;

  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;

接收buf里的数据


    // 3. channel 的 buf 不空
    if c.qcount > 0 {
        // 直接从队列中接收
        qp := chanbuf(c, c.recvx)
        ...
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
                c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }
              

ep是接收数据的目标地址,如果ep不为nil说明没有忽略接收的数据,接收时需要将数据复制到目标地址。

没有数据挂起


    // 4. 没有数据可以接收,阻塞当前 Goroutine
    gp := getg()
    mysg := acquireSudog()
    ...
    c.recvq.enqueue(mysg)  // 加入到recvq队列中
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive) // 挂起
              

唤起时逻辑


    ...
    // 被唤醒
    gp.waiting = nil
    ...
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

读取流程

从一个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;

  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;

  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;

  4. 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;

简单流程图如下:

image

关闭channel

关闭 Channel 主要是完成以下翻译工作:

 close(ch) => closechan(ch)

具体的实现中:

  1. 首先对 Channel 上锁

  2. 而后依次将阻塞在 Channel 的 g 添加到一个 gList 中

  3. 当所有的 g 均从 Channel 上移除时

  4. 可释放锁,并唤醒 gList 中的所有接收方和发送方

加锁关闭


func closechan(c *hchan) {
    if c == nil { // close 一个空的 channel 会 panic
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 { // close 一个已经关闭的的 channel 会 panic
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    ...
    c.closed = 1
              

释放recvq


    var glist gList

    // 释放所有的接收方
    for {
        sg := c.recvq.dequeue()
        if sg == nil { // 队列已空
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem) // 清零
            sg.elem = nil
        }
        ...
        gp := sg.g
        gp.param = nil
        ...
        glist.push(gp)
    }

channel 关闭后所有的接受者都会收到相应的零值,并且被唤起。

释放sendq


    // 释放所有的发送方
    for {
            sg := c.sendq.dequeue()
            if sg == nil { // 队列已空
                    break
            }
            sg.elem = nil
            ...
            gp := sg.g
            gp.param = nil
            ...
            glist.push(gp)
    }
    // 释放 channel 的锁
    unlock(&c.lock)
              

唤起所有G


    // 就绪所有的 G
    for !glist.empty() {
            gp := glist.pop()
            gp.schedlink = 0
            goready(gp, 3)
    }
}
              

关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。

Channel 操作总结

总结一下操作 channel 的结果:

操作 nil channel closed channel not nil, not closed channel close panic panic 正常关闭 读 <- ch 阻塞 读到对应类型的零值 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞 写 ch <- 阻塞 panic 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

发生 panic 的三种情

  1. 向一个关闭的 channel 进行写操作;

  2. 关闭一个 nil 的 channel;

  3. 重复关闭一个 channel。

发生阻塞的三种情况

  1. 读一个 nil channel 都会被阻塞。

  2. 写一个 nil channel 都会被阻塞。

  3. 读写缓冲为空或者已经满了且没有挂起的接收者和发送者。

channel的应用

停止信号

channel 用于停止信号的场景还是挺多的,经常是关闭某个 channel 或者向 channel 发送一个元素,使得接收 channel 的那一方获知道此信息,进而做一些其他的操作。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel dataCh.  
        // Its reveivers are the senders of channel dataCh.
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                value := rand.Intn(MaxRandomNumber)
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }()
    }
    
    // the receiver
    go func() {
        defer wgReceivers.Done()
        
        for value := range dataCh {
            if value == MaxRandomNumber-1 {
                // the receiver of the dataCh channel is
                // also the sender of the stopCh cahnnel.
                // It is safe to close the stop channel here.
                close(stopCh)
                return
            }
            
            log.Println(value)
        }
    }()
    
    // ...
    wgReceivers.Wait()
}    

任务定时

与 timer 结合,一般有两种玩法:实现超时控制,实现定期执行某个任务。

有时候,需要执行某项操作,但又不想它耗费太长时间,上一个定时器就可以搞定:

select {
    case <-time.After(100 * time.Millisecond):
    case <-s.stopc:
            return false
}           

等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。

定时执行某个任务,也比较简单:

func worker() {
    ticker := time.Tick(1 * time.Second)
    for {
        select {
        case <- ticker:
            // 执行定时任务
            fmt.Println("执行 1s 定时任务")
        }
    }
}
              

每隔 1 秒种,执行一次定时任务。

解耦生产方和消费方

服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:

func main() {
    taskCh := make(chan int, 100)
    go worker(taskCh)

    // 塞任务
    for i := 0; i < 10; i++ {
        taskCh <- i
    }

    // 等待 1 小时 
    select {
    case <-time.After(time.Hour):
    }
}

func worker(taskCh <-chan int) {
    const N = 5
    // 启动 5 个工作协程
    for i := 0; i < N; i++ {
        go func(id int) {
            for {
                task := <- taskCh
                fmt.Printf("finish task: %d by worker %d\n", task, id)
                time.Sleep(time.Second)
            }
        }(i)
    }
}
              

5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。

程序输出:

finish task: 1 by worker 4
finish task: 2 by worker 2
finish task: 4 by worker 3
finish task: 3 by worker 1
finish task: 0 by worker 0
finish task: 6 by worker 0
finish task: 8 by worker 3
finish task: 9 by worker 1
finish task: 7 by worker 4
finish task: 5 by worker 2          

控制并发数

有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。

var limit = make(chan int, 3)

func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    // …………
}
              

构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。

参考

https://mp.weixin.qq.com/s/V7B9q4lZ6K3RjrP8Xd-cXw

https://mp.weixin.qq.com/s/rFyMNeZF_cwKkBBJhHlEFQ

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/