Skip to content

并发编程

字数
1339 字
阅读时间
7 分钟

用法

Goroutines

go
func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    } 
}

func main() {  
    go say("world")
    say("hello")
}

注意并发读写竞争

  1. 下面的例子会出现竞争问题,因为多个 goroutine 同时读写 counter 变量。
go
package main

import "time"

const (
	FISRT  = "Mummy"
	SECOND = "code"
)

func main() {
	var a string
	go func() {
		i := 1
		for {
			i = 1 - i
			if i == 0 {
				a = FISRT
			} else {
				a = SECOND
			}
			time.Sleep(10)
		}
	}()
	for {
		if a == "Mumm" {
			panic("a 丢失了字符?")
		}
		time.Sleep(10)
	}
}

这里之所以会 panic 是因为字符串结构如下:

go
sturct {
    str uintptr
    len int
}

当协程里修改了 str 指针指向了新的字符串"Mummy",但长度还没来得及修改,仍然是4,此时另一个协程读出来的就是 "Mumm"。

使用 go run -race main.go 可以检测到竞争问题。

64位操作系统中,字节型、布尔型、整型、浮点型、字符型的位宽不会超过 64 位,在 64 位的指令集架构中可以由一条机器指令完成,不存在被细分为更小的操作单位,所以这些类型的并发赋值是安全的,不会发生写一半的问题。

可以改写为使用 Mutex 互斥锁来保护共享资源的并发访问(见下文)。

  1. 另一个例子是引用的库提供的函数需要放在 init 里来做初始化,因为 init 是串行执行的,这个函数是并发不安全的。如果没有放在 init 里,就可能导致数据竞争。

Channels

1. 基本使用

go
func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // send sum to c
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c) 
    x, y := <-c, <-c // receive from c

    fmt.Println(x, y, x+y)
}

2. 有缓冲的 channel

go
ch := make(chan int, 100)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)

1. 内存泄漏

go
func abandonedWorker(ch chan string) {​
    // 由于chan没有关闭,实际上这里会不断去读,没数据时会阻塞等待​
    for data := range ch {
        ...
    }​
    fmt.Println("Worker is done, shutting down")​
}

func handler(inputData []string) {​
    ch := make(chan string, len(inputData))​
    for _, data := range inputData {​
        ch <- data
    }​
    go abandonedWorker(ch)​
}​​

应该使用 close 及时关闭:

go
func abandonedReceiver(ch chan int) {
    // channel 一关闭,这里就不会阻塞,拿到 0 值
    data := <- ch
    fmt.Println(data)
}​

func handler() {
    ch := make(chan int)
    // Defer the CLOSING of channel
    defer close(ch)​
    go abandonedReceiver(ch)
}

2. 子协程的控制

子协程建议使用 ctx 来控制。

go
func say(ctx context.Context) {
    ch := make(chan string)
    // sender 里面会一直往 ch 里写数据,拿到 ctx.Done() 也需要退出
    go sender(ctx, ch)
    for {
        select {
        // 如果 ctx 在上层被取消了,就退出
        case <-ctx.Done():
            return
        case s := <-ch:
            fmt.Println(s)
        default:
            time.Sleep(100 * time.Millisecond)
            fmt.Println(s)
        }
    }
}

Range and Close

go

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
   // 可以用 range 遍历 channel,直到 channel 关闭。
    for i := range c {
        fmt.Println(i)
    }
}

func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    } 
    // 关闭 channel
    close(c)
}

使用 Select

go
func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        } 
        quit <- 0
    }()
    fibonacci(c, quit)
}

使用 sync.Mutex 互斥锁

go
// 需要使用互斥锁来保护 map 的并发访问。
type SafeCounter struct {
	mu sync.RWMutex
	v  map[string]int
}

// 写锁保证同一时间只有一个 goroutine 可以访问 map。
func (c *SafeCounter) Inc(key string) {
	c.mu.Lock()
	c.v[key]++
	c.mu.Unlock()
}

// 读锁可以同时被多个 goroutine 访问。
func (c *SafeCounter) Value(key string) int {
	c.mu.RLock()
	// Lock so only one goroutine at a time can access the map c.v.
	defer c.mu.RUnlock()
	return c.v[key]
}

func main() {
	c := SafeCounter{v: make(map[string]int)}
	for i := 0; i < 1000; i++ {
		go c.Inc("somekey")
	}

	time.Sleep(time.Second)
	fmt.Println(c.Value("somekey"))
}

底层设计

线程到协程

  • 线程是操作系统内核调度的最小单位,是程序执行的基本单位。其创建、销毁、调度都需要操作系统内核参与。
  • 协程是处于用户态的调度单位,由用户态完成调度,无需操作系统内核参与。

Goroutine 也就是 Go 语言的协程实现。

GPM 模型

贡献者

页面历史


总访问量 次, 访客数 人次