并发编程
字数
1339 字
阅读时间
7 分钟
用法
Goroutines
go
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}注意并发读写竞争
- 下面的例子会出现竞争问题,因为多个 goroutine 同时读写 counter 变量。
go
package main
import "time"
const (
FISRT = "Mummy"
SECOND = "code"
)
func main() {
var a string
go func() {
i := 1
for {
i = 1 - i
if i == 0 {
a = FISRT
} else {
a = SECOND
}
time.Sleep(10)
}
}()
for {
if a == "Mumm" {
panic("a 丢失了字符?")
}
time.Sleep(10)
}
}这里之所以会 panic 是因为字符串结构如下:
go
sturct {
str uintptr
len int
}当协程里修改了 str 指针指向了新的字符串"Mummy",但长度还没来得及修改,仍然是4,此时另一个协程读出来的就是 "Mumm"。
使用 go run -race main.go 可以检测到竞争问题。
64位操作系统中,字节型、布尔型、整型、浮点型、字符型的位宽不会超过 64 位,在 64 位的指令集架构中可以由一条机器指令完成,不存在被细分为更小的操作单位,所以这些类型的并发赋值是安全的,不会发生写一半的问题。
可以改写为使用 Mutex 互斥锁来保护共享资源的并发访问(见下文)。
- 另一个例子是引用的库提供的函数需要放在 init 里来做初始化,因为 init 是串行执行的,这个函数是并发不安全的。如果没有放在 init 里,就可能导致数据竞争。
Channels
1. 基本使用
go
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}2. 有缓冲的 channel
go
ch := make(chan int, 100)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)1. 内存泄漏
go
func abandonedWorker(ch chan string) {
// 由于chan没有关闭,实际上这里会不断去读,没数据时会阻塞等待
for data := range ch {
...
}
fmt.Println("Worker is done, shutting down")
}
func handler(inputData []string) {
ch := make(chan string, len(inputData))
for _, data := range inputData {
ch <- data
}
go abandonedWorker(ch)
}应该使用 close 及时关闭:
go
func abandonedReceiver(ch chan int) {
// channel 一关闭,这里就不会阻塞,拿到 0 值
data := <- ch
fmt.Println(data)
}
func handler() {
ch := make(chan int)
// Defer the CLOSING of channel
defer close(ch)
go abandonedReceiver(ch)
}2. 子协程的控制
子协程建议使用 ctx 来控制。
go
func say(ctx context.Context) {
ch := make(chan string)
// sender 里面会一直往 ch 里写数据,拿到 ctx.Done() 也需要退出
go sender(ctx, ch)
for {
select {
// 如果 ctx 在上层被取消了,就退出
case <-ctx.Done():
return
case s := <-ch:
fmt.Println(s)
default:
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
}Range and Close
go
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
// 可以用 range 遍历 channel,直到 channel 关闭。
for i := range c {
fmt.Println(i)
}
}
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
// 关闭 channel
close(c)
}使用 Select
go
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}使用 sync.Mutex 互斥锁
go
// 需要使用互斥锁来保护 map 的并发访问。
type SafeCounter struct {
mu sync.RWMutex
v map[string]int
}
// 写锁保证同一时间只有一个 goroutine 可以访问 map。
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
c.v[key]++
c.mu.Unlock()
}
// 读锁可以同时被多个 goroutine 访问。
func (c *SafeCounter) Value(key string) int {
c.mu.RLock()
// Lock so only one goroutine at a time can access the map c.v.
defer c.mu.RUnlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}底层设计
线程到协程
- 线程是操作系统内核调度的最小单位,是程序执行的基本单位。其创建、销毁、调度都需要操作系统内核参与。
- 协程是处于用户态的调度单位,由用户态完成调度,无需操作系统内核参与。
Goroutine 也就是 Go 语言的协程实现。
