掘金 后端 ( ) • 2024-04-03 10:51

go源码之锁sync.Mutex和sync.RWMutex - Jxy 博客

一、总结

sync.RWMutex

  • 写锁 需要阻塞 写锁:一个协程拥有 写锁 时,其他协程 写锁定 需要阻塞
  • 写锁 需要阻塞 读锁:一个协程拥有 写锁 时,其他协程 读锁定 需要阻塞
  • 读锁 需要阻塞 写锁:一个协程拥有 读锁 时,其他协程 写锁定 需要阻塞
  • 读锁 不能阻塞 读锁:一个协程拥有 读锁 时,其他协程也可以拥有 读锁
  • 写操作通过 readerCount 的操作来阻止读操作的

二、源码

(一)RWMutex数据结构

type RWMutex struct {
	w           Mutex  // 写锁
  writerSem   uint32 // 缓冲信号量,获取写锁的阻塞等待信号队列
	readerSem   uint32 // 缓冲信号量,获取读锁的阻塞等待信号队列
	readerCount int32  // 当前持有读锁的 goroutine 数量,负数表示有个写锁在执行
  readerWait  int32  // 获取写锁时,如果之前还有 readerWait 数量的读锁在执行,则需要等待执行完才能获取写锁
}

  • w

    写锁

  • writerSem

    缓冲信号量,当有goroutine获取写锁时,如果当前有读锁在占有,则调用 runtime_SemacquireMutex(&rw.writerSem, false, 0)

    将当前goroutine进行睡眠,并排队到 writerSem 队列的队尾,等待所有的读锁释放之后再调用runtime_Semrelease(&rw.writerSem, false, 1)进行唤醒

  • readerSem

    缓冲信号量,当有goroutine获取读锁时,如果当前有写锁在占有(readerCount),则调用 runtime_SemacquireMutex(&rw.readerSem, false, 0),将当前goroutine进行睡眠,并排队到 readerSem 队列的队尾,等待写锁释放之后再调用runtime_Semrelease(&rw.readerSem, false, 0)进行唤醒

  • readerCount

    当前持有读锁的goroutine数量,负数表示有个写锁在执行,在获取写锁时,会将readerCount-rwmutexMaxReaders变为负数

    写锁释放后readerCount会再+rwmutexMaxReaders变为正数

  • readerWait

    首先读锁是会阻塞写锁的获取的,当一个goroutine尝试去获取一个写锁时,会将当前持有读锁的数量readerCount赋值给readerWait,表示当前goroutine要等待readerWait个goroutine释放读锁之后才能成功获取写锁

(二)Lock

// 获取写锁,会等待所有的读锁释放
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	//  获取互斥锁.
	rw.w.Lock()
	 
  // readerCount+(-rwmutexMaxReaders),将readerCount变为负数,表示阻塞的读数量
  // 结果 + rwmutexMaxReaders 重新将结果变为正数,所以r的值就是readerCount的绝对值,
  // 然后readerWait累加r,表示阻塞等待的读数量
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// r!=0表示当前还有读锁,需要将写锁加入writerSem队尾,等待唤醒
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
     // 将当前goroutine进行睡眠,并排队到writerSem队列的队尾,等待
    // 函数rUnlockSlow中当所有读锁readerWait=0释放之后,会唤醒写锁
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

(三)Unlock

// 唤醒因读锁定而被阻塞的协程(如果有的话)
// 解除互斥锁
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// 在上锁的时候,readerCount-rwmutexMaxReaders
  // 所以解锁的时,再+rwmutexMaxReaders,使readerCount变为正数,表示无写锁存在
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders+)
	if r >= rwmutexMaxReaders { // 读的数量超了
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// 唤醒readerSem队列中的读等待goroutine
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// 释放互斥锁
	rw.w.Unlock()
  
	if race.Enabled {
		race.Enable()
	}
}

(四)TryRLock

非阻塞读锁,和Rlock的区别在于:TryRLock是非阻塞的,也就是当锁被占用时,直接返回false,而RLock会自旋等待其他 goroutine 释放读锁

(五)Rlock

读锁定会先将RWMutext.readerCount加1,此时写操作到来时发现读者数量不为0,会阻塞等待所有读操作结束。

也就是说,读操作通过readerCount来将来阻止写操作的。

// 上读锁
func (rw *RWMutex) RLock() {
	if race.Enabled { // 竞态检测
		_ = rw.w.state
		race.Disable()
	}
  // 增加读的次数readerCount+1
    // 1、如果readerCount<0。说明写锁已经获取了,那么这个读锁需要等待写锁的完成
    // 2、如果readerCount>=0。当前读直接获取锁
 
	if atomic.AddInt32(&rw.readerCount, 1) < 0 { 
  
    // 当写锁定进行时,会先将readerCount减去2^30(最大读个数),从而readerCount变成了负值,
    // 此时再有读锁定到来时检测到readerCount为负值,便知道有写操作在进行,只好阻塞等待。
    // 而真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可获得。
		 // 当前有个写锁,读操作阻塞等待写锁释放, 则当前goroutine进入休眠,在readerSem信号中排队(排到最后一位).
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

(六)RUnlock

func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
  // 读个数减1,readerCount+(-1)
  	// 1.若readerCount>0,证明当前还有读锁,直接结束本次操作
    // 2.若readerCount<=0,证明已经没有读锁,可以唤醒写锁(若有)
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// 唤醒可能等待的写锁
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}
// 唤醒可能等待的写锁 ,r 是读者的数量
func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// 若等待读取的 goroutine(readerWait) 数量为0,则唤醒等待的写锁 (writerSem)
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

三、常见问题

1. 什么是CAS,什么是原子操作

CAS(Compare and Swap)比较并交换,顾名思义:比较两个值,如果他们两者相等就把他们交换。这是一个由CPU硬件提供并实现的原子操作

原子操作:操作系统提高的锁机制来保证操作的原子性和线程安全性。这种锁机制可以使执行==原子操作的 CPU 独占内存总线==或者缓存,并防止其他 CPU 对同一内存地址进行读写操作,从而避免了数据竞争的问题。

具体来说,在执行原子操作时,CPU 会向内存总线或者缓存发送锁请求信号,然后等待锁授权。一旦锁授权成功,CPU 就可以将操作的结果写入内存,然后释放锁。其他 CPU 在锁被释放之前不能对同一内存地址进行读写操作,从而保证了操作的原子性和线程安全性。

需要注意的是,==原子操作增加 CPU 的开销和内存带宽的消耗==

2. 写操作是如何阻止写操作的

sync.Mutex中的state字段标记了当前锁的状态,如果是锁定状态,那新的获取写锁的goroutine会自旋阻塞等待(sema信号量)

3. 写操作是如何阻止读操作的

sync.RWMutex中的readCount记录了当前持有读锁的数量,当有写锁获取时,readCount会变为负数,因此在获取读锁时,如果readCount<0则表示当前写锁占有,读锁的获取操作阻塞等待(readerSem)

4. 读操作是如何阻止写操作的

sync.RWMutex中的readCount记录了当前持有读锁的数量,当有写锁获取时,如果readCount==0则写锁获取成功,然后将readCount会变为负数,如果readCount!=0,则会进行阻塞等待,排队到信号量writerSem队列中,直达所有的读锁都释放才唤醒

5. 为什么写锁定不会被饿死

go1.9 引入了饥饿模式,防止goroutine长时间阻塞

有劳各位看官 点赞、关注➕收藏 ,你们的支持是我最大的动力!!! 接下来会不断更新 golang 的一些底层源码及个人开发经验(个人见解)!!! 同时也欢迎大家在评论区提问、分享您的经验和见解!!!

本文由博客一文多发平台 OpenWrite 发布!