理解 Go channel

此篇介绍 go channel

什么是 goroutine

They’re called goroutines because the existing terms — threads, coroutines, processes, and so on — convey inaccurate connotations. A goroutine has a simple model: it is a function executing in parallel with other goroutines in the same address space. It is lightweight, costing little more than the allocation of stack space. And the stacks start small, so they are cheap, and grow by allocating (and freeing) heap storage as required.

正如官方所言,goroutine 是一个轻量级的执行单元,相比线程开销更小,完全由 Go 语言负责调度,是 Go 支持并发的核心。开启一个 goroutine 非常简单:

1
2
3
4
5
6
7
8
9
10
11
package main
import (
"fmt"
"time"
)
func main() {
go fmt.Println("goroutine message")
time.Sleep(1) //1
fmt.Println("main function message")
}

#1 的代码是必须的,这是为了让新开启的 goroutine 有机会得到执行,开启一个 goroutine 之后,后续的代码会继续执行,在上面的例子中后续代码执行完毕程序就终止了,而开启的 goroutine 可能还没开始执行。

如果尝试去掉 #1 处的代码,程序也可能会正常运行,这是因为恰巧开启的 goroutine 只是简单的执行了一次输出,如果 goroutine 中耗时稍长就会导致只能看到主一句 main function message

换句话话说,这里的 time.sleep 提供的是一种调度机制,这也是 Go 中 channel 存在的目的:负责消息传递和调度。

Channel

Channel 是 Go 中为 goroutine 提供的一种通信机制,借助于 channel 不同的 goroutine 之间可以相互通信。channel 是有类型的,而且有方向,可以把 channel 类比成 unix 中的 pipe。Go 通过 <- 操作符来实现 channel 的写和读,send value <- 在 channel 右侧,receive value <- 在左侧,receive value 不赋值给任何变量是合法的。

1
2
3
4
5
i := make(chan int)//int 类型
s := make(chan string)//字符串类型
r := make(<-chan bool)//只读
w := make(chan<- []int)//只写
<- w //合法的语句

Channel 最重要的作用就是传递消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import (
"fmt"
)
func main() {
c := make(chan int)
go func() {
fmt.Println("goroutine message")
c <- 1 //1
}()
<-c //2
fmt.Println("main function message")
}

例子中声明了一个 int 类型的 channel,在 goroutine 中在代码 #1 处向 channel 发送了数据 1 ,在 main 中 #2 处等待数据的接收,如果 c 中没有数据,代码的执行将发生阻塞,直到有 goroutine 开始往 c 中 send value。

这是 channel 最简单的用法之一:同步,这种类型的 channel 容量是 0,称之为 unbuffered channel

Unbuffered channel

Channel 可以设置容量,表示 channel 允许接收的消息个数,默认的 channel 容量是 0 称为 unbuffered channel ,对 unbuffered channel 执行 操作 value := <-ch 会一直阻塞直到有数据可接收,执行 操作 ch <- value 也会一直阻塞直到有 goroutine 对 channel 开始执行接收,正因为如此在同一个 goroutine 中使用 unbuffered channel 会造成 deadlock。

1
2
3
4
5
6
7
8
9
10
11
package main
import (
"fmt"
)
func main() {
c := make(chan int)
c <- 1
<-c
fmt.Println("main function message")
}

执行报 fatal error: all goroutines are asleep - deadlock! ,读和写相互等待对方从而导致死锁发生。

来自 www.goinggo.net

Buffered channel

如果 channel 的容量不是 0,此类 channel 称之为 buffered channel ,buffered channel 在消息写入个数 未达到容量的上限之前不会阻塞 ,一旦写入消息个数超过上限,下次输入将会阻塞,直到 channel 有位置可以再写入。

来自 www.goinggo.net

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main
import (
"fmt"
)
func main() {
c := make(chan int, 3)
go func() {
for i := 0; i < 4; i++ {
c <- i
fmt.Println("write to c ", i)
}
}()
for i := 0; i < 4; i++ {
fmt.Println("reading", <-c)
}
}

上面的例子会输出:

1
2
3
4
5
6
7
8
write to c 0
reading 0
write to c 1
reading 1
write to c 2
reading 2
write to c 3
reading 3

根据上文对 buffered channel 的解释,这个例子中 channel c 的容量是 3,在写入消息个数不超过 3 时不会阻塞,输出应该是:

1
2
3
4
5
6
7
8
write to c 0
write to c 1
write to c 2
reading 0
reading 1
reading 2
write to c 3
reading 3

问题在哪里?问题其实是在 fmt.Println ,一次输出就导致 goroutine 的执行发生了切换(相当于发生了 IO 阻塞),因而即使 c 没有发生阻塞 goroutine 也会让出执行,一起来验证一下这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main
import (
"fmt"
"strconv"
)
func main() {
c := make(chan int, 3)
s := make([]string, 8)
var num int = 0
go func() {
for i := 0; i < 4; i++ {
c <- i
num++
v := "inner=>" + strconv.Itoa(num)
s = append(s, v)
}
}()
for i := 0; i < 4; i++ {
<-c
num++
v := "outer=>" + strconv.Itoa(num)
s = append(s, v)
}
fmt.Println(s)
}

这里创建了一个 slice 用来保存 c 进行写入和读取时的执行顺序,num 是用来标识执行顺序的,在没有加入 Println 之前,最终 s 是 [inner=>1 inner=>2 inner=>3 inner=>4 outer=>5 outer=>6 outer=>7 outer=>8] ,输出结果表明 c 达到容量上线之后才会发生阻塞。

相反有输出语句的版本结果则不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"fmt"
"strconv"
)
func main() {
c := make(chan int, 3)
s := make([]string, 8)
var num int = 0
go func() {
for i := 0; i < 4; i++ {
c <- i
num++
v := "inner=>" + strconv.Itoa(num)
s = append(s, v)
fmt.Println("write to c ", i)
}
}()
for i := 0; i < 4; i++ {
num++
v := "outer=>" + strconv.Itoa(num)
s = append(s, v)
fmt.Println("reading", <-c)
}
fmt.Println(s)
}

[outer=>1 inner=>2 outer=>3 inner=>4 inner=>5 inner=>6 outer=>7 outer=>8] 输出结果能表明两个 goroutine 是交替执行,也就是说 IO 的调用 Println 导致 goroutine 的让出了执行。

使用 select 读取多个 channel

Go 提供了 select 语句来处理多个 channel 的消息读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
for {
c1 <- "from 1"
time.Sleep(time.Second * 2)
}
}()
go func() {
for {
c2 <- "from 2"
time.Sleep(time.Second * 2)
}
}()
go func() {
for {
select {
case msg1 := <-c1:
fmt.Println(msg1)
case msg2 := <-c2:
fmt.Println(msg2)
}
}
}()
var input string
fmt.Scanln(&input)
}

select 语句可以从多个可读的 channel 中随机选取一个执行,注意是 随机选取。

关闭 Channel

Channel 可以被关闭 closechannel 关闭之后仍然可以读取,但是向被关闭的 channel send 会 panic。如果 channel 关闭之前有值写入,关闭之后将依次读取 channel 中的消息,读完完毕之后再次读取将会返回 channel 的类型的 zero value:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main
import (
"fmt"
)
func main() {
c := make(chan int, 3)
go func() {
c <- 1
c <- 2
c <- 3
close(c)
}()
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
}

输出 1 2 3 0 0 0 ,0 是 int channel c 的 zero value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main
import (
"fmt"
)
func main() {
c := make(chan int, 3)
go func() {
c <- 1
c <- 2
c <- 3
close(c)
}()
for i := range c {
fmt.Println(i)
}
}

c 可以进行 range 迭代,如果 channel 没有被关闭 range 会一直等待 channel,但是关闭 channel 之后可以隐式的中断 range 的迭代

判断 channel 的关闭

Go 提供了 ok 表达式来判断 channel 的关闭状态。

1
value, ok <- c

如果 channel 是关闭状态,ok 是 false,value 是 channel 的 zero value,否则 ok 是 true 表示 channel 未关闭,value 表示 channel 中的值。

参考资料

三月沙 wechat
扫描关注 wecatch 的公众号