掘金 后端 ( ) • 2024-04-28 10:00

写在文章开头

本文会给基于我们之前所学的知识进行一个综合的实践,通过单个协程实现一个生产者生产者模式,希望能够让你对go语言的开发有着更深刻的理解和掌握。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

案例描述

我们希望实现一个单消费者的模型,外部协程通过我们的消费者所提供的入口将任务提交至消费列表中,在规定时间内,单例协程消费者会不断执行任务,知道时间停止或者任务执行完成,若规定时间内未能将任务执行完成,则抛出超时异常:

实现

我们给出消费者的结构体:

  1. 包含程序必要的终止信号,如果收到系统发出的中断信号我们的程序则会直接停止。
  2. 存储任务的切片列表tasks ,该切片存储的是函数,这意味着我们后续可以直接从切片中拿到函数并直接执行该方法。
  3. 当我们任务执行完成后我们会通过complete 发送信号,如果为nil则说明所有任务执行完成,中间没有任何异常。若有异常我们则会通过complete 获取错误内容并返回。
  4. 上文我们提到这个消费者需要在规定时间内完成任务,所以我们通过timeout 这个变量指定超时时间,如果timeout这个通道收到消息则说明时间到,若任务没有完成则抛出异常。
type Runner struct {
 //监听操作系统中断信号
 interrupt chan os.Signal
 //完成信号,可能会抛出异常
 complete chan error
 //时间截至信号,到期会发信号
 timeout <-chan time.Time
 //任务切片
 tasks []func(int)
}

针对上文中的系统中断和超时我们给出两个错误的全局变量定义:

var ErrInterrupt = errors.New("interrupt")
var ErrTimeout = errors.New("timeout")

我们通过New方法完成消费者Runner的创建,可以看到这个方法会默认初始化中断信号和完成信号的通道,并通过外部传入的数值完成超时信号的初始化:

func New(d time.Duration) *Runner {
 return &Runner{
  //默认容量为1
  interrupt: make(chan os.Signal, 1),
  //完成信号初始化
  complete: make(chan error),
  timeout:  time.After(d),
 }
}

同时我们也给出Runner添加任务的入口,我们的Add方法支持添加多个任务,该方法内部是基于append方法将多任务添加到我们的切片中,然后将最终切片的指针地址赋值给我们的tasks

func (r *Runner) Add(tasks ...func(int)) {
 r.tasks = append(r.tasks, tasks...)
}

基于上述的tasks,我们的Runner会在程序没有收到中断信号期间直接遍历并执行这些任务:

func (r *Runner) run() error {
 //遍历切片执行任务
 for i, task := range r.tasks {

  if r.isInterrupt() {
   return ErrInterrupt
  }
  //执行当前任务
  task(i)
 }

 return nil
}

//判断当前协程是否收到操作系统的中断信号,若有则直接终止程序运行
func (r *Runner) isInterrupt() bool {
 select {
 case <-r.interrupt:
  signal.Stop(r.interrupt)
  return true
 default:
  return false
 }
}

最后我们就是暴露一个Start提供给外部启动的我们的消费者,该方法会直接监听操作系统的中断信号,并启动协程开始执行tasks,并将最终执行结果通过complete 告知用户,如果不为nil则说明有错误,我们通过select 阻塞判断这个错误并返回:

func (r *Runner) Start() error {
 //监听中断信号
 signal.Notify(r.interrupt, os.Interrupt)

 //启动协程工作
 go func() {
  r.complete <- r.run()
 }()
 //根据协程执行结果返回的complete看看是否是否为nil,若不是则返回对应异常
 select {
 case err := <-r.complete:
  return err
 case <-r.timeout:
  return ErrTimeout

 }

}

自此我们就将一个单协程的消费者编写完成,我们给出完整的代码:

type Runner struct {
 //监听操作系统中断信号
 interrupt chan os.Signal
 //完成信号,可能会抛出异常
 complete chan error
 //时间截至信号,到期会发信号
 timeout <-chan time.Time
 //任务切片
 tasks []func(int)
}

var ErrInterrupt = errors.New("interrupt")
var ErrTimeout = errors.New("timeout")

func New(d time.Duration) *Runner {
 return &Runner{
  //默认容量为1
  interrupt: make(chan os.Signal, 1),
  //完成信号初始化
  complete: make(chan error),
  timeout:  time.After(d),
 }
}

// Add 添加任务
func (r *Runner) Add(tasks ...func(int)) {
 r.tasks = append(r.tasks, tasks...)
}

func (r *Runner) Start() error {
 //监听中断信号
 signal.Notify(r.interrupt, os.Interrupt)

 //启动协程工作
 go func() {
  r.complete <- r.run()
 }()

 select {
 case err := <-r.complete:
  return err
 case <-r.timeout:
  return ErrTimeout

 }

}

func (r *Runner) run() error {
 //遍历切片执行任务
 for i, task := range r.tasks {

  if r.isInterrupt() {
   return ErrInterrupt
  }
  //执行当前任务
  task(i)
 }

 return nil
}

// 判断当前协程是否收到操作系统的中断信号,若有则直接终止程序运行
func (r *Runner) isInterrupt() bool {
 select {
 case <-r.interrupt:
  signal.Stop(r.interrupt)
  return true
 default:
  return false
 }
}

测试

基于这个runner,我们给出测试代码,我们通过createTask返回一个休眠一段时间并输出结果的方法,将其存到我们创建的runner列表中。最后通过Start方法将其启动,并阻塞等待完成或者超时,将结果打印到控制台上:

const timeout = 5 * time.Second

func main() {
 log.Println("runner开始工作")
 //创建一个runner
 r := runner.New(timeout)

 //添加任务
 r.Add(createTask(), createTask(), createTask())
 
 //启动runner执行任务
 if err := r.Start(); err != nil {
  //根据Start返回的结果决定是否输出错误信息
  switch err {
  case runner.ErrTimeout:
   log.Println("任务超时")
   os.Exit(1)
  case runner.ErrInterrupt:
   log.Println("任务被中断")
   os.Exit(2)
  }
 }

 log.Println("runner任务处理完成")
}

//创建一个休眠一段时间输出id的方法
func createTask() func(int2 int) {
 return func(id int) {
  log.Println("执行任务", id)
  time.Sleep(time.Duration(id) * time.Second)
  log.Println("任务", id, "处理完成")
 }
}

对应的输出结果如下:

2024/04/27 23:44:34 runner开始工作
2024/04/27 23:44:34 执行任务 0
2024/04/27 23:44:34 任务 0 处理完成
2024/04/27 23:44:34 执行任务 1
2024/04/27 23:44:35 任务 1 处理完成
2024/04/27 23:44:35 执行任务 2
2024/04/27 23:44:37 任务 2 处理完成
2024/04/27 23:44:37 runner任务处理完成

当然如果我们添加更多任务或者减小超时时间,就可能出现超时异常,对应输出如下:

2024/04/27 23:45:31 runner开始工作
2024/04/27 23:45:31 执行任务 0
2024/04/27 23:45:31 任务 0 处理完成
2024/04/27 23:45:31 执行任务 1
2024/04/27 23:45:32 任务 1 处理完成
2024/04/27 23:45:32 执行任务 2
2024/04/27 23:45:32 任务超时  

小结

自此我们就通过go语言的协程、切片、channel实现了一个简单的单协程消费者模型,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。 因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

参考

《go in action》

本文使用 markdown.com.cn 排版