Go Channel
数据存储
go/src/runtime/chan.go中,chan的底层结构为 hchan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// hchan是Channel的底层结构体
type hchan struct {
qcount uint // 队列中元素数量
dataqsiz uint // 环形队列容量
buf unsafe.Pointer // 环形队列缓冲区
elemsize uint16 // 元素大小
closed uint32 // 是否关闭
elemtype *_type // 元素类型信息
sendx uint // 发送操作的指针位置
recvx uint // 接收操作的指针位置
recvq waitq // 等待接收的goroutine队列
sendq waitq // 等待发送的goroutine队列
// 核心:Channel的互斥锁,保护所有字段
lock mutex
}
- buf unsafe.Pointer:指向实际缓冲区的一个指针,缓冲区是一个数组。并且是环形数组。
- sendx uint:发送索引,指明下次发送数据应进缓冲区的哪一位置。
- recvx uint:接收索引,指明下次接受数据应取缓冲区的哪一位置。
下图中s代表 sendx发送索引;r代表 recvx接收索引。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
buf [][][][] //初始状态,dataqsiz为4
r
s
buf [1][][][] //ch<-1, sendx = (sendx+1)%dataqsiz
r s
buf [1][2][][] //ch<-2, sendx = (sendx+1)%dataqsiz
r s
buf [1][2][3][] //ch<-3, sendx = (sendx+1)%dataqsiz
r s
buf [1][2][3][4] //ch<-4, sendx = (sendx+1)%dataqsiz !阻塞
r
s
buf [][2][3][4] // <-ch,取1 recvx = (recvx+1)%dataqsiz
s r
buf [5][2][3][4] //ch<-5, sendx = (sendx+1)%dataqsiz !阻塞
r
s
注意到:recvx==sendx时阻塞。buf全空则读阻塞,全满则写阻塞。
实际上,gochan对于环形数组索引的约束,不采用%取模运算,而是
if index==dataqsiz {index =0}
具体每一次存取到底操作哪一地址,由 chanbuf函数决定,这个函数很简单只有一行。
1
2
3
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
我们在使用 ch := make(chan int,4)时,ch就是一个指向buf的指针,ch地址值 + i * 元素大小 可得出元素地址值。
并发安全
1
2
3
4
5
6
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//....
lock(&c.lock)
//... 执行写操作
unlock(&c.lock)
hchan.lock 的类型为 mutex,但这里的 mutex 并非标准库 sync.Mutex,而是 Go 运行时内部的 runtime.mutex(定义在 src/runtime/mutex.go),是专门为运行时高频、短持有时间场景定制的极简自旋锁。
runtime.mutex 和标准库 sync.Mutex 核心逻辑差异极大:runtime.mutex 是纯自旋锁(无阻塞、无等待队列),而 sync.Mutex 是「自旋 + 信号量阻塞」的混合锁;runtime.mutex 舍弃了复杂的阻塞唤醒逻辑,仅保留核心的原子操作,以适配运行时无 GC 干扰、锁持有时间极短的场景。核心结构和流程如下:
1. 核心状态(简化版,贴合 Go 源码)
1 2 3 4 5 6 7 // 定义在 src/runtime/mutex.go type mutex struct { // 仅用 0/1 表示锁状态,无等待者标记/计数: // 0: 未锁定 // 1: 锁定 key uint32 }2. 加锁(
lock(&c.lock))流程Channel 的
lock()调用最终映射到runtime.mutex的lock()方法,核心是 “CAS 抢锁 + 轻量自旋”,无阻塞、无等待队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func lock(l *mutex) { // 步骤1:快速尝试 CAS 加锁(无竞争时直接成功) if atomic.CompareAndSwapUint32(&l.key, 0, 1) { return } // 步骤2:有竞争时轻量自旋(CPU 空转),循环尝试 CAS 抢锁 // 自旋仅执行 PAUSE 指令让出 CPU 周期,不休眠 Goroutine for { // procyield(30):执行 30 次 PAUSE 指令,空转等待锁释放 procyield(30) // 再次尝试 CAS 抢锁,成功则退出循环 if atomic.CompareAndSwapUint32(&l.key, 0, 1) { return } } }3. 解锁(
unlock(&c.lock))流程解锁逻辑极简,仅释放锁,无任何唤醒操作(因为没有等待队列):
1 2 3 4 func unlock(l *mutex) { // 直接将锁状态置为未锁定,无需判断等待者(无等待队列) atomic.StoreUint32(&l.key, 0) }
Channel 的所有读写操作(ch<-/<-ch)、关闭操作(close(ch))都必须先加锁,核心保护以下临界资源,避免多 Goroutine 并发访问导致的数据竞争:
- 环形缓冲区的读写指针(
sendx/recvx):防止多 Goroutine 同时修改指针,导致缓冲区读写位置错乱; - 队列元素计数(
qcount):保证缓冲区中元素数量统计的准确性; - 等待队列(
recvq/sendq):避免在唤醒等待 Goroutine 时,出现重复唤醒、漏唤醒或队列操作异常; - 关闭标记(
closed):防止在发送 / 接收数据的同时关闭 Channel,导致程序 panic 或数据读写异常。
阻塞机制
1
2
3
4
5
6
7
8
9
10
┌─────────┐
│ M │ ----> 绑定 ──> ┌─────────┐
│ 线程 │ │ P │
└─────────┘ │ 逻辑处理器│
│ └─────────┘
▼ │ 维护本地可运行G队列
G0 ▼
┌───G───G───G───G───┐
│ 协程1 协程2 协程3 │
└────────────────────┘
上述为GMP调度场景,现在M所绑定执行的协程G0正在读写channel而被阻塞。
| question | answer |
|---|---|
| 谁来阻塞Go | 调度器将G0从runable转化为waiting |
| G0阻塞完放在什么地方 | sendq|recvq保存waitting态的协程 |
| G0 什么时候被唤醒 | buf不全满|buf不全空时唤醒 |
| 谁来唤醒Go | 调度器runtime.schedule调用goready(),将G0从waiting转为runable |
读取优化
1
2
3
4
5
6
// Channel 初始状态
buf [][][][] // 缓冲区,dataqsiz=4(容量4)
r
s
recvq // 读协程阻塞队列(存储 sudog 节点)
sendq // 写协程阻塞队列(本场景暂未用到)
- 协程 G1 执行读操作
x <- ch,此时 Channel 无数据、缓冲区为空 → G1 读阻塞。 - 调度器 P 处理:将 G1 从 runnable(就绪态) 转为 waiting(等待态),并存入
recvq(读阻塞队列)。 -
队列节点结构:每个节点不仅存阻塞协程 G,还存协程读写的「目标变量地址」
示例:(G1 的接收变量 x 是其私有栈空间的局部变量)
1
recvq <— [G1 | &x] <— [...]
现在举个例子来说明gochannel是如何利用上述recvq的结构实现读写优化的
1
2
3
4
5
6
go func(){
x <-ch //G1. x是局部变量,在G1的栈空间
}()
go func(){
ch<-1 //G2 G2会把栈空间中的1 ,直接值拷贝给G1栈空间的x
}()
正常流程(未优化)
G2 执行 ch <- 1 → 数据写入 buf → 唤醒 G1 → G1 抢锁 → 从 buf 取值 → G1 转为 runnable。
优化后流程(核心)
G2 执行 ch <- 1 时,数据不写入 buf,直接传递给 G1 的 x,避免锁竞争和缓冲区操作:
1
2
3
4
5
6
7
8
9
10
11
12
【G1私有栈】 【Channel】 【G2私有栈】
x(&x) recvq:[G1|&x] 1(&1)
│ │ │
1. │──G1阻塞入队───────>│ │
│ │ │
2. │ │<──G2发起写操作────│
│ │ │
3. │<──直接拷贝1→&x─────│──值拷贝(&1→&x)──>│
│ │ │
4. │──G1唤醒(runnable)─>│ │
│ │ │
5. │──G1直接用x(值=1)──>│ (无需抢锁/读buf) │
优化实现
- 协程栈空间特性:G1 和 G2 均有独立私有栈空间,x 属于 G1 栈,1 属于 G2 栈。
- 数据传递本质:G2 通过
recvq头节点获取 G1 的「x 地址(&x)」,将自身栈中的「1」直接 值拷贝 到 G1 栈的 x 地址中。 - 调度器动作:G2 完成拷贝后,调度器唤醒 G1 → 将 G1 从 waiting 转为 runnable,放入 P 的就绪队列,等待调度执行。
- 核心优势:G1 无需抢锁、无需读取缓冲区,直接使用已赋值的 x,提升效率。
总结
- 优化核心:利用
recvq存储的「变量地址」,实现跨协程栈的直接数据拷贝,跳过缓冲区和锁竞争。 - 队列节点作用:
recvq的 sudog 节点(存 G + 变量地址)是实现该优化的核心数据结构支撑。 - 数据流向:G2 栈的 1 → 直接拷贝 → G1 栈的 x(无需经过 buf)。
源码
go/src/runtime/chan.go line 312
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 1. 同步测试校验(synctest 相关,非核心逻辑)
if c.synctest && sg.g.syncGroup != getg().syncGroup {
unlockf()
panic(plainError("send on synctest channel from outside bubble"))
}
// 2. 竞态检测(raceenabled 开启时的模拟缓冲区操作,非核心)
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// 即使直接拷贝,也模拟缓冲区的 head/tail 偏移(仅竞态检测用)
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
}
// 3. 【核心重点】直接值拷贝(跳过缓冲区的关键)
if sg.elem != nil { // sg.elem 是接收方(如G1)的变量地址(&x)
// sendDirect:将发送方的 ep(如G2的&1)拷贝到 sg.elem(G1的&x)
sendDirect(c.elemtype, sg, ep)
sg.elem = nil // 拷贝完成后清空,避免重复操作
}
// 4. 唤醒接收协程的收尾操作
gp := sg.g // 获取阻塞的接收协程(如G1)
unlockf() // 解锁 channel(无需操作缓冲区,提前解锁)
gp.param = unsafe.Pointer(sg) // 设置协程参数
sg.success = true // 标记发送成功
if sg.releasetime != 0 {
sg.releasetime = cputicks() // 记录唤醒时间(性能统计)
}
goready(gp, skip+1) // 唤醒接收协程(G1),转为 runnable 态
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem // 取接收方地址(如G1的&x)
// 内存屏障:保证多CPU下拷贝的原子性/可见性
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_) // 核心:src(G2的&1) → dst(G1的&x) 字节级拷贝
}
Closed Channel
当你尝试向一个已经关闭的通道写入数据时,Go 运行时会直接触发 panic,错误信息通常为 send on closed channel。这是一种运行时错误,会导致当前协程终止,如果没有被 recover 捕获,还会导致整个程序崩溃。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
close(ch)
go func() {
defer func() {
// 捕获 panic 并打印信息
if err := recover(); err != nil {
fmt.Printf("协程写入时触发 panic: %v\n", err)
}
}()
ch <- 10 // 向已关闭的通道写数据
}()
time.Sleep(100 * time.Millisecond)
fmt.Println("程序正常退出")
}
1
2
协程写入时触发 panic: send on closed channel
程序正常退出
关键说明:无论通道是有缓冲还是无缓冲,写入已关闭的通道都会触发 panic,这是 Go 语言的硬性规则,目的是避免向已终止的通信管道写入无效数据。
从已关闭的通道读取数据不会触发 panic,行为分为两种场景,核心规则是:先读完通道中已缓冲的数据,再读则返回对应类型的零值,同时第二个返回值(ok)为 false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main
import "fmt"
func main() {
ch := make(chan int, 1)
ch <- 5
close(ch)
// 第一个读取:获取通道中已有的数据
val1, ok1 := <-ch
fmt.Printf("第一次读取:值=%d, ok=%t\n", val1, ok1) // 输出:值=5, ok=true
// 第二个读取:通道已空,返回零值和 false
val2, ok2 := <-ch
fmt.Printf("第二次读取:值=%d, ok=%t\n", val2, ok2) // 输出:值=0, ok=false
// 启动协程读取已关闭的空通道
go func() {
val, ok := <-ch
fmt.Printf("协程读取:值=%d, ok=%t\n", val, ok) // 输出:值=0, ok=false
}()
// 等待协程执行
fmt.Scanln()
}
补充:
- 对于有缓冲通道:关闭后仍能按顺序读取完缓冲区内的所有数据,每一次读取的
ok都是true;当缓冲数据读完后,后续所有读取都会返回零值 +false。 - 对于无缓冲通道:如果关闭前没有数据写入,关闭后第一次读取就会返回零值 +
false。 - 即使是在协程中读取已关闭的通道,行为和主协程完全一致,不会有任何异常。
总结
- 向已关闭的通道写入数据:必然触发
send on closed channelpanic,需通过提前判断或避免关闭后写入来规避。 - 从已关闭的通道读取数据:安全且不 panic,先读取通道内剩余数据(ok=true),数据读完后返回对应类型零值(ok=false)。
- 通道关闭的核心原则:关闭操作应由写端发起,读端通过
ok判断通道是否关闭,避免写端误写已关闭通道引发 panic。