一、sync.WaitGroup 作用

sync.WaitGroup 的作用是可以通过阻塞主协程的手段等待其它一组子协程完成。工作原理是基于计数器实现的,通过计数器来获取协程的完成情况。具体来说,只要启动一个协程,计数器就会+1;一旦其中某个协程退出,计数器就会-1,然后使用 wait 方法阻塞主协程,等待计数器清零后才能继续执行后续操作。

二、sync.WaitGroup 应用场景

sync.WaitGroup 的应用场景也很好理解,从它的作用上就可以看出,通过多个子协程并行执行一组任务,且任务全部完成后才能进行下一步操作的情况,所以对于某项业务逻辑执行前,需要先执行多个任务,且这些任务我们认为相互独立,那么这时就可以使用 WaitGroup 实现整体功能。

三、sync.WaitGroup 陷阱

sync.WaitGroup 陷阱是指协程间传递时需要以指针的方式或闭包的方式引用 WaitGroup 对象,否则会造成死锁。造成死锁的原因:在 go 语言中进行参数的传递都是值拷贝,也就是说如果对 WaitGroup 对象进行值拷贝,那么计数器也会被复制出一个新的来,这样就永远不会等到计数器归 0,造成了死锁。

四、sync.WaitGroup 用法

需求一:方法 mulit 计算两个 int 类型的乘积,这里需要执行该方法 10 亿次,使用 sync.WaitGroup 并行计算缩短执行时常。

实现代码如下:

func WaitGroup() {
	start := time.Now()
	var a, b = 1000, 10000
	for i := 0; i < 10000000000; i++ {
		multi(a, b)
	}
	end := time.Since(start)
	fmt.Println(end)

	// sync.WaitGroup 多协程并行计算,此电脑cpu为四核
	start = time.Now()
	wg := sync.WaitGroup{}
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 2500000000; j++ {
				multi(a, b)
			}
		}()
	}
	wg.Wait()
	end = time.Since(start)
	fmt.Println(end)
}

func multi(a, b int) int {
	return a * b
}

并行执行开启的协程数设置为cpu的核数。

输出结果如下:

可以发现,开启四个协程并行执行比串行执行计算速度上快了近6倍。

需求二:生产者开启两个协程为 []int 数组类型的 channel 生产数据,消费者从 channel 中读取数据

实现代码如下:

func WaitGroup1() {
	ch := make(chan []int, 1000)

	wgConsumer := sync.WaitGroup{}
	wgConsumer.Add(1)
	// 消费数据
	go func() {
		defer wgConsumer.Done()
		i := 0
		for item := range ch {
			fmt.Println(multi(item[0], item[1]))
			i++
		}
		time.Sleep(time.Second * 3)
		fmt.Println("数据处理完成,数据条数: ", i)
	}()

	// 生产数据
	wgProducer := &sync.WaitGroup{}
	for i := 1; i < 3; i++ {
		wgProducer.Add(1)
		wgConsumer.Add(1)
		go func(wg1 *sync.WaitGroup) {
			defer wg1.Done()
			defer wgConsumer.Done()
			for j := 1; j < 501; j++ {
				ch <- []int{i, j}
			}
		}(wgProducer)
	}
	// 等待所有生产者协程完成信号
	wgProducer.Wait()
	// 关闭 channel 后才能从中读取数据
	close(ch)
	// 等待所有的消费者协程信号
	wgConsumer.Wait()

}

func multi(a, b int) int {
	return a * b
}

需求三:用三个线程分别打印 123321 123321 循环 10 次,其中一个循环展示的效果如下:

线程1->1
线程2->2
线程3->3
线程1->3
线程2->2
线程3->1

分析思路:一个循环中,三个线程打印 123321 共 6 个数字,可以将两个数字为一组分配给其中一个线程负责打印,比如线程1负责打印1和3,线程2负责打印2和2,线程3负责打印3和1。结合Go语言中的通道用法实现线程1打印1后通知线程2打印2,再通知线程3打印3,反过来,然后再由线程3打印1,线程2打印2,线程1打印3。至此,一个循环结束。

实现代码如下:

func main() {
	var wg sync.WaitGroup
	wg.Add(3)

	ch1 := make(chan struct{})
	ch2 := make(chan struct{})
	ch3 := make(chan struct{})

	// 每个协程内部打印10次
	count := 10

	// 协程1
	go func() {
		defer wg.Done()
		for i := 0; i < count; i++ {
			<-ch1
			fmt.Printf("协程1->%v\n", 1)
			ch2 <- struct{}{}

			<-ch1
			fmt.Printf("协程1->%v\n", 3)
			ch2 <- struct{}{}
		}
	}()

	// 协程2
	go func() {
		defer wg.Done()
		for i := 0; i < count; i++ {
			<-ch2
			fmt.Printf("协程2->%v\n", 2)
			ch3 <- struct{}{}

			<-ch2
			fmt.Printf("协程2->%v\n", 2)
			ch3 <- struct{}{}
		}
	}()

	// 协程3
	go func() {
		defer wg.Done()
		for i := 0; i < count; i++ {
			<-ch3
			fmt.Printf("协程3->%v\n", 3)
			ch1 <- struct{}{}

			<-ch3
			fmt.Printf("协程3->%v\n", 1)

			// 打印完123321,开始下一轮
			if i < count-1 {
				fmt.Println()
				ch1 <- struct{}{}
			}
		}
	}()

	// 启动第一个协程
	ch1 <- struct{}{}

	// 阻塞
	wg.Wait()
}

四、sync.Cond 作用

Cond 是 Condition 的缩写,代表条件阻塞。sync.Cond 的作用是设置一组协程,这些协程可以根据不同的条件选择是否阻塞。而 sync.WaitGroup 协程对这一组子协程不会阻塞,阻塞的是主协程。

五、sync.Cond 应用场景

应用于一发多收的场景,即一组协程需要等待某一个协程完成一些前置准备的情况。比如说,有一组协程都是读数据,那么必须有一个前置协程进行写操作后才可以读。在前置协程进行写的过程中,那一组读数据的协程都在等待,等到前置协程写完后,我们可以根据业务需求选择性的唤醒等待的读数据协程,可以唤醒一个等待中的读协程,也可以唤醒两个等待中的读协程,当然也可以唤醒全部等待中的读协程。

六、sync.Cond 注意事项

  • 被调方必须持有锁(一组协程)
  • 主调方可以持有锁,但允许不持有(前置协程)
  • 尽可能的减少无效唤醒

七、sync.Cond 用法

需求一:有一个 int 类型的数组,有多个被叫方协程等待从数组中读取数据,而主叫方对该数组进行写操作,需要唤醒被叫方协程。

设计:被叫方 readList 方法作用是读取数据,开启多个协程来执行 readList 方法。由于数组没有数据,所以这些 readList 协程都会被阻塞,等待主叫方写完数据后唤醒。而 initList 方法是主叫方,因为 initList 是写操作,可以由 initList 来唤醒所有其它被阻塞的读操作的协程,因此 initList 叫做主叫方。主叫方可以持有锁,也可以不持有锁,一切都取决于业务需求。

主叫方方法 initList 实现如下:

func initList(list *[]int, c *sync.Cond) {
	// 主叫方,可以持有锁,也可以不持有锁,根据业务需求来设置
	// 这里我们设置主叫方持有锁
	c.L.Lock()
	defer c.L.Unlock()

	// 向数组 list 中写入数据
	for i := 0; i < 10; i++ {
		*list = append(*list, i)
	}

	// 写完数据后唤醒其它读数据的协程,通过广播机制唤醒所有协程
	c.Broadcast()
}

被叫方方法 readList 实现如下:

func readList(list *[]int, c *sync.Cond) {
	// 被叫方,必须持有锁
	c.L.Lock()
	defer c.L.Unlock()

	// 通过循环判断等待
	for len(*list) == 0 {
		fmt.Println("read list waiting...")
		c.Wait()
	}

	// 有数据写入数组后该协程就会被唤醒,打印数据
	fmt.Println("list 数据为: ", list)
}

可导出方法 CondCase 实现写入与读取数组的功能,实现如下:

func CondCase() {
	list := make([]int, 0)
	cond := sync.NewCond(&sync.Mutex{}) // sync.Mutex 是互斥锁

	// 开启三个读数据协程
	for i := 0; i < 3; i++ {
		go readList(&list, cond)
	}

        // 这里睡眠1秒是为了防止三个读数据协程还没执行完,就执行了 initList 方法
	time.Sleep(time.Second * 1)

	// 调用主叫方协程,向数组中写入数据
	initList(&list, cond)
}

需求二:向队列中写入数据的同时开启多个协程来读取数据,要求每个协程读取的数据个数是该协程的标号数字对应的数量,即1号协程只能读取1个数字,5号协程只能读取5个数字。

1. 构建队列数据结构的结构体

// queue 队列结构体
type queue struct {
	list []int
	cond *sync.Cond
}

由于队列底层数据结构就是数组,所以结构体 queue 内设置 []int 类型的变量。

2. 初始化队列结构体

// newQueue 初始化队列结构体
func newQueue() *queue {
	q := &queue{
		list: []int{},
		cond: sync.NewCond(&sync.Mutex{}),
	}
	return q
}

Go 代码中贯彻面向接口编程的思想。

3. 向队列中添加数据的方法 Put

// Put 向队列中加入数据
func (q *queue) Put(item int) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.list = append(q.list, item)

	// 当数据写入成功后,唤醒一个协程来处理数据(这里只唤醒一个协程所以用Signal()方法)
	q.cond.Signal()
}

4. 从队列中读取 n 个数据的方法 GetMany

// GetMany 从队列中读取 n 个数据(如果队列中不够 n 个数据,就一直等待,读取时将前 n 个数据从队列中截取出来)
func (q *queue) GetMany(n int) []int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	// 队列中不足 n 个数就一直等待
	for len(q.list) < n {
		q.cond.Wait()
	}
	// 队列中达到 n 个或者更多数据时,截取队列
	list := q.list[:n]
	q.list = q.list[n:]

	return list
}

5. 可导出的方法 CondQueueCase 实现写入和读取数据的功能,具体实现如下:

// CondQueueCase 一共会将 1-100,即100个数写入队列中,然后开启 10 个协程,每个协程根据自己的协程标号从这 100 个数中读取数据
// 比如说,1号协程只会读取1个数,5号协程只会读取5个数,10号协程只会读取10个数据。
// 1+2+3+4+5+6+7+8+9+10 = 55 < 100,因此在这个规则下,该队列中的100个数是不会被这10个协程读取完成的,一定会剩下45个数
// 主要是看这10个协程各自都读取了什么数字,通过这个示例让读者对 sync.Cond 的等待与唤醒机制了解的更透彻
func CondQueueCase() {
	q := newQueue()

	// 创建 sync.WaitGroup 来等待开启的这10个协程都执行完成后才开始执行主协程
	var wg sync.WaitGroup

	// 开启 10 个协程获取数据
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			list := q.GetMany(n)
			fmt.Printf("协程【%v】读取的数据为:%v\n", n, list)
		}(i)
	}

	// 向队列中写入100个数
	for i := 0; i < 100; i++ {
		q.Put(i)
	}

	// 阻塞主协程,等待前面的一组子协程执行完成
	wg.Wait()
}

在 main.go 中调用 CondQueueCase 方法


func main() {

	//_case.CondCase()
	_case.CondQueueCase()

	// 阻塞主协程
	ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
	defer stop()
	<-ctx.Done()
}

输出结果如下:

从输出结果中可以看出,每个协程获取得到的数据数量都与其协程编号相等,符合需求。

从两个需求示例中使用的 sync.Cond 情况可以看出来,都是先开启多个协程调用被叫方方法,然后因为被叫方不能读取数据导致这些协程阻塞等待,之后再调用主叫方,向数组中写入数据由此来唤醒被叫方所在的协程,实现”一发多收“的效果。

Categories:

Tags:

No responses yet

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注