go实现系统调用 (go开发用的消息队列)

实际业务中常常会遇到这种场景:一个负责准备资源,等资源准备好后,其他的多个再来读。

Go有两种方式来实现这种场景。

  • 一、条件锁

资源准备好,可以视为一个条件。Go的sync标准库中提供了Cond条件锁可以来做这件事情。下面是sync.Cond的结构:

type Cond struct {
	noCopy noCopy
	L Locker
	notify  notifyList
	checker copyChecker
}

noCopy是go vet用来检查是否有Cond类型的变量被copy了。checker是存放了Cond变量本身的地址,同样用来判断是否被copy了。这两项是保障Cond变量不能被复制做的检查,了解即可。

主要的部分是L这个Locker接口,实现了Lock()、Unlock()方法。这个是用来锁条件的,保证条件的原子操作。还有notify是个双向链表,存放资源准备好以后要通知哪些goroutine。

package sync

import (
	"log"
	"strconv"
	"sync"
	"testing"
	"time"
)

var done = false

func read(name string, c *sync.Cond) {
	c.L.Lock()
	for !done {
		c.Wait()
	}
	log.Println(name, "starts reading")
	c.L.Unlock()
}

func write(name string, c *sync.Cond) {
	log.Println(name, "starts writing")
	time.Sleep(time.Second)
	c.L.Lock()
	done = true
	c.L.Unlock()
	log.Println(name, "wakes all")
	c.Broadcast()
}

func TestSyncCond(t *testing.T) {
	cond := sync.NewCond(&sync.Mutex{})

	for i := 0; i < 10; i++ {
		go read("reader"+strconv.Itoa(i), cond)
	}
	write("writer", cond)

	time.Sleep(time.Second * 3)
}

Wait()方法首先会L.Unlock()所以在用之前需要先L.Lock(),条件需要循环检查,所以用了for循环。然后,Wait()方法会挂起调用它的goroutine,当被Broadcast()或Signal()方法唤醒后,会执行L.Lock(),故而后边还得执行L.Unlock()。所以上边的read函数会那样写。

Broadcast()方法用于全部唤醒,Signal()仅唤醒一个。

  • 二、channel

利用关闭channel来实现通知,简单直接。

package main

import (
	"fmt"
	"sync"
	"time"
)

func makeBeginSignal() <-chan struct{} {
	begin := make(chan struct{})
	go func() {
		fmt.Println("waiting for resource to be ready...")
		time.Sleep(5 * time.Second)
		fmt.Println("ready to go!")
		close(begin)
	}()
	return begin
}

func worker(begin <-chan struct{}, wg *sync.WaitGroup) {
	<-begin
	fmt.Println("worker begin to work!")
	wg.Done()
}

func main() {
	sig := makeBeginSignal()
	wg := &sync.WaitGroup{}
	wg.Add(10)
	for i := 0; i < 10; i++ {
		worker(sig, wg)
	}
	wg.Wait()
}