掘金 后端 ( ) • 2024-04-25 09:52

GO高性能编程之sync.Map和 sync.RWMutex+Map 应该如何选择

1、各自的特点

1、1 sync.Map 特点

-   适用于需要在多个 Goroutine 之间安全地共享并发访问的键值对集合。
-   使用 sync.Map 不需要额外的锁来保护并发访问,因为它内部已经实现了并发安全的机制。适用心智负担少.
-   写少读多场景: 当map主要用于读取且写入操作相对较少时,sync.Map 能够提供出色的并发性能。
-   长时间运行且有大量删除操作的应用可能会导致 sync.Map 内存占用持续增加,因为旧数据虽然被标记为 expunged,但并未立即释放。需定期调用 sync.Map 的 Range 方法触发清理。
-   不支持按序遍历,也不保证遍历期间的稳定性(即遍历过程中可能有其他 Goroutine 修改映射)。

1、2 sync.RWMutex+Map

-   适用于需要更精细的读写控制的情况,允许多个 Goroutine 同时读取数据,但只允许一个 Goroutine 写入数据。
-   使用 `sync.RWMutex` 可以提高读取操作的并发性能,因为多个 Goroutine 可以同时获得读锁。
-   适用于读写操作相对平衡的情况,如果写入操作频繁且耗时较长,可能会导致读取操作的性能下降。

2、talk is cheap 跑个Benchmark先

type MutexMap struct {
    lck sync.Mutex
    m   map[int]int
}

type RWMutexMap struct {
    lck sync.RWMutex
    m   map[int]int
}

var normalMap MutexMap
var syncMap sync.Map
var rwMutexMap RWMutexMap

func TestMain(m *testing.M) {
    normalMap = MutexMap{
       lck: sync.Mutex{},
       m:   make(map[int]int, 100000),
    }

    m.Run()
}

func BenchmarkLockMapWrite(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
       for pb.Next() {
          a := rand.Intn(100) + 1
          ra := rand.Intn(a)
          normalMap.lck.Lock()
          normalMap.m[a] = ra
          normalMap.lck.Unlock()
       }
    })
}

func BenchmarkLockMapRead(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
       for pb.Next() {
          a := rand.Intn(100) + 1
          normalMap.lck.Lock()
          _, _ = normalMap.m[a]
          normalMap.lck.Unlock()
       }
    })
}

func BenchmarkRWMutexWrite(b *testing.B) {

    b.RunParallel(func(pb *testing.PB) {
       for pb.Next() {
          a := rand.Intn(100) + 1
          ra := rand.Intn(a)
          rwMutexMap.lck.Lock()
          normalMap.m[a] = ra
          rwMutexMap.lck.Unlock()
       }
    })
}

func BenchmarkRWMutexMapRead(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
       for pb.Next() {
          a := rand.Intn(100) + 1
          rwMutexMap.lck.RLock()
          _, _ = rwMutexMap.m[a]
          rwMutexMap.lck.RUnlock()
       }
    })
}

func BenchmarkSyncMapWrite(b *testing.B) {

    b.RunParallel(func(pb *testing.PB) {
       for pb.Next() {
          a := rand.Intn(100) + 1
          ra := rand.Intn(a)
          syncMap.Store(a, ra)
       }
    })
}

func BenchmarkSyncMapRead(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
       for pb.Next() {
          a := rand.Intn(100) + 1
          syncMap.Load(a)
       }
    })
}
func 单次耗时 BenchmarkLockMapWrite-10 193.7 ns/op BenchmarkLockMapRead-10 118.9 ns/op BenchmarkRWMutexWrite-10 202.8 ns/op BenchmarkRWMutexMapRead-10 113.0 ns/op BenchmarkSyncMapWrite-10 217.4 ns/op BenchmarkSyncMapRead-10 5.146 ns/op

可以看出来,写操作耗时大家都是差不多的,但是只读操作sync.Map快了一个数量级.这肯定是有原因的.

3、 解析sync.map

这里将sync.Map的核心的几个func拿了出来,加上了注释和传统的Map+Mutex比较,主要是通过以下方式提高性能

  • 分离读写路径: sync.Map 使用两个内部数据结构:readOnly(只读map)和 dirty(脏map)。大部分读操作仅访问 readOnly map,无需加锁。只有在 readOnly map中找不到元素或者元素已标记为删除时,才尝试加锁访问 dirty map。这种设计极大地减少了读操作对锁的依赖,提高了并发读取性能。

  • 延迟更新: 当 readOnly map中的元素被修改时,不会立即更新 readOnly,而是将更改记录在 dirty map中。只有当 readOnly 中的缺失次数(misses)达到一定阈值时,才会将 dirty 提升为新的 readOnly。这种策略减少了对 readOnly map的频繁更新,降低了锁竞争。

  • 原子操作: 对于值的读取和某些简单更新(如 LoadAndDelete),sync.Map 利用原子操作(如 atomic.CompareAndSwapPointer 和 sync/atomic.Value)在不需要锁的情况下安全地操作数据。这进一步减少了锁的使用,提高了并发性能。

  • 无锁读: 对于大部分读操作(如 Load),sync.Map 完全避免了锁的使用。读取操作仅需访问 readOnly ma p,而该map本身是不可变的,因此可以安全地并发访问。 优化的删除逻辑: sync.Map 不直接从map中移除键值对,而是将值标记为 expunged。在后续读取时,会跳过这些已标记的条目。这种做法避免了在高并发场景下因频繁删除导致的锁竞争问题。

  • 避免不必要的内存分配: 在 Store 和 LoadOrStore 操作中,sync.Map 尽可能复用已存在的 entry 对象,减少内存分配。这有助于降低 GC 压力,特别是在大量写入操作的场景下。

  • 优化的删除逻辑: sync.Map 不直接从映射中移除键值对,而是将值标记为 expunged。在后续读取时,会跳过这些已标记的条目。这种做法避免了在高并发场景下因频繁删除导致的锁竞争问题。


type Map struct {
    mu Mutex

    // 一个只读的map,用于存储已稳定(不频繁变动)的数据,提供快速的读取操作。
    read atomic.Pointer[readOnly]
    // 一个常规的 map[K]entry,用于存储新插入或待更新的数据。当 readOnly 中的数据被修改时,这些修改会被同步到 dirty 中
    dirty map[any]*entry
    // 记录从 readOnly 中未找到键值对的次数,用于决定何时将 dirty 提升为新的 readOnly。
    misses int
}

//首先尝试从 readOnly 中读取,若未找到或遇到已删除的标记(expunged),则尝试从 dirty 中查找。若 dirty 中找到,则更新 misses 计数。
func (m *Map) Load(key any) (value any, ok bool) {
    //先拿只读的部分
    read := m.loadReadOnly()
    e, ok := read.m[key]
    if !ok && read.amended {
       //只读部分没有拿到
       m.mu.Lock()
       // double check
       read = m.loadReadOnly()
       e, ok = read.m[key]
       if !ok && read.amended {
          //去dirty map 里面读取
          e, ok = m.dirty[key]
          // 更新misses计数器 ,dirtyMap 覆盖read用
          m.missLocked()
       }
       m.mu.Unlock()
    }
    if !ok {
       return nil, false
    }
    // 读取到了,用的atomic
    return e.load()
}

func (m *Map) Store(key, value any) {
    _, _ = m.Swap(key, value)
}

func (m *Map) Swap(key, value any) (previous any, loaded bool) {
    // 先拿只读的部分
    read := m.loadReadOnly()
    if e, ok := read.m[key]; ok {
       // 如果在只读里面
       // 尝试直接修改指针对象的值
       // 成功返回true
       // 失败返回false
       if v, ok := e.trySwap(&value); ok {
          if v == nil {
             return nil, false
          }
          return *v, true
       }
    }

    m.mu.Lock()
    read = m.loadReadOnly()
    if e, ok := read.m[key]; ok {
       // 如果在只读里面 将只读的对应value 置为nil
       if e.unexpungeLocked() {
          更新dirty map
          m.dirty[key] = e
       }
       // 获得旧的数据
       if v := e.swapLocked(&value); v != nil {
          loaded = true
          previous = *v
       }
    } else if e, ok := m.dirty[key]; ok {
       //不在只读map,
       //如果在dirty map 里面
       // 尝试直接交换指针对象的值
       if v := e.swapLocked(&value); v != nil {
          loaded = true
          previous = *v
       }
    } else {
       //如果在dirty map 里面 也不存在
       if !read.amended {
          // 更新只读
          m.dirtyLocked()
          m.read.Store(&readOnly{m: read.m, amended: true})
       }
       //更新dirty map
       m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
    return previous, loaded
}

func (m *Map) missLocked() {
    m.misses++
    // 在只读map 里面misses 超过dirty map的长度的时候
    // 这个 (m.dirty) 我感觉是一个经验值
    if m.misses < len(m.dirty) {
       return
    }
    // 更新只读
    m.read.Store(&readOnly{m: m.dirty})
    // 置空dirty map
    m.dirty = nil
    m.misses = 0
}

//不直接从map中移除键值对,而是将值标记为 expunged。在后续读取时,会跳过这些已标记的条目。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
    read := m.loadReadOnly()
    e, ok := read.m[key]
    if !ok && read.amended {
       m.mu.Lock()
       read = m.loadReadOnly()
       e, ok = read.m[key]
       if !ok && read.amended {
          e, ok = m.dirty[key]
          delete(m.dirty, key)
          // Regardless of whether the entry was present, record a miss: this key
          // will take the slow path until the dirty map is promoted to the read
          // map.
          m.missLocked()
       }
       m.mu.Unlock()
    }
    if ok {
       // 假删
       return e.delete()
    }
    return nil, false
}

4、使用推荐

  • 存储复杂结构体,我建议使用rwmutex+map,这样更加灵活;
  • 如果是简单结构类型,也就是key和value 都是基础类型的map,我倾向选择sync.map这样性能要好很多
  • sync.map的创始人自己很少用这个,在业务开发用到的也比较少. 毕竟哪里来的这么多简单结构的业务并发数据