# golang的并发处理
# 1、在 Go 中,独立的任务叫做 goroutine
goroutine 与其它语言的进程、线程相似,但 goroutine 和它们并不完全相同
在Java中,将顺序式代码转化为并发式代码需要做大量修改 在 Go 里,无需修改现有顺序式的代码,就可以通过 goroutine 以并发的方式运行任意数量的任务 只需要加个go关键字即可 表面上看,goroutine 似乎在同时运行,但由于计算机处理单元有限,其实技术上来说,这些 goroutine 不是真的在同时运行
# 如何处理goroutine之间的协同问题?
# 2、通道(channel)可以在多个 goroutine 之间传值
通道可以用作变量、函数参数、结构体字段… 创建通道用 make 函数,并指定其传输数据的类型 c := make(chan int) 使用左箭头操作符 <- 向通道发送值 或 从通道接收值 向通道发送值:c <- 99 从通道接收值:r := <- c 发送操作会等待直到另一个 goroutine 尝试对该通道进行接收操作为止。 执行发送操作的 goroutine 在等待期间将无法执行其它操作 未在等待通道操作的 goroutine 让然可以继续自由的运行 执行接收操作的 goroutine 将等待直到另一个 goroutine 尝试向该通道进行发送操作为止。
# 利用单通道:
通道是一种类似于队列的东西,在不同goroutine里取值传值,一旦通道关闭就不再可用
原理如图
package main
import (
"fmt"
"math/rand"
"time"
)
//main is the function where it all begins.
func main() {
c := make(chan int)
for i := 0; i < 5; i++ {
/**
相当于开了五个子线程去跑,三秒就可以出结果
*/
go sleepGroroutine(i, c)
}
for i := 0; i < 5; i++ {
getid := <-c
fmt.Printf("%v完毕", getid)
fmt.Println()
}
}
func sleepGroroutine(i int, c chan int) {
time.Sleep(3 * time.Second)
//fmt.Println("睡眠完成", i)
c <- i
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 利用select处理多通道
等待不同类型的值。 time.After 函数,返回一个通道,该通道在指定时间后会接收到一个值(发送该值的 goroutine 是 Go 运行时的一部分)。 select 和 switch 有点像。 该语句包含的每个 case 都持有一个通道,用来发送或接收数据。 select 会等待直到某个 case 分支的操作就绪,然后就会执行该 case 分支。 (例子 30.5、30.6)
【注】即使已经停止等待 goroutine,但只要 main 函数还没返回,仍在运行的 goroutine 将会继续占用内存。 【注】select 语句在不包含任何 case 的情况下将永远等下去。
下面是随机睡眠时间的代码:
func main() {
c := make(chan int)
for i := 0; i < 5; i++ {
/**
相当于开了五个子线程去跑,三秒就可以出结果
*/
go sleepGroroutine(i, c)
}
timeout := time.After(2 * time.Second)
for i := 0; i < 5; i++ {
select {
case getid := <-c:
fmt.Println("id成功返回:", getid)
case <-timeout:
fmt.Println("超时")
return
}
}
}
func sleepGroroutine(i int, c chan int) {
rand.Seed(time.Now().UnixNano())
time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
//fmt.Println("睡眠完成", i)
c <- i
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 利用channel完成模拟流水线,上游下游goroutine
package main
import (
"fmt"
"strings"
)
func main() {
c0 := make(chan string)
c1 := make(chan string)
go sourceGopher(c0)
go filterDuplicates(c0, c1)
printGopher(c1)
}
//流水线源头
func sourceGopher(downstream chan string) {
for _, v := range []string{"a good apple", "a bad apple", "a normal apple"} {
downstream <- v
//把值丢给channel
}
//表示发完了
close(downstream)
}
//第二步流水线
func filterDuplicates(upstream, downstream chan string) {
//通道类似于队列,进出进出
//它是和第一步一块执行的,理想状态下不存在先后
//一直从upstream读取值,直到它关闭为止
for v := range upstream {
if !strings.Contains(v, "bad") {
downstream <- v
}
}
//下游发送完毕
close(downstream)
}
func printGopher(upstream chan string) {
for v := range upstream {
fmt.Println(v)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 3、Go 的互斥锁(mutex)
mutex = mutual exclusive Lock(),Unlock() sync 包 互斥锁定义在被保护的变量之上
模拟多个goroutine售票:
package main
import (
"fmt"
"sync"
"time"
)
//声明等待组变量
var wg sync.WaitGroup
//声明互斥锁
var mutex sync.Mutex
//声明票数
var ticket = 10
func main() {
//4个goroutine,模拟4个售票口
wg.Add(4)
go saleTickets("售票口1")
go saleTickets("售票口2")
go saleTickets("售票口3")
go saleTickets("售票口4")
wg.Wait()
}
func saleTickets(name string) {
defer wg.Done()
for {
mutex.Lock()
if ticket > 0 {
time.Sleep(200 * time.Millisecond)
fmt.Println(name, "售出1张,还剩:", ticket)
ticket--
} else {
mutex.Unlock()
fmt.Println(name, "售完,没有票了。。。")
break
}
mutex.Unlock()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
效果如图
当把mutex全部去掉后
效果如图
# ** Go sync.WaitGroup的用法
经常会看到以下了代码:
package main
import (
"fmt"
"time"
)
func main(){
for i := 0; i < 100 ; i++{
go fmt.Println(i)
}
time.Sleep(time.Second)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
主线程为了等待goroutine都运行完毕,不得不在程序的末尾使用time.Sleep() 来睡眠一段时间,等待其他线程充分运行。对于简单的代码,100个for循环可以在1秒之内运行完毕,time.Sleep() 也可以达到想要的效果。
但是对于实际生活的大多数场景来说,1秒是不够的,并且大部分时候我们都无法预知for循环内代码运行时间的长短。这时候就不能使用time.Sleep() 来完成等待操作了。
可以考虑使用管道来完成上述操作:
func main() {
c := make(chan bool, 100)
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Println(i)
c <- true
}(i)
}
for i := 0; i < 100; i++ {
<-c
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
首先可以肯定的是使用管道是能达到我们的目的的,而且不但能达到目的,还能十分完美的达到目的。
但是管道在这里显得有些大材小用,因为它被设计出来不仅仅只是在这里用作简单的同步处理,在这里使用管道实际上是不合适的。而且假设我们有一万、十万甚至更多的for循环,也要申请同样数量大小的管道出来,对内存也是不小的开销。
对于这种情况,go语言中有一个其他的工具sync.WaitGroup 能更加方便的帮助我们达到这个目的。
WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。
Add(n) 把计数器设置为n Done() 每次把计数器-1 wait() 会阻塞代码的运行,直到计数器地值减为0
使用WaitGroup 将上述代码可以修改为:
func main() {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
这里首先把wg 计数设置为100, 每个for循环运行完毕都把计数器减一,主函数中使用Wait() 一直阻塞,直到wg为零——也就是所有的100个for循环都运行完毕。相对于使用管道来说,WaitGroup 轻巧了许多。
注意事项
计数器不能为负值 我们不能使用Add() 给wg 设置一个负值,否则代码将会报错 同样使用Done() 也要特别注意不要把计数器设置成负数了。
WaitGroup对象不是一个引用类型 WaitGroup对象不是一个引用类型,在通过函数传值的时候需要使用地址