Communication and Concurrency Statements
通信与并发将在第7章中讲解,但为了完整的描述过程式编程,我们先讲解它们的基本用法。
一个goroutine是一个函数或者方法的调用在任何其它的goroutine中独立并且同时执行。每一个Go程序至少有一个goroutine, main goroutine是在main package的main()函数执行时。Goroutine非常像轻量的线程或者协程,可以被大量的创造(然而对于线程来说, 小部分的数量,将会消耗大部分的计算机资源). 所有的Goroutine共享相同的地址空间,并且提供锁原语(locking primitives)使得跨goroutine安全分享数据。可是,在Go的并发编程中,推荐使用communicate data, 而不是分享数据。
一个Go channel可以是双向或者单向的通道,用于两个或者多个goroutine之间的数据通信(发送和接收).
goroutine和channel为我们提供了轻量级的并发编程,它们不需要分享内存,更因此不需要使用到锁。尽管如此,相对于非并发编程,我们仍需要像其它并法方法,在创建和维护并发程序会面临更多的挑战。
一个goroutine的创建可以使用go statement:
go function(arguments)
go func(parameters) { block }(arguments)
我们即可以调用一个已存在的函数或者创建一个匿名函数。这个函数可以有零个或者多个参数,而如果它有参数,则参数的传递跟其它函数的调用是一样的。
函数调用后会立即开始执行,但这是在一个单独的goroutine中执行。而当前的goroutine(i.e.,有go statement)则立即从下一条语句执行。
很少会只创建goroutine,然后等待它们结束,而在会在goroutine之前通信。大部分情况下goroutine之间需要协作,为此我们需要给它们赋于通信的能力。以下是发送和接收数据
channel <- value // Blocking send
<-channel // Receive and discard
x := <-channel // Receive and store
x, ok := <-channel // As above & check for channel closed & empty
Channel的创建可以通过内置的make()函数,语法如下
make(chan Type)
make(chan Type, capacity)
如果没有指定缓存的大小(capacity), 这个channel是同步的,它会阻塞,直到发送者已经发送,而接收者已经接收。如果给定了capacity, channel是异步的,对于发送者来说,只要有未使用的capacity, 则它就可以发送,而对于接收者来说,只要channel中有数据,它就不会被阻塞。
我们用一个简单的例子来说明刚刚学到的知识,我们将写一个createCounter()函数,它返回一个channel,用于发送一个int. 第一个接收的值将是我们传递给createCounter()开始的值,而随后接收的值都是以递增1。以下我们创建了两个独立的计算器channel.
counterA := createCounter(2) // counterA is of type chan int
counterB := createCounter(102) // counterB is of type chan int
for i := 0; i < 5; i++ {
a := <-counterA
fmt.Printf("(A→%d, B→%d) ", a, <-counterB)
}
fmt.Println()
//(A→2, B→102) (A→3, B→103) (A→4, B→104) (A→5, B→105) (A→6, B→106)
我们展示了两种不同的接收方法。第一种是将接收到的值赋值给一个变量,而第二中,将接收到的值以一个参数的形式传递给一个函数。
两次调用createCounter()函数都是在main goroutine,然后在createCounter()中创建了两个goroutine(最初时阻塞). 在main gogroutine, 一旦我们想要从channel中获取数据,就立即发送,并且我们接收这个值。然后发送中的goroutine又被再次阻塞(channel capacity没有指定时,只能在接收到请求后才会发送数据,否则一直阻塞),等待新的接收请求。
如果我们不在需要counter channels时,我们只需要退出无限循环,则它们会停止发送数据,并且关闭channel.
func createCounter(start int) chan int {
next := make(chan int)
go func(i int) {
for {
next <- i
i++
}
}(start)
return next
}
这个函数接受一个起始值,并且创建一个channel用于发送和接收int. 然后在一个新的goroutine中执行匿名函数,并给它传递起始值start。这个函数有一个无限循环,它只是简单的发送,然后增加下一次要发送的数字。由于channel被创建时是zero capacity. 发送的那条语句会一直阻塞,直到从cahnnel中有一个接收请求。 这个阻塞只会影响到匿名函数所在的goroutine。一旦goroutine被设置到运行状态(当然运行到nex <-i是会被阻塞), 匿名函数之后的语句会立即执行,返回一个channel给它的调用者。
Select Statements
Go的Select有如下的语法:
select {
case sendOrReceive1: block1
...
case sendOrReceiveN: blockN
default: blockD
}
在一个select语句中,Go会按照从上到下的顺序,对每一个发送或者接收的语句进行求值。如果它们中有多个语句可以继续执行(i.e., 不在被阻塞), 则会任意继续执行。 如果没有一个可以继续(i.e., 所有的都被阻塞), 这里有两种情况,一种是有default, default分支会被执行,并且控制权交由select之后的语句。但如果没有defautl分支, select将继续阻塞,直到有一个case可以继续。
select 语句的逻辑如下。一个select语句,不带default分支,将被阻塞,并且只有当一个case发生(接收或者发送了数据),整个select才完成。一个带default的select不会被阻塞,而是立即执行。原因可能为一个case发生,或者没有case发生,而是执行default分支。
为了掌握这种语法,我们将看两个例子。第一个例子是虚构的,但很能说明select语句是如何工作的。第二个例子是现实中的使用。
channels := make([]chan bool, 6)
for i := range channels {
channels[i] = make(chan bool)
}
go func() {
for {
channels[rand.Intn(6)] <- true
}
}()
在上面的例子中,我们创建了6个channel,它们可以用来发送和接收Booleans. 然后我们创建了单个goroutine,它有一个无限循环,每一次循环都从channels数组中随机选择一个channel,用于发送true值。由于channels都是非缓冲的类型,所以整个goroutine会立即被阻塞。
for i := 0; i < 36; i++ {
var x int
select {
case <-channels[0]:
x = 1
case <-channels[1]:
x = 2
case <-channels[2]:
x = 3
case <-channels[3]:
x = 4
case <-channels[4]:
x = 5
case <-channels[5]:
x = 6
}
fmt.Printf("%d ", x)
}
fmt.Println()
//6 4 6 5 4 1 2 1 2 1 5 5 4 6 2 3 6 5 1 5 4 4 3 2 3 3 3 5 3 6 5 2 2 3 6 2
在这段代码中,我们使用了6个channels来模仿骰子。这个 select 语句等待channels数中的一个channel有发送数据,一旦有一个或者多个channels已经发送了数据,则有一个case会被选择。由于select在一个for循环中,所以这个select会执行多次。
现在让我们看一个真实的例子,假设我们想要在两个单独的数据集上执行相同昂贵的计算,并且计算将产生一系列的结果。以下是整个函数的框架。
func expensiveComputation(data Data, answer chan int, done chan bool) {
// setup ...
finished := false
for !finished {
// computation ...
answer <- result
}
done <- true
}
这个函数接收要处理的数据和两个channel. answer channel用来发送每次的结果给监控代码,而done channel用来通知监控器,整个计算完成。
const allDone = 2
doneCount := 0
answerα := make(chan int)
answerβ := make(chan int)
defer func() {
close(answerα)
close(answerβ)
}()
done := make(chan bool)
defer func() { close(done) }()
go expensiveComputation(data1, answerα, done)
go expensiveComputation(data2, answerβ, done)
for doneCount != allDone {
var which, result int
select {
case result = <-answerα:
which = 'α'
case result = <-answerβ:
which = 'β'
case <-done:
doneCount++
}
if which != 0 {
fmt.Printf("%c→%d ", which, result)
}
}
fmt.Println()
//α→3 β→3 α→0 β→9 α→0 β→2 α→9 β→3 α→6 β→1 α→0 β→8 α→8 β→5 α→0 β→0 α→3
我们在代码的开始创建了两个channel,answerα and answerβ,用于接受结果, 以及一个用于记录计算是完成的channel, done. 创建的匿名函数用于关闭这些channel. 下一步,我们开始expensive computations(各自的goroutines), 给这个函数传递各自的数据, 和各自的answer channel, 以及一个共享的data channel. 我们也可以给两个expenive computations传递一个answer channel, 但如果是这样,我们就不知道哪个数据集上产生的结果。 如果我们想要使用相同的channel,并且能够区分结果的起源,我们可以创建单个channel,用来发送和接收一个struct——比如type Answer struct {id, answer int}.
expensive computations开始时是被阻塞的,因为它们的channel是非缓冲的。for loop每一次遍历都会重置变量which和result. 并且阻塞的select语句可以执行任意的case, 只要这个case上的channel有数据。如果有一个answer接收到了,我们设置which用来标识产生结果的源,并且打印数据源和结果。如果done channel已经好了,我们增加doneCount——当它达到我们要计算的数字时,结束整个for循环。