[Go 入门] 第十一章 通道简介

Go 入门系列参考于互联网资料与 人民邮电出版社 《Go 语言入门经典》 与 《Effective Go》,编写目的在于学习交流,如有侵权,请联系删除

通道能够管理 Goroutine 之间的通信, 通道和 Goroutine 提供了一个受控的环境, 能够开发并发软件, 本文内容:

使用通道

如果说 Goroutine 是一种支持并发编程的方式,那么通道就是一种 Goroutine 通信的方式。通道让数据能够进入和离开 Goroutine,可方便 Goroutine 之间进行通信。《Effective Go》有一句话很好地说明了Go语言的并发实现理念:

不要通过共享内存来通信,而通过通信来共享内存

这说明了Go语言并发实现方式的不同之处,这部分有必须要做进一步解释。在其他编程语言中,并发编程通常是通过在多个进程或线程之间共享内存实现的。共享内存能够让程序同步,确保程序以合乎逻辑的方式执行。在程序执行过程中,进程或线程可能对共享内存加锁,以禁止其他进程或者线程修改它。这合乎情理,因为如果在操作期间共享内存被其他进程修改,可能会引发 bug 或导致程序崩溃。通过这种方式给内存加锁,可确保它是独占的——只有一个进程或线程能够访问它。

如:两个人持有一个联名的账户,他们要同时从这个账户支付费用,但这两笔交易的总额超过了账户余额。如果两个交易同时进行且不加锁,则余额检查可能表明资金充足,但实际上资金不够;然而,如果第一个交易将账户加锁,则直到交易完成,都可以避免这样情况的发生。对于简单的并发而言,这种方法看似合理,但如果联名账户有20个持有人,且他们经常使用这个账户进行交易呢?这种情况下,加锁管理工作可能会更加复杂

共享内存和锁的管理并非那么容易,很多编程语言要求程序员对内存和内存管理有深入认识。即便是久经沙场的程序员也会遇到这样的情形:找出进程或者线程争用共享内存而引发的bug,需要花费数天的时间。在使用共享内存的并发环境中,如果不能始终知道程序哪部分将先更新数据,那么将难以推断其中发生的情况。

虽然使用共享内存有其用武之地,但是Go语言使用通道在 Goroutine 之间收发消息,避免了使用共享内存。严格地说,Goroutine 并不是线程,但您可将其视为线程,因为它们能够以非阻塞方式执行代码。在前面关于两人持有一个联合账户的例子中,如果使用 Goroutine,将在账户持有人之间打开一个通道,让它们能够通信并采取相应的措施。例如,一个交易可能向通道发送一条消息,而通道可能限制后续交易或另一个账户持有人的行为。通过收发消息,使得能够以推送方式协调并发事件。事件发生时,可将触发的消息推送给接收者。使用共享内存时,程序必须检查共享内存。在变化频繁的并发编程环境中,很多人都认为使用消息是一种更佳的通信方式。

下面使用了 Goroutine 来执行运行缓慢的函数,以免阻塞整个程序的执行

1
2
3
4
5
6
7
8
9
10
func slowFunc() {
time.Sleep(time.Second * 2)
fmt.Println("sleeper() finished")
}

func main() {
go slowFunc()
fmt.Println("I am now show straightaway !")
time.Sleep(time.Second * 3)
}

time.Sleep(time.Second * 2)使用了一个定时器,旨在避免程序在 Goroutine 返回前退出。在介绍 Goroutine 的示例中,这样做完全可行,但在更复杂的并发程序中,使用定时器绝非好主意,为了管理 Goroutine 和并发,Go 语言提供了通道。如果能够在 Goroutine 和程序之间通信,并让 Goroutine 结束时能够告诉程序主程序就好了

通道的创建语法:
c := make(chan string)

解读如下:

  • 使用简短变量赋值,将变量c初始化为:=右边值
  • 使用内置函数 make 创建一个通道,这是使用关键字 chan 指定的
  • 关键字 chan 后面的 string 指出这个通道将用于储存字符串数据,这意味着这个通道只能用于收发字符串值

向通道发送消息的语法如下:

c <- "Hello world"

请注意其中的 <-,这将表示将右边的字符串发送给左边的通道。如果通道被指定为收发字符串,则只能向它发送字符串消息,如果向它发送其他类型的数据将导致错误

从通道那里接受消息的语法如下:

msg := <-c

从很大程序上来说,了解通道创建语法以及消息收发语法后,就大致掌握了通道的用法,现在对之前的代码进行修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"time"
)

func slowFunc(c chan string) {
time.Sleep(time.Second * 2)
c <- "slowFunc() finished"
}

func main() {
c := make(chan string)
go slowFunc(c)

msg := <-c
fmt.Println(msg)
}

解读:

  • 创建一个储存字符串数据的通道,并将其赋值给变量 c
  • 使用一个 Goroutine 来执行函数 slowFunc
  • 函数 slowFunc 将通道当作参数
  • slowFunc 函数的单个参数指定了一个通道和字符串的数据类型
  • 声明变量 msg, 用于接受来自通道 c 的消息。这将阻塞进程直到收到消息为止,从而避免进程过早退出
  • 函数 slowFunc 执行完毕后向通道 c 发送一条消息
  • 接收并打印这条消息
  • 由于没有其他的语句,因此程序就此退出

使用缓冲通道

通常,通道收到消息后就可以将其发送给接收者,但有时候可能没有接收者。在这种情况下,可使用缓冲通道。缓冲意味着可将数据储存在通道中,等接收者准备就绪再交给它。要创建缓冲通道,可向内置函数 make 传递另一个表示缓冲区长度的参数

messages := make(chan string, 2)

这些代码创建一个可储存两条消息的缓冲通道。现在可在这个通道中添加两条消息了——虽然没有接收者。请注意,缓冲通道最多只能储存指定数量的消息,如果它发送更多的消息将导致错误

1
2
messages <- "hello"
messages <- "world"

消息将储存在储存在通道中,直到接收者准备就绪。在一个缓冲通道中添加了两条消息,并在接收者准备就绪后接收了这些消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func receiver(c chan string) {
for msg := range c {
fmt.Println(msg)
}
}

func main() {
messages := make(chan string, 2)
messages <- "Hello"
messages <- "World"
close(messages)
fmt.Println("Pushed two messages onto Channel with no receivers")
time.Sleep(time.Second * 1)
receiver(messages)
}

close 用来关闭通道,禁止再向通道发送消息

解读如下:

  • 创建一个长度为2的缓冲通道
  • 向通道发送两条消息。此时没有可用的接收者,因此消息被缓冲
  • 关闭通道,这意味着不能再向它发送消息
  • 程序打印一条消息,指出通道包含两条消息,随后再休眠1s
  • 将通道作为参数传递给函数 receiver
  • 函数 receiver 使用 range 迭代通道,并将通道中的缓冲消息打印到控制台

在知道需要启动多少个 Goroutine 或需要限制调度的工作量时,缓冲通道很有效

阻塞和流程控制

Goroutine 是Go语言提供的一种并发编程方式。速度缓慢的网络调用或函数会阻塞程序的执行,而 Goroutine 能够让您对此进行管理。在并发编程中,通常应避免阻塞时操作,但有时需要代码处于阻塞状态。例如,需要在后台运行的程序必须阻塞,这样才不会退出。

Goroutine 会立即返回(非阻塞),因此要让程序处于阻塞状态,必须采用一些流程控制技巧。例如,从通道接受并打印消息的查程序需要阻塞,以免终止。

给通道指定消息接收者是一个阻塞操作,因为它将阻止函数返回,直到收到一条消息为止。下面演示了阻塞和通道的一些微妙之处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"time"
)

func slowFunc(c chan string) {
t := time.NewTicker(1 * time.Second)
for {
c <- "ping"
<-t.C
}
}

func main() {
messages := make(chan string)
go slowFunc(messages)
msg := <-messages
fmt.Println(msg)
}

这个程序将在打印消息 ping 后退出。收到一条消息后,阻塞操作将返回,而程序将退出。那么,如果不断监听通道中消息的监听器呢?这与通道的关系不大,而主要是与Go运行环境和执行流程有关。学习了for语句,通过使用for语句,可永久性地阻塞进程,也可让阻塞时间持续特定的迭代次数。

不断地执行指定的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func slowFunc(c chan string) {
t := time.NewTicker(1 * time.Second)
for {
c <- "ping"
<-t.C
}
}

func main() {
messages := make(chan string)
go slowFunc(messages)
for {
msg := <-messages
fmt.Println(msg)
}
}

执行后,将不断在控制台输出

1
2
3
ping
ping
ping

如果要在接收指定消息的消息后结束,则可使用包含迭代器的for语句。指定的迭代次数完成后,进程将退出。

1
2
3
4
for i := 0; i < 5; i++ {
msg := <-messages
fmt.Println(msg)
}

Goroutine 是非阻塞的,因此如果程序要阻塞,以接收大量的消息或不断地重复某个过程,必须使用其他流程控制技术

将通道用作函数参数

将通道作为参数传递给函数,并在函数中向通道发送消息。要进一步指定在函数中如何使用传入的通道,可在传递通道时,将其指定为只读、只写或读写的。指定通道是只读、只写、读写的语法差别不大

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func channelReader(messages <-chan string) {
msg := <-messages
fmt.Println(msg)
}

func channelWriter(messages chan<- string) {
messages <- "Hello world"
}

func channelReaderAndWriter(messages chan string) {
msg := <-messages
fmt.Println(msg)
messages <- "Hello world"
}

<-位于关键字 chan 左边时,表示通道在函数内是只读的; <-位于关键字 chan 右边时,表示通道在函数内是只写的;没有指定 <- 时,代表通道是可读可写的

通过指定通道访问权限,有助于确保通道中数据的完整性,还可以指定程序的那部分可向通道发送数据或接收来自通道的数据

使用 select 语句

假设有多个 Goroutine, 而程序将根据最先返回的 Goroutine 执行相应的操作,此时可使用 select 语句。select语句类似于switch语句,它为通道创建一系列的接收者,并执行最先收到消息的接收者。

1
2
3
4
5
6
7
8
9
channel1 := make(chan string)
channel2 := make(chan string)

select {
case msg1 := <-channel1:
fmt.Println("received", msg1)
case msg2 := <-channel2:
fmt.Println("received", msg2)
}

如果从通道 channel1 接受到了消息,将执行第一条 case 语句,如果从通道 channel2 接收到了消息,将执行第二条 case 语句。具体执行哪条 case 语句,取决于消息到达的时间,那条消息最先到达决定了将执行哪条 case 语句。通常,接下来收到的消息将被丢弃,收到一条消息后,select 语句将不再阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"time"
)

func ping1(c chan string) {
time.Sleep(time.Second * 1)
c <- "ping on channel1"
}

func ping2(c chan string) {
time.Sleep(time.Second * 2)
c <- "ping on channel2"
}

func main() {
channel1 := make(chan string)
channel2 := make(chan string)

go ping1(channel1)
go ping2(channel2)

select {
case msg1 := <-channel1:
fmt.Println("received", msg1)
case msg2 := <-channel2:
fmt.Println("received", msg2)
}
}

解读如下:

  • 创建两个用于存储字符串数据的通道
  • 创建两个向这些通道发送消息的函数。为模拟函数的执行速度,第一个函数休眠了1s,而第二个休眠了2s
  • 启动两个 Goroutine, 分别用于执行这些函数
  • select 语句创建了两个接收者,分别用于接收来自通道 channel 和 channel2 的消息
  • 1s 后,函数ping1返回,并向通道channel1发送一条消息
  • 收到来自通道channel1的消息后,执行第一条case语句——向终端打印一条消息
  • 整个 select 语句就此结束,不再阻塞进程,因此程序退出

从示例中可知,要根据最先收到的消息采取相应的措施,select 语句是一个不错的选择。但如果没有收到消息呢?为此可使用超时时间,这让 select 语句在指定时间后,不再阻塞,以便接着往下执行。

1
2
3
4
5
6
7
8
9
10
11
channel1 := make(chan string)
channel2 := make(chan string)

select {
case msg1 := <-channel1:
fmt.Println("received", msg1)
case msg2 := <-channel2:
fmt.Println("received", msg2)
case <- time.After(500 * time.Millisecond):
fmt.Println("no messages received. giving up.")
}

退出通道

在已知需要停止执行的时间的情况下,使用超时时间是不错的选择,但在有些情况下,不确定 select 语句该在何时返回,因此不能使用定时器。这种情况下,可使用退出通道,这种技术并非语言规范的组成部分,但可通过向管道发送消息来理解退出阻塞的 select 语句。

来看这种情形:程序需要使用 select 语句实现无限制地阻塞,但同时要求能够随时返回。通过在 select 语句中添加一个退出通道,可向退出通道发送消息来结束该语句,从而停止阻塞。可将退出通道视为阻塞式 select 语句开关。对于退出通道,可随便命名,但通道将其命名为 stop 或 quit。在下面的示例中,在 for 循环中使用了一条 select 语句,这意味着它将无限制地阻塞,并不断地接受消息。通过向通道 stop 发送消息,可让 select 语句停止阻塞:从 for 循环中返回,并继续往下执行

1
2
3
4
5
6
7
8
9
10
11
messages := make(chan string)
stop := make(chan bool)

for {
select {
case <-stop:
return
case msg := <-messages:
fmt.Println(msg)
}
}

在应用程序的某部分向通道发送消息,并要在未来的某个位置时点终止时,这种技术是很有效的。

为了提供这样的示例,我们在 Goroutine 中创建一个函数,它每隔1s向通道发送一条消息:

1
2
3
4
5
6
7
8
9
10
func sender(c chan string) {
t := time.NewTicker(1 * time.Second)
for {
c <- "I'm sending a message"
<-t.C
}
}

messages := make(chan string)
go sender(messages0

通过 for 循环中使用 select语句,可在收到消息后立即打印它。由于这是一个阻塞操作,因此将不断打印消息,直到手动终止这个过程

1
2
3
4
5
6
for {
select {
case msg := <-messages:
fmt.Println(msg)
}
}

在这个示例中,除非杀死进程,否则不能退出这个程序——它将没完没了地运行下去。通过创建一个退出通道,可让程序向这个通道发送一条消息,从而结束 for 循环

1
2
3
4
5
6
7
8
9
10
stop := make(chan bool)

for {
select {
case <- stop:
return
case msg := <-messages:
fmt.Println(msg)
}
}

这个示例中,等待一定时间后向退出通道发送了消息。但在实际工作中,具体等待多长时间可能取决于程序其他地方未知事件何时发生

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"time"
)

func sender(c chan string) {
t := time.NewTicker(1 * time.Second)
for {
c <- "I'm sending a message"
<-t.C
}
}

func main() {
messages := make(chan string)
stop := make(chan bool)
go sender(messages)
go func() {
time.Sleep(time.Second * 2)
fmt.Println("Time's up")
stop <- true
}()

for {
select {
case <-stop:
return
case msg := <-messages:
fmt.Println(msg)
}
}
}

运行后输出:

1
2
3
I'm sending a message
I'm sending a message
Time's up

问题列表

  • 可将通道执行多种数据类型吗

    不能。通道只有一种数据类型。您可创建任何类型的通道,因此可使用结构体来储存复杂的数据结构。

  • 在 select 语句中,如果同时从两个通道那里收到消息,结果将如何

    将随机选择并执行一条 case 语句,且只执行被选中的 case 语句

  • 关闭通道时会导致缓冲的消息丢失吗

    关闭缓冲通道意味着不能再向它发送消息。缓冲的消息会被保留,可供接收者读取