有时候我们的代码中可能会存在多个 goroutine 同时操作一个资源(临界区)的情况,这种情况下就会发生竞态问题(数据竞态race condition)。

原则:对一个共享资源的读和写操作必须是原子化的——同一时刻只能有一个 goroutine 对共享资源进行读和写操作。

互斥锁:

Go 语言中使用sync包中提供的Mutex类型来实现互斥锁。

func (m *Mutex) Lock() 获取互斥锁
func (m *Mutex) Unlock() 释放互斥锁

使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。

锁的两种模式

互斥锁在设计上主要有两种模式: 正常模式和饥饿模式。

之所以引入了饥饿模式,是为了保证goroutine获取互斥锁的公平性。所谓公平性,其实就是多个goroutine在获取锁时,goroutine获取锁的顺序,和请求锁的顺序一致,则为公平。

正常模式下,所有阻塞在等待队列中的goroutine会按顺序进行锁获取,当唤醒一个等待队列中的goroutine时,此goroutine并不会直接获取到锁,而是会和新请求锁的goroutine竞争。 通常新请求锁的goroutine更容易获取锁,这是因为新请求锁的goroutine正在占用cpu片执行,大概率可以直接执行到获取到锁的逻辑。

饥饿模式下, 新请求锁的goroutine不会进行锁获取,而是加入到队列尾部阻塞等待获取锁。

饥饿模式的触发条件:
  • 当一个goroutine等待锁的时间超过1ms时,互斥锁会切换到饥饿模式
饥饿模式的取消条件:
  • 当获取到锁的这个goroutine是等待锁队列中的最后一个goroutine,互斥锁会切换到正常模式
  • 当获取到锁的这个goroutine的等待时间在1ms之内,互斥锁会切换到正常模式

sync.Mutex的数据结构

Go中的sync.Mutex的结构体为:

type Mutex struct {
	state int32
	sema  uint32
}

Sync.Mutex由两个字段构成,state用来表示当前互斥锁处于的状态,sema用于控制锁状态的信号量。相信各位道友读完这两个字段的描述后,好像懂了,又好像没懂。下面我们详细理解下这两个字段到底都作了哪些事。

互斥锁state主要记录了如下四种状态:

![image-20240320224646795](C:\Users\GEORGE DING\AppData\Roaming\Typora\typora-user-images\image-20240320224646795.png)

waiter_num: 记录了当前等待抢这个锁的goroutine数量

starving: 当前锁是否处于饥饿状态 (后文会详解锁的饥饿状态) 0: 正常状态 1: 饥饿状态

woken: 当前锁是否有goroutine已被唤醒。 0:没有goroutine被唤醒; 1: 有goroutine正在加锁过程

locked: 当前锁是否被goroutine持有。 0: 未被持有 1: 已被持有

sema信号量的作用:

当持有锁的gorouine释放锁后,会释放sema信号量,这个信号量会唤醒之前抢锁阻塞的gorouine来获取锁。

To know much more, here (Mutex底层结构)!


读写互斥锁:

互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用sync包中的RWMutex类型。

方法名 功能
func (rw *RWMutex) Lock() 获取写锁
func (rw *RWMutex) Unlock() 释放写锁
func (rw *RWMutex) RLock() 获取读锁
func (rw *RWMutex) RUnlock() 释放读锁
func (rw *RWMutex) RLocker() Locker 返回一个实现Locker接口的读写锁

当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。

即:读锁可读不可写,写锁读写均不可。


sync.WaitGroup

方法名 功能
func (wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。


sync.Once

在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次,例如只加载一次配置文件等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案——sync.Oncesync.Once只有一个Do方法,其签名如下:

func (o *Once) Do(f func())

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次,源码如下:

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done.Load() == 0 {
       defer o.done.Store(1)
       f()
    }
}

sync.Map

先来看一下源码注释罢:

// The Map type is optimized for two common use cases: (1) when the entry for a given
// key is only ever written once but read many times, as in caches that only grow,
// or (2) when multiple goroutines read, write, and overwrite entries for disjoint
// sets of keys. In these two cases, use of a Map may significantly reduce lock
// contention compared to a Go map paired with a separate Mutex or RWMutex.

使用方法:

func main()  {
    var m sync.Map
    // 1. 写入
    m.Store("qwe", 18)
    m.Store("asd", 20)

    // 2. 读取
    age, _ := m.Load("qwe")
    fmt.Println(age.(int))

    // 3. 遍历
    m.Range(func(key, value interface{}) bool {
        name := key.(string)
        age := value.(int)
        fmt.Println(name, age)
        return true
    })

    // 4. 删除
    m.Delete("qwe")
    age, ok := m.Load("qwe")
    fmt.Println(age, ok)

    // 5. 读取或写入,这个 key 已经存在,因此写入不成功,并且读出原值。
    m.LoadOrStore("asd", 100)
    age, _ = m.Load("asd")
    fmt.Println(age)
}
方法名 功能
func (m *Map) Store(key, value interface{}) 存储key-value数据
func (m *Map) Load(key interface{}) (value interface{}, ok bool) 查询key对应的value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) 查询或存储key对应的value
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) 查询并删除key
func (m *Map) Delete(key interface{}) 删除key
func (m *Map) Range(f func(key, value interface{}) bool) 对map中的每个key-value依次调用f

浅浅看一下源码罢:

type Map struct {
    mu Mutex
    
     // readOnly,`read` 是 atomic.Value 类型,可以并发地读。但如果需要更新 `read`,则需要加锁保护。对于 read 中存储的 entry 字段,可能会被并发地 CAS 更新。但是如果要更新一个之前已被删除的 entry,则需要先将其状态从 expunged 改为 nil,再拷贝到 dirty 中,然后再更新。
    read atomic.Value
    
    //`dirty` 是一个非线程安全的原始 map。包含新写入的 key,并且包含 `read` 中的所有未被删除的 key。这样,可以快速地将 `dirty` 提升为 `read` 对外提供服务。如果 `dirty` 为 nil,那么下一次写入时,会新建一个新的 `dirty`,这个初始的 `dirty` 是 `read` 的一个拷贝,但除掉了其中已被删除的 key。
    dirty map[interface{}]*entry 
    
    //每当从 read 中读取失败,都会将 `misses` 的计数值加 1,当加到一定阈值以后,需要将 dirty 提升为 read,以期减少 miss 的情形。
    misses int
}

sync.map 适用于读多写少的场景。对于写多的场景,会导致 read map 缓存失效,需要加锁,导致冲突变多;而且由于未命中 read map 次数过多,导致 dirty map 提升为 read map,这是一个 O(N) 的操作,会进一步降低性能。

总之:

  1. sync.map 是线程安全的,读取,插入,删除也都保持着常数级的时间复杂度。
  2. 通过读写分离,降低锁时间来提高效率,适用于读多写少的场景。
  3. Range 操作需要提供一个函数,参数是 k,v,返回值是一个布尔值:f func(key, value interface{}) bool
  4. 调用 Load 或 LoadOrStore 函数时,如果在 read 中没有找到 key,则会将 misses 值原子地增加 1,当 misses 增加到和 dirty 的长度相等时,会将 dirty 提升为 read。以期减少“读 miss”。
  5. 新写入的 key 会保存到 dirty 中,如果这时 dirty 为 nil,就会先新创建一个 dirty,并将 read 中未被删除的元素拷贝到 dirty。
  6. 当 dirty 为 nil 的时候,read 就代表 map 所有的数据;当 dirty 不为 nil 的时候,dirty 才代表 map 所有的数据。