掘金 后端 ( ) • 2024-04-22 09:59

golang 的信号弹性模式

创建信号量需要两个参数:

票数(一次发多少张票)。

超时(如果当前没有可用的票证,则等待多长时间)

开始分析前,我们需要先知道信号量的用途是什么,我用国内的智谱清言问了一下。

信号量的用途主要包括:

  1. 限制并发访问:通过信号量,可以限制同时访问某个资源或执行某个操作的并发 goroutine 的数量,防止过度的并发导致资源耗尽,如数据库连接、文件描述符或硬件设备等
  2. 流量控制:在处理大量请求时,可以用来控制请求处理的速率,防止后端服务因请求过多而崩溃
  3. 资源池管理:例如数据库连接池、线程池等,信号量可以确保池中的资源被合理地分配和回收
  4. 提高系统稳定性:在面临突发高流量或系统异常时,信号量机制能够保证系统按照预设的并发级别工作,避免雪崩效应
  5. 分布式系统中的协调:在分布式系统中,信号量可以用于不同服务或节点之间的协调,保证全局资源的合理使用

知道了信号量的用途后,我们来了解一下 go-resiliency 是如何实现信号量的

仓库给的例子:

sem := semaphore.New(3, 1*time.Second)

if err := sem.Acquire(); err != nil {
	// could not acquire semaphore
	// 无法获取信号量
	return err
}
defer sem.Release()

创建信号量时,传入了两个参数,一个是票证的数量,一个是等待的时间

Semaphore 的结构如下:

// Semaphore implements the semaphore resiliency pattern
// 信号量实现信号量弹性模式
type Semaphore struct {
	sem     chan struct{} // 票证
	timeout time.Duration // 等待时间
}

获取信号量

// Acquire tries to acquire a ticket from the semaphore. If it can, it returns nil.
// If it cannot after "timeout" amount of time, it returns ErrNoTickets. It is
// safe to call Acquire concurrently on a single Semaphore.
// Acquire 尝试从信号量获取票证。
// 如果可以,则返回 nil。
// 如果在“超时”时间后无法返回,则返回 ErrNoTickets。在单个信号量上同时调用 Acquire 是安全的。
func (s *Semaphore) Acquire() error {
	select {
	case s.sem <- struct{}{}:
		return nil
	case <-time.After(s.timeout):
		return ErrNoTickets
	}
}

// ErrNoTickets is the error returned by Acquire when it could not acquire
// a ticket from the semaphore within the configured timeout.
// ErrNoTickets 是 Acquire 在配置的超时时间内无法从信号量获取票证时返回的错误。
var ErrNoTickets = errors.New("could not acquire semaphore ticket")

释放信号量

// Release releases an acquired ticket back to the semaphore. It is safe to call
// Release concurrently on a single Semaphore. It is an error to call Release on
// a Semaphore from which you have not first acquired a ticket.
// Release 将获取的票证释放回信号量。
// 在单个信号量上同时调用 Release 是安全的。
// 在未首先获得票证的信号量上调用 Release 是错误的。
func (s *Semaphore) Release() {
	<-s.sem
}

这个实现比较简单,创建信号量的时候,根据我们传入的票证数量,创建 sem 信号量通道,如果传入的票证数量不为 0,那么这个 sem 是带缓冲区的通道,获取信号量的时候,往 sem 写数据,如果缓冲区数据没有满,写成功后,返回 nil,成功获取信号量;如果 sem 已经满了,等到 select 到超时的那个分支时,返回 ErrNoTickets 的错误

释放信号量的时候,从 sem 通道读数据,释放缓冲区的数据,让后面尝试去获取信号量的,能够正常往 sem 写数据,从而成功获取信号量、

注意点1

这里要注意的点是,我们看代码信号量的代码还有另外一个函数

// IsEmpty will return true if no tickets are being held at that instant.
// It is safe to call concurrently with Acquire and Release, though do note
// that the result may then be unpredictable.
// 如果当时没有持有票证,
// IsEmpty 将返回 true。
// 与 Acquire 和 Release 同时调用是安全的,
// 但请注意,结果可能是不可预测的
func (s *Semaphore) IsEmpty() bool {
	return len(s.sem) == 0
}

这里注意的点是,如果对 channel 不熟悉,可以会误以为这里判断的 sem 通道的长度,比如一下代码,我们创建票证数量的数为3

func TestSemaphoreEmpty2(t *testing.T) {
	sem := New(3, 200*time.Millisecond)
	t.Log(sem.IsEmpty())
	if !sem.IsEmpty() {
		t.Error("semaphore should be empty")
	}
}

output:

=== RUN   TestSemaphoreEmpty2
    semaphore_test.go:71: true
--- PASS: TestSemaphoreEmpty2 (0.00s)
PASS

Process finished with the exit code 0

可以看到 IsEmpty 函数返回的是 true,即 len(s.sem) == 0 条件成立,是不是觉得很奇怪,其实当我们去判断 channel 的长度时,判断的是它的缓冲区的还未被读取的数据 (学到了吧 😃)

注意点2

在没获取到信号量的时候,进行释放信号量操作,会导致死锁

func TestSemaphoreEmpty(t *testing.T) {
	sem := New(2, 200*time.Millisecond)

	if !sem.IsEmpty() {
		t.Error("semaphore should be empty")
	}

	sem.Release()
}

output:

=== RUN   TestSemaphoreEmpty
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000051380, {0x91f60a?, 0x84d513?}, 0x929520)
	C:/Program Files/Go/src/testing/testing.go:1630 +0x405

testing.runTests.func1(0xa13280?)
	C:/Program Files/Go/src/testing/testing.go:2036 +0x45
testing.tRunner(0xc000051380, 0xc000117c88)
	C:/Program Files/Go/src/testing/testing.go:1576 +0x10b

testing.runTests(0xc0000781e0?, {0xa0e1c0, 0x3, 0x3}, {0xc0001212a0?, 0x100c000117d10?, 0x0?})
	C:/Program Files/Go/src/testing/testing.go:2034 +0x489

testing.(*M).Run(0xc0000781e0)
	C:/Program Files/Go/src/testing/testing.go:1906 +0x63a

main.main()
	_testmain.go:51 +0x1aa

goroutine 6 [chan receive]:
github.com/eapache/go-resiliency/semaphore.(*Semaphore).Release(...)
	C:/code/go_code/good/go-resiliency/semaphore/semaphore.go:44
github.com/eapache/go-resiliency/semaphore.TestSemaphoreEmpty(0xc000051520)
	C:/code/go_code/good/go-resiliency/semaphore/semaphore_test.go:56 +0x97
testing.tRunner(0xc000051520, 0x929520)
	C:/Program Files/Go/src/testing/testing.go:1576 +0x10b

created by testing.(*T).Run
	C:/Program Files/Go/src/testing/testing.go:1629 +0x3ea

Process finished with the exit code 1

Release 操作,是从 sem 通道读取数据,如果这时没有数据,就阻塞在这里的

注意点3,创建信号量的时候,不要误传了票证的数量为 0

func TestSemaphoreEmpty(t *testing.T) {
	sem := New(0, 200*time.Millisecond)

	if !sem.IsEmpty() {
		t.Error("semaphore should be empty")
	}

	sem.Acquire()

	sem.Release()
	
}

output:

=== RUN   TestSemaphoreEmpty
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000051380, {0x87f60a?, 0x7ad513?}, 0x889520)
	C:/Program Files/Go/src/testing/testing.go:1630 +0x405

testing.runTests.func1(0x973280?)
	C:/Program Files/Go/src/testing/testing.go:2036 +0x45
testing.tRunner(0xc000051380, 0xc000117c88)
	C:/Program Files/Go/src/testing/testing.go:1576 +0x10b
testing.runTests(0xc000078140?, {0x96e1c0, 0x3, 0x3}, {0xc000121378?, 0x100c000117d10?, 0x0?})
	C:/Program Files/Go/src/testing/testing.go:2034 +0x489

testing.(*M).Run(0xc000078140)
	C:/Program Files/Go/src/testing/testing.go:1906 +0x63a

main.main()
	_testmain.go:51 +0x1aa

goroutine 6 [chan receive]:
github.com/eapache/go-resiliency/semaphore.(*Semaphore).Release(...)
	C:/code/go_code/good/go-resiliency/semaphore/semaphore.go:44
github.com/eapache/go-resiliency/semaphore.TestSemaphoreEmpty(0xc000051520)
	C:/code/go_code/good/go-resiliency/semaphore/semaphore_test.go:58 +0x9f
testing.tRunner(0xc000051520, 0x889520)
	C:/Program Files/Go/src/testing/testing.go:1576 +0x10b

created by testing.(*T).Run
	C:/Program Files/Go/src/testing/testing.go:1629 +0x3ea

Process finished with the exit code 1

这个原因跟注意点 2 一样,sem.Acquire() 获取信号量这一步是没有问题的,但是因为票证数量为 0,那么我们创建的信号量的 sem 字段是不带缓冲区的通道类型,这时候再去释放信号量的时候,就会阻塞在那里了

!!! select 的 case 是随机的,而 switch 里的 case 是顺序执行