GoLang之使用sync.Cond

GoLang之使用sync.Cond

1.Cond结构体

1.1.Cond结构体

Cond实现了一个条件变量,一个线程集合地,供线程等待或者宣布某事件的发生。
每个Cond实例都有一个相关的锁(一般是Mutex或RWMutex类型的值),它必须在改变条件时或者调用Wait方法时保持锁定。Cond可以创建为其他结构体的字段,Cond在开始使用后不能被拷贝。

type Cond struct {
    // 在观测或更改条件时L会冻结
    L Locker
    // 包含隐藏或非导出字段
}

注: Locker是一个接口

type Locker interface {
	Lock()
	Unlock()
}

1.2Broadcast方法

Broadcast唤醒所有等待c的线程。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

func (*Cond) Broadcast()

1.3Signal方法

Signal唤醒等待c的一个线程(如果存在)。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

func (c *Cond) Signal()

1.4Wait方法

func (c *Cond) Wait()

Wait自行解锁c.L并阻塞当前线程,在之后线程恢复执行时,Wait方法会在返回前锁定c.L。和其他系统不同,Wait除非被Broadcast或者Signal唤醒,不会主动返回。

因为线程中Wait方法是第一个恢复执行的,而此时c.L未加锁。调用者不应假设Wait恢复时条件已满足,相反,调用者应在循环中等待:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

2.NewCond函数

使用锁l创建一个*Cond。

func NewCond(l Locker) *Cond

3.介绍

3.1Cond结构体

(1)Cond结构体

Golang的sync包中的Cond实现了一种条件变量,可以使用在多个Reader等待共享资源ready的场景(如果只有一读一写,一个锁或者channel就搞定了)。
Cond的汇合点:多个goroutines等待、1个goroutine通知事件发生。
每个Cond都会关联一个Lock(*sync.Mutex or *sync.RWMutex),当修改条件或者调用Wait方法时,必须加锁,保护condition。

type Cond struct {
        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}

(2)Broadcast

Broadcast会唤醒所有等待c的goroutine。
调用Broadcast的时候,可以加锁,也可以不加锁。

func (c *Cond) Broadcast()

(3)Signal

Signal只唤醒1个等待c的goroutine。
调用Signal的时候,可以加锁,也可以不加锁。

func (c *Cond) Signal()

(4)Wait

func (c *Cond) Wait()

Wait()会自动释放c.L,并挂起调用者的goroutine。之后恢复执行,Wait()会在返回时对c.L加锁。
除非被Signal或者Broadcast唤醒,否则Wait()不会返回。
由于Wait()第一次恢复时,C.L并没有加锁,所以当Wait返回时,调用者通常并不能假设条件为真。
取而代之的是, 调用者应该在循环中调用Wait。(简单来说,只要想使用condition,就必须加锁。)

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

3.2NewCond函数

新建一个Cond条件变量。

func NewCond(l Locker) *Cond

4.例子

4.1例子

package main

import (
	"fmt"
	"sync"
	"time"
)

var sharedRsc = false

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	go func() {
		// this go routine wait for changes to the sharedRsc
		c.L.Lock()
		for sharedRsc == false {
			fmt.Println("goroutine1 wait")
			c.Wait()
		}
		fmt.Println("goroutine1", sharedRsc)
		c.L.Unlock()
		wg.Done()
	}()

	go func() {
		// this go routine wait for changes to the sharedRsc
		c.L.Lock()
		for sharedRsc == false {
			fmt.Println("goroutine2 wait")
			c.Wait()
		}
		fmt.Println("goroutine2", sharedRsc)
		c.L.Unlock()
		wg.Done()
	}()

	// this one writes changes to sharedRsc
	time.Sleep(2 * time.Second)
	c.L.Lock()
	fmt.Println("main goroutine ready")
	sharedRsc = true
	c.Broadcast()
	fmt.Println("main goroutine broadcast")
	c.L.Unlock()
	wg.Wait()
}

执行结果如下:

/*
		goroutine1 wait
		goroutine2 wait
		main goroutine ready
		main goroutine broadcast
		goroutine1 true
		goroutine2 true
	*/
	/*
		goroutine2 wait
		goroutine1 wait
		main goroutine ready
		main goroutine broadcast
		goroutine2 true
		goroutine1 true
	*/
	/*
		goroutine1 wait
		goroutine2 wait
		main goroutine ready
		main goroutine broadcast
		goroutine2 true
		goroutine1 true
	*/

goroutine1和goroutine2进入Wait状态,在main goroutine在2s后资源满足,发出broadcast信号后,从Wait中恢复并判断条件是否确实已经满足(sharedRsc不为空),满足则消费条件,并解锁、wg.Done()。

4.2修改1

我们做个修改,删除main goroutine中的2s延时。
代码就不贴了。
执行结果如下。

/*
main goroutine ready
main goroutine broadcast
goroutine2 true
goroutine1 true
*/
/*
main goroutine ready
main goroutine broadcast
goroutine1 true         
goroutine2 true     
*/

很有意思,两个goroutine都没有进入Wait状态。
原因是,main goroutine执行的更快,在goroutine1/goroutine2加锁之前就已经获得了锁,并完成了修改sharedRsc、发出Broadcast信号。
当子goroutine调用Wait之前检验condition时,条件已经满足,因此就没有必要再去调用Wait了。

4.2修改2

如果我们在子goroutine中不做校验呢?
我们会得到1个死锁。

main goroutine ready
main goroutine broadcast
goroutine2 wait
goroutine1 true
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x414028, 0x19)
	/usr/local/go/src/runtime/sema.go:56 +0x40
sync.(*WaitGroup).Wait(0x414020, 0x40c108)
	/usr/local/go/src/sync/waitgroup.go:130 +0x60
main.main()
	/tmp/sandbox947808816/prog.go:44 +0x2c0

goroutine 6 [sync.Cond.Wait]:
runtime.goparkunlock(...)
	/usr/local/go/src/runtime/proc.go:307
sync.runtime_notifyListWait(0x43e268, 0x0)
	/usr/local/go/src/runtime/sema.go:510 +0x120
sync.(*Cond).Wait(0x43e260, 0x40c108)
	/usr/local/go/src/sync/cond.go:56 +0xe0
main.main.func2(0x43e260, 0x414020)
	/tmp/sandbox947808816/prog.go:31 +0xc0
created by main.main
	/tmp/sandbox947808816/prog.go:27 +0x140

为什么呢?
main goroutine(goroutine 1)先执行,并停留在 wg.Wait()中,等待子goroutine的wg.Done();而子goroutine(goroutine 6)没有判断条件直接调用了cond.Wait。
我们知道cond.Wait会释放锁并等待其他goroutine调用Broadcast或者Signal来通知其恢复执行,除此之外没有其他的恢复途径。但此时main goroutine已经调用了Broadcast并进入了等待状态,没有任何goroutine会去拯救还在cond.Wait中的子goroutine了,而该子goroutine也没有机会调用wg.Done()去恢复main goroutine,造成了死锁。

5.一个真实的例子

我们来看看k8s中使用Cond实现的FIFO,它是如何处理条件的消费的。

func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, FIFOClosedError
			}

			f.cond.Wait()
		}
		id := f.queue[0]
    f.queue = f.queue[1:]
    ...
	}
}

func NewFIFO(keyFunc KeyFunc) *FIFO {
	f := &FIFO{
		items:   map[string]interface{}{},
		queue:   []string{},
		keyFunc: keyFunc,
	}
	f.cond.L = &f.lock
	return f
}

Cond共用了FIFO的lock,在Pop时,会先加锁 f.lock.Lock(),而在f.cond.Wait()前,会先检查len(f.queue)是否为0,防止2种情况:
1.如上面的例子3,条件已经满足,不需要wait
2.唤醒时满足,但被其他goroutine捷足先登,阻塞在f.lock的加锁中;当获得了锁,加锁成功以后,f.queue已经被消费为空,直接访问f.queue[0]会访问越界。


版权声明:本文为weixin_52690231原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。