Go并发编程实战课

Concurrent Concepts

  • 互斥锁 Mutex
    • Go race detector (`-race` flag; built on the C/C++ ThreadSanitizer runtime)
    • wrong usage cases
      • lock/unlock 非成对出现
      • Copy 已使用的 Mutex
      • 重入
      • 死锁
  • RWMutex
    • Write-preferring
      • 一个正在阻塞的 Lock 调用会排除新的 reader 请求到锁
    • noCopy
    • no-reentry
    • Unlock / RUnlock of an RWMutex that is not locked -> run-time error
  • WG
    • state, counter + waiter
    • sema (the waiting object)
    • frequent wrong usage
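      • wg.Add with a negative delta (or too many Done calls) that drives the counter below zero -> panic
      • reusing a WaitGroup before the previous Wait has returned (start the next round of Add only after the previous Wait returns)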
  • Cond
    • Lock before Cond.Wait, and Unlock after usage
    • Waiter goroutine 被唤醒不等于等待条件已满足
  • Once
  • map
    • wrong usage of uninitialized map
    • wrong usage of concurrent read/write
    • sync.Map, RWMap
  • Pool
    • possibility of memory leak
      • a buffer obtained with Get may grow while in use; if it is Put back as-is, the pool keeps the enlarged backing array, so memory stays pinned and is effectively leaked
      • valyala/bytebufferpool
  • Context
    • 放在第一个参数
    • 从不把 nil 当作 Context 类型的值
    • Context 只用来临时传递参数, 不可以持久化
    • 使用 WithValue 时, key 应该是自定义类型
  • atomic
    • CAS
  • channel
    • 共享资源的并发访问使用传统并发原语
    • 复杂的任务编排和消息传递使用 channel
    • 消息通知机制使用 channel, 如果只需要 signal 一个 goroutine, 可以使用 Cond
    • 简单等待所有任务完成使用 WaitGroup
    • 需要和 select 搭配, 使用 channel
    • 需要和超时搭配, 使用 channel 和 context
  • semaphore

Code Samples

Recursive Mutex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import (
    "fmt"
    "runtime"
    "strings"
    "sync"
    "sync/atomic"
)

// getGID returns the ID of the calling goroutine.
//
// The ID is parsed from the first line of the goroutine's stack trace, which
// has the form "goroutine <id> [<state>]:". The runtime offers no public API
// for this, so the function depends on that textual format.
//
// It panics if the ID cannot be parsed: returning 0 would be indistinguishable
// from the zero value of an "owner" field that stores goroutine IDs.
func getGID() int64 {
    buf := make([]byte, 64)
    buf = buf[:runtime.Stack(buf, false)]

    // Drop the fixed "goroutine " prefix; the first remaining field is the ID.
    fields := strings.Fields(strings.TrimPrefix(string(buf), "goroutine "))
    if len(fields) == 0 {
        panic(fmt.Sprintf("getGID: unexpected stack header %q", buf))
    }

    var gid int64
    if _, err := fmt.Sscan(fields[0], &gid); err != nil {
        panic(fmt.Sprintf("getGID: cannot parse goroutine id from %q: %v", buf, err))
    }
    return gid
}

// RecursiveMutex is a mutex that the goroutine currently holding it may lock
// again without blocking. It embeds sync.Mutex for the underlying exclusion
// and records which goroutine holds the lock and how many times that
// goroutine has locked it.
//
// NOTE(review): owner is accessed with 64-bit atomic operations, so keep the
// field order as is unless its alignment on 32-bit platforms is re-checked.
type RecursiveMutex struct {
    sync.Mutex
    owner     int64  // goroutine ID (from getGID) of the current holder; set to -1 on the final unlock
    recursion uint32 // number of Lock calls by the owner not yet matched by UnLock
}

// Lock acquires the mutex for the calling goroutine.
//
// If the caller is not the recorded owner, it blocks on the embedded mutex,
// then records itself as owner with a recursion depth of 1. If the caller
// already owns the mutex, only the recursion depth is incremented.
func (rm *RecursiveMutex) Lock() {
    caller := getGID()
    if atomic.LoadInt64(&rm.owner) != caller {
        // First acquisition by this goroutine: take the real lock.
        rm.Mutex.Lock()
        atomic.StoreInt64(&rm.owner, caller)
        rm.recursion = 1
        return
    }
    // Re-entrant acquisition: the caller already holds the underlying mutex.
    rm.recursion++
}

// Unlock releases one level of ownership held by the calling goroutine.
//
// It panics if the caller is not the recorded owner. The embedded mutex is
// released, and owner reset to -1, only when the recursion depth reaches zero.
//
// Defining Unlock here shadows the Unlock promoted from the embedded
// sync.Mutex, so rm.Unlock() can no longer release the underlying mutex while
// leaving owner and recursion stale.
func (rm *RecursiveMutex) Unlock() {
    gid := getGID()
    // Load owner once, atomically, and reuse the value in the panic message
    // instead of re-reading the field without synchronization.
    if owner := atomic.LoadInt64(&rm.owner); owner != gid {
        panic(fmt.Sprintf("wrong owner(%d): %d!", owner, gid))
    }
    rm.recursion--
    if rm.recursion != 0 {
        return
    }
    // Clear the owner before releasing, while this goroutine still holds the lock.
    atomic.StoreInt64(&rm.owner, -1)
    rm.Mutex.Unlock()
}

// UnLock is the original spelling of Unlock, kept so existing callers compile.
//
// Deprecated: use Unlock.
func (rm *RecursiveMutex) UnLock() {
    rm.Unlock()
}

// RWMutex is the state of a reader/writer lock: a writer mutex, one semaphore
// each for waiting writers and waiting readers, and two atomic reader counters.
// NOTE(review): this appears to mirror the layout of the standard library's
// sync.RWMutex — confirm against the Go version in use.
type RWMutex struct {
    w           Mutex        // held if there are pending writers
    writerSem   uint32       // semaphore for writers to wait for completing readers
    readerSem   uint32       // semaphore for readers to wait for completing writers
    readerCount atomic.Int32 // number of pending readers
    readerWait  atomic.Int32 // number of departing readers
}

// rwmutexMaxReaders equals 2^30.
// NOTE(review): presumably the maximum number of readers an RWMutex supports;
// its use is not shown in this excerpt — confirm.
const rwmutexMaxReaders = 1 << 30

// WaitGroup holds a packed counter/waiter state word and a semaphore.
type WaitGroup struct {
    noCopy noCopy // NOTE(review): presumably a marker that lets vet flag copies of the struct — confirm

    state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
    sema  uint32        // NOTE(review): presumably the semaphore that waiters block on — confirm
}

// Wait unlocks c.L, blocks the calling goroutine on c.notify, and locks c.L
// again before returning. The caller must hold c.L on entry, because Wait
// unlocks it unconditionally.
//
// Being woken does not mean the awaited condition holds, so Wait should be
// called in a loop that re-checks the condition:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
func (c *Cond) Wait() {
    // NOTE(review): presumably detects a Cond that was copied after first use — confirm.
    c.checker.check()
    // Register on the notify list while c.L is still held, then release it.
    // NOTE(review): presumably this ordering ensures a wake-up issued right
    // after Unlock is not missed — confirm against the runtime helpers.
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    // Re-acquire the lock so the caller resumes with c.L held.
    c.L.Lock()
}

// doSlow runs f under o.m unless o.done is already set.
//
// Callers are serialized on o.m, and done is re-checked after the lock is
// acquired so that f runs at most once. done is stored by a deferred call, so
// it is set even if f panics; deferred calls run last-in-first-out, so the
// store happens before o.m is unlocked.
func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()

    // Another caller finished while we waited for the lock.
    if o.done.Load() != 0 {
        return
    }

    defer o.done.Store(1)
    f()
}

channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// Token is the empty value that workers hand to one another over channels;
// it carries no data.
type Token struct{}

// newWorker runs worker id's loop and never returns.
//
// Each iteration waits for a token on ch, prints the worker's id, sleeps for
// one second, and then forwards the same token on nextCh.
func newWorker(id int, ch chan Token, nextCh chan Token) {
    for {
        tok := <-ch // wait for our turn
        fmt.Printf("Worker %d\n", id)
        time.Sleep(time.Second)
        nextCh <- tok // hand the turn to the next worker
    }
}

// Test_chan2 starts five workers connected in a ring and blocks forever.
//
// Every worker gets its own channel with capacity 1 as input and the next
// worker's channel as output; the last worker's output wraps around to the
// first. A single token is sent to worker 0 to start the rotation.
func Test_chan2() {
    const num = 5

    // Create every channel first: worker i also needs channel (i+1)%num.
    chs := make([]chan Token, 0, num)
    for len(chs) < num {
        chs = append(chs, make(chan Token, 1))
    }

    for i, in := range chs {
        go newWorker(i, in, chs[(i+1)%num])
    }

    chs[0] <- Token{}
    select {} // block forever; the workers never stop
}

// Mutex is a mutual-exclusion lock built on a channel.
//
// NewMutex creates ch with capacity 1 and puts one element into it. Lock takes
// the element out and Unlock puts it back, so an empty channel means locked.
type Mutex struct {
    ch chan struct{}
}

// NewMutex returns a Mutex in the unlocked state: its channel has capacity 1
// and already holds the single element that Lock will take.
func NewMutex() *Mutex {
    ch := make(chan struct{}, 1)
    ch <- struct{}{}
    return &Mutex{ch: ch}
}

// Lock acquires the mutex by receiving the element from mu.ch, blocking until
// one is available.
func (mu *Mutex) Lock() {
    <-mu.ch
}

// Unlock releases the mutex by putting an element back into mu.ch.
// It panics if the send cannot proceed immediately, which is how an unlock of
// an already unlocked mutex shows up (the channel is full).
func (mu *Mutex) Unlock() {
    select {
    case mu.ch <- struct{}{}:
        return
    default:
    }
    panic("unlock of unlocked mutex")
}

// TryLock attempts to acquire the mutex without blocking and reports whether
// it succeeded.
func (mu *Mutex) TryLock() bool {
    acquired := false
    select {
    case <-mu.ch:
        acquired = true
    default:
        // No element available: the mutex is held by someone else.
    }
    return acquired
}

// LockTimeout tries to acquire the mutex, waiting at most timeout.
// It returns true if the mutex was acquired and false if the timer fired first.
func (mu *Mutex) LockTimeout(timeout time.Duration) bool {
    deadline := time.NewTimer(timeout)
    // Stopping a timer that has already fired is harmless, so stop on every path.
    defer deadline.Stop()

    select {
    case <-mu.ch:
        return true
    case <-deadline.C:
        return false
    }
}

// IsLocked reports whether the mutex is held at the moment of the call, i.e.
// whether mu.ch currently contains no element. The result is a snapshot only.
func (mu *Mutex) IsLocked() bool {
    available := len(mu.ch)
    return available == 0
}
Get Things Done
Built with Hugo
Theme Stack designed by Jimmy