Go进阶之路——并发

goroutine 并发指的是多个任务被(一个)cpu 轮流切换执行,在 Go 语言里面主要用 goroutine (协程)来实现并发,类似于其他语言中的线程(绿色线程),但与线程本质上是有区别的。

操作系统线程(Native Thread)的意思就是,程序里面的线程会真正映射到操作系统的线程,线程的运行和调度都是由操作系统控制的
绿色线程(Green Thread)的意思是,程序里面的线程不会真正映射到操作系统的线程,而是由语言运行平台自身来调度。
goroutine 是由 Go 运行时环境管理的轻量级线程。
语法
go f(x, y, z)

开启一个新的 goroutine 执行
f(x, y, z)

f , x , yz 是当前 goroutine 中定义的,但是在新的 goroutine 中运行 `f`。
goroutine 在相同的地址空间中运行,因此访问共享内存必须进行同步。sync 提供了这种可能,不过在 Go 中并不经常用到,因为有其他的办法。
例子1
package mainimport ( "fmt" "time" )func say(s string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } }func main() { go say("world") say("hello") }

例子2
package mainimport ( "log" "time" )func doSomething(id int) { log.Printf("before do job:(%d) \n", id) time.Sleep(3 * time.Second) log.Printf("after do job:(%d) \n", id) }func main() { go doSomething(1) go doSomething(2) go doSomething(3) }

当运行代码的时候,会发现没有任何输出。
这是因为程序启动时,其主函数即在一个单独的 goroutine 中运行,go语句会使其语句中的函数在一个新创建的goroutine中运行,而go语句本身会迅速地完成。主函数返回时,所有的goroutine都会被直接打断。 main() 执行完毕后,其他三个 goroutine 还没开始执行,所以就无法看到输出结果。
【Go进阶之路——并发】为了看到输出结果,我们可以使用 time.Sleep() 方法让 main() 函数延迟结束。
例如:
package mainimport ( "log" "time" )func doSomething(id int) { log.Printf("before do job:(%d) \n", id) time.Sleep(3 * time.Second) log.Printf("after do job:(%d) \n", id) }func main() { go doSomething(1) go doSomething(2) go doSomething(3) time.Sleep(3 * time.Second) }

输出结果:
Go进阶之路——并发
文章图片

package mainimport ( "log" "time" )func doSomething(id int) { log.Printf("before do job:(%d) \n", id) time.Sleep(3 * time.Second) log.Printf("after do job:(%d) \n", id) }func main() { go doSomething(1) go doSomething(2) go doSomething(3) time.Sleep(4 * time.Second) }

输出结果:
Go进阶之路——并发
文章图片

使用 sync.WaitGroup 实现同步
跟java的CountdownLatch差不多,也是阻塞等待所有任务完成之后再继续执行。
简单使用就是在创建一个任务的时候wg.Add(1), 任务完成的时候使用wg.Done()来将任务减一。使用wg.Wait()来阻塞等待所有任务完成。
上面例子中,其实我们还可以使用 sync.WaitGroup 来等待所有的 goroutine 结束,从而实现并发的同步,这比使用 time.Sleep() 更加优雅,例如:
package mainimport ( "log" "sync" "time" )func doSomething(id int, wg *sync.WaitGroup) { defer wg.Done() log.Printf("before do job:(%d) \n", id) time.Sleep(3 * time.Second) log.Printf("after do job:(%d) \n", id) }func main() { var wg sync.WaitGroup wg.Add(3) go doSomething(1, &wg) go doSomething(2, &wg) go doSomething(3, &wg) wg.Wait() log.Printf("finish all jobs\n") }

输出结果:
Go进阶之路——并发
文章图片

注意:wg不是对象(与java区别开),在go中传入函数的参数如果不是地址,则会进行拷贝,故参数wg *sync.WaitGroup传入的是指针。
例子3
package mainimport ( "fmt" "time" )func main() { for i := 0; i < 3; i++ { go func() { fmt.Println(i) }() }time.Sleep(1 * time.Second) }

输出结果:
Go进阶之路——并发
文章图片

其实我们期望的结果是打印 0 1 2 三个数字,但实际输出结果却和我们预期不一致,这是因为:
  1. 所有 goroutine 代码片段中的 i 是同一个变量,待循环结束的时候,它的值为 3
  2. 新创建 goroutine 的执行顺序是随机的,由于没有过多的约束,其在for循环过程中的执行时间也是随机的。其打印出的结果就是当时的 i 值。
修改后:
package mainimport ( "fmt" "time" )func main() { for i := 0; i < 3; i++ { go func(v int) { fmt.Println(v) }(i) } time.Sleep(1 * time.Second) }

输出结果:
Go进阶之路——并发
文章图片

通过方法传参的方式,将 i 的值拷贝到新的变量 v 中,而在每一个 goroutine 都对应了一个属于自己作用域的 v 变量, 所以最终打印结果为随机的 0,1,2
channel goroutine 是 Go 中实现并发的重要机制,channel 是 goroutine 之间进行通信的重要桥梁。
channel 是有类型的管道,可以用 channel 操作符 <- 对其发送或者接收值。
ch <- v// 将 v 送入 channel ch。 v := <-ch// 从 ch 接收,并且赋值给 v。 <- ch// channel 发送数据,忽略接受者

(“箭头”就是数据流的方向。)
可以用 var 声明 channel, 如下:
var ch chan int

channel 使用前必须创建:
ch := make(chan int) // 注意: channel 必须定义其传递的数据类型

和 map 与 slice 类似,channel也是一个对应make创建的底层数据结构的引用。当我们复制一个channel或用于函数传参时,都只是拷贝了一个channel对象。和其它的引用类型一样,channel的零值也是nil。
默认情况下,在另一端准备好之前,发送和接收都会阻塞。这使得 goroutine 可以在没有明确的锁或静态变量的情况下进行同步。
channel buffer
上文提到,可以通过 make(chan int) 创建channel,此类 channel 称之为非缓冲通道。事实上 channel 可以定义缓冲大小,如下:
chInt := make(chan int)// unbuffered channel非缓冲通道 chBool := make(chan bool, 0)// unbuffered channel非缓冲通道 chStr := make(chan string, 2) // bufferd channel缓冲通道

需要注意的是,程序中必须同时有不同的 goroutine 对非缓冲通道进行发送和接收操作,否则会造成阻塞。
向缓冲 channel 发送数据的时候,只有在缓冲区满的时候才会阻塞。当缓冲区清空的时候接收阻塞。
以下是一个错误的使用示例:
func main() { ch := make(chan string)ch <- "ping"fmt.Println(<-ch) }

这一段代码运行后提示错误: fatal error: all goroutines are asleep - deadlock!
因为 main 函数是一个 goroutine, 在这一个 goroutine 中发送了数据给非缓冲通道,但是却没有另外一个 goroutine 从非缓冲通道中里读取数据, 所以造成了阻塞或者称为死锁。
在以上代码中添加一个 goroutine 从非缓冲通道中读取数据,程序就可以正常工作。如下所示:
func main() { ch := make(chan string)go func() { ch <- "ping" }()fmt.Println(<-ch) }

与非缓冲通道不同,缓冲通道可以在同一个 goroutine 内接收容量范围内的数据,即便没有另外的 goroutine 进行读取操作,如下代码可以正常执行:
func main() { ch := make(chan int, 2)ch <- 1ch <- 2 }

向缓冲channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。
package mainimport "fmt"func sum(a []int, c chan int) { sum := 0 for _, v := range a { sum += v } c <- sum // 将和送入 c }func main() { a := []int{7, 2, 8, -9, 4, 0} c := make(chan int) go sum(a[:len(a)/2], c) go sum(a[len(a)/2:], c) x, y := <-c, <-c // 从 c 中获取 fmt.Println(x, y, x+y) }

输出结果:
Go进阶之路——并发
文章图片

channel 遍历和关闭
close() 函数可以用于关闭 channel,关闭后的 channel 中如果有缓冲数据,依然可以接收到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据。但是无法再发送数据给已经关闭的channel,继续发送将导致panic异常。
package mainimport "fmt"func main() { ch := make(chan int, 10) for i := 0; i < 10; i++ { ch <- i } close(ch) //ch <- 1 res := 0 for v := range ch { res += v } fmt.Println(res) l := <- ch fmt.Println(l) }

输出结果:
Go进阶之路——并发
文章图片

select 语句
select 专门用于通道发送和接收操作,看起来和 switch 很相似,但是进行选择和判断的方法完全不同。
select 语句使得一个 goroutine 在多个通讯操作上等待。
select 阻塞,直到条件分支中的某个可以继续执行,这时就会执行那个条件分支。当多个都准备好的时候,会随机选择一个。
在下述例子中,通过 select 的使用,保证了 worker 中的事务可以执行完毕后才退出 main 函数。
package mainimport ( "fmt" "time" )func strWorker(ch chan string) { time.Sleep(1 * time.Second) fmt.Println("do something with strWorker...") ch <- "str" }func intWorker(ch chan int) { //time.Sleep(1 * time.Second) fmt.Println("do something with intWorker...") ch <- 1 }func main() { chStr := make(chan string) chInt := make(chan int) go strWorker(chStr) go intWorker(chInt) for i := 0; i < 2; i++ { select { case <-chStr: fmt.Println("get value from strWorker")case <-chInt: fmt.Println("get value from intWorker")} } }

输出结果:
Go进阶之路——并发
文章图片

select 中的其他条件分支都没有准备好的时候,`default` 分支会被执行。
为了非阻塞的发送或者接收,可使用 default 分支:
select { case i := <-c: // 使用 i default: // 从 c 读取会阻塞 }

通过 channel 实现同步机制
一个经典的例子如下,main 函数中起了一个 goroutine,通过非缓冲队列的使用,能够保证在 goroutine 执行结束之前 main 函数不会提前退出。
package mainimport ( "fmt" )func worker(done chan bool){ fmt.Println("start working...") done <- true fmt.Println("end working...") }func main() { done := make(chan bool, 1) go worker(done) <- done }

输出结果:
Go进阶之路——并发
文章图片




    推荐阅读