# golang的并发处理

# 1、在 Go 中,独立的任务叫做 goroutine

goroutine 与其它语言的进程、线程相似,但 goroutine 和它们并不完全相同

在Java中,将顺序式代码转化为并发式代码需要做大量修改 在 Go 里,无需修改现有顺序式的代码,就可以通过 goroutine 以并发的方式运行任意数量的任务 只需要加个go关键字即可 表面上看,goroutine 似乎在同时运行,但由于计算机处理单元有限,其实技术上来说,这些 goroutine 不是真的在同时运行

# 如何处理goroutine之间的协同问题?

# 2、通道(channel)可以在多个 goroutine 之间传值

通道可以用作变量、函数参数、结构体字段… 创建通道用 make 函数,并指定其传输数据的类型 c := make(chan int) 使用左箭头操作符 <- 向通道发送值 或 从通道接收值 向通道发送值:c <- 99 从通道接收值:r := <- c 发送操作会等待直到另一个 goroutine 尝试对该通道进行接收操作为止。 执行发送操作的 goroutine 在等待期间将无法执行其它操作 未在等待通道操作的 goroutine 让然可以继续自由的运行 执行接收操作的 goroutine 将等待直到另一个 goroutine 尝试向该通道进行发送操作为止。

# 利用单通道:

通道是一种类似于队列的东西,在不同goroutine里取值传值,一旦通道关闭就不再可用

原理如图

package main

import (
   "fmt"
   "math/rand"
   "time"
)

//main is the function where it all begins.
func main() {
   c := make(chan int)
   for i := 0; i < 5; i++ {
   	/**
   	相当于开了五个子线程去跑,三秒就可以出结果
   	*/
   	go sleepGroroutine(i, c)
   }
   for i := 0; i < 5; i++ {
   	getid := <-c
   	fmt.Printf("%v完毕", getid)
   	fmt.Println()
   }

}
func sleepGroroutine(i int, c chan int) {
   time.Sleep(3 * time.Second)
   //fmt.Println("睡眠完成", i)
   c <- i
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 利用select处理多通道

等待不同类型的值。 time.After 函数,返回一个通道,该通道在指定时间后会接收到一个值(发送该值的 goroutine 是 Go 运行时的一部分)。 select 和 switch 有点像。 该语句包含的每个 case 都持有一个通道,用来发送或接收数据。 select 会等待直到某个 case 分支的操作就绪,然后就会执行该 case 分支。 (例子 30.5、30.6)

【注】即使已经停止等待 goroutine,但只要 main 函数还没返回,仍在运行的 goroutine 将会继续占用内存。 【注】select 语句在不包含任何 case 的情况下将永远等下去。

下面是随机睡眠时间的代码:

func main() {
   c := make(chan int)
   for i := 0; i < 5; i++ {
   	/**
   	相当于开了五个子线程去跑,三秒就可以出结果
   	*/
   	go sleepGroroutine(i, c)
   }
   timeout := time.After(2 * time.Second)
   for i := 0; i < 5; i++ {
   	select {
   	case getid := <-c:
   		fmt.Println("id成功返回:", getid)
   	case <-timeout:
   		fmt.Println("超时")
   		return
   	}
   }
}
func sleepGroroutine(i int, c chan int) {
   rand.Seed(time.Now().UnixNano())
   time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
   //fmt.Println("睡眠完成", i)
   c <- i
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 利用channel完成模拟流水线,上游下游goroutine

package main

import (
   "fmt"
   "strings"
)

func main() {
   c0 := make(chan string)
   c1 := make(chan string)
   go sourceGopher(c0)
   go filterDuplicates(c0, c1)
   printGopher(c1)
}

//流水线源头
func sourceGopher(downstream chan string) {
   for _, v := range []string{"a good apple", "a bad apple", "a normal apple"} {
   	downstream <- v
   	//把值丢给channel
   }
   //表示发完了
   close(downstream)
}

//第二步流水线
func filterDuplicates(upstream, downstream chan string) {
   //通道类似于队列,进出进出
   //它是和第一步一块执行的,理想状态下不存在先后
   //一直从upstream读取值,直到它关闭为止
   for v := range upstream {
   	if !strings.Contains(v, "bad") {
   		downstream <- v
   	}
   }
   //下游发送完毕
   close(downstream)
}

func printGopher(upstream chan string) {
   for v := range upstream {
   	fmt.Println(v)
   }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 3、Go 的互斥锁(mutex)

mutex = mutual exclusive Lock(),Unlock() sync 包 互斥锁定义在被保护的变量之上

模拟多个goroutine售票:

package main

import (
	"fmt"
	"sync"
	"time"
)

//声明等待组变量
var wg sync.WaitGroup

//声明互斥锁
var mutex sync.Mutex

//声明票数
var ticket = 10

func main() {
	//4个goroutine,模拟4个售票口
	wg.Add(4)
	go saleTickets("售票口1")
	go saleTickets("售票口2")
	go saleTickets("售票口3")
	go saleTickets("售票口4")
	wg.Wait()
}
func saleTickets(name string) {
	defer wg.Done()
	for {
		mutex.Lock()
		if ticket > 0 {
			time.Sleep(200 * time.Millisecond)
			fmt.Println(name, "售出1张,还剩:", ticket)
			ticket--
		} else {
			mutex.Unlock()
			fmt.Println(name, "售完,没有票了。。。")
			break
		}
		mutex.Unlock()
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

效果如图

当把mutex全部去掉后

效果如图

# ** Go sync.WaitGroup的用法

经常会看到以下了代码:

package main

import (
    "fmt"
    "time"
)

func main(){
    for i := 0; i < 100 ; i++{
        go fmt.Println(i)
    }
    time.Sleep(time.Second)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14

主线程为了等待goroutine都运行完毕,不得不在程序的末尾使用time.Sleep() 来睡眠一段时间,等待其他线程充分运行。对于简单的代码,100个for循环可以在1秒之内运行完毕,time.Sleep() 也可以达到想要的效果。

但是对于实际生活的大多数场景来说,1秒是不够的,并且大部分时候我们都无法预知for循环内代码运行时间的长短。这时候就不能使用time.Sleep() 来完成等待操作了。

可以考虑使用管道来完成上述操作:

func main() {
    c := make(chan bool, 100)
    for i := 0; i < 100; i++ {
        go func(i int) {
            fmt.Println(i)
            c <- true
        }(i)
    }

    for i := 0; i < 100; i++ {
        <-c
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14

首先可以肯定的是使用管道是能达到我们的目的的,而且不但能达到目的,还能十分完美的达到目的。

但是管道在这里显得有些大材小用,因为它被设计出来不仅仅只是在这里用作简单的同步处理,在这里使用管道实际上是不合适的。而且假设我们有一万、十万甚至更多的for循环,也要申请同样数量大小的管道出来,对内存也是不小的开销。

对于这种情况,go语言中有一个其他的工具sync.WaitGroup 能更加方便的帮助我们达到这个目的。

WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。

Add(n) 把计数器设置为n Done() 每次把计数器-1 wait() 会阻塞代码的运行,直到计数器地值减为0

使用WaitGroup 将上述代码可以修改为:


func main() {
    wg := sync.WaitGroup{}
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go func(i int) {
            fmt.Println(i)
            wg.Done()
        }(i)
    }
    wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12

这里首先把wg 计数设置为100, 每个for循环运行完毕都把计数器减一,主函数中使用Wait() 一直阻塞,直到wg为零——也就是所有的100个for循环都运行完毕。相对于使用管道来说,WaitGroup 轻巧了许多。

注意事项

  1. 计数器不能为负值 我们不能使用Add() 给wg 设置一个负值,否则代码将会报错 同样使用Done() 也要特别注意不要把计数器设置成负数了。

  2. WaitGroup对象不是一个引用类型 WaitGroup对象不是一个引用类型,在通过函数传值的时候需要使用地址