189 8069 5689

Go36-27,28-条件变量

条件变量

条件变量(conditional variable),和互斥锁一样,也是一个同步工具。我们常常会把条件变量与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。

黄冈ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!

作用

条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。
使用条件变量的最大优势就是在效率方面的提升。当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了。

使用条件变量

条件变量需要与互斥锁配合使用。条件变量的初始化需要互斥锁,并且它的方法有的也是基于互斥锁的。
条件变量提供的方法有三个:

  • 等待通知(wait)
  • 单发通知(signal)
  • 广播通知(broadcast)

在利用条件变量等待通知的时候,需要在它基于的那个互斥锁的保护下进行。
在进行单发通知或光爆通知的时候,需要在对应的互斥锁解锁之后再做操作。

创建条件变量
结合代码理解上面的含义,先创建几个变量:

var lock sync.RWMutex
sendCond := sync.NewCond(&lock)
recvCond := sync.NewCond(lock.RLocker())

条件变量的类型
lock是一个读写锁,基于这把锁,创建了2个代表条件变量的变量,这两个变量的类型是*sync.Cond,是由sync.NewCond函数来初始化的。

初始化
与互斥锁锁不同,这里不是开箱即用的,只能使用sync.NewCond函数来创建它的指针值,这个函数需要一个sync.Locker类型的参数。
前面说过,条件变量是基于互斥锁的,它必须有互斥锁的支持才能够起作用。因此,这里的参数是必须的,它也会参与到条件变量的方法实现中去。
sync.Locker接口
sync.Locker其实是一个接口,包含两个方法Lock()和Unlock():

type Locker interface {
    Lock()
    Unlock()
}

sync.Mutex类型sync,RWMutex类型都拥有这两个方法,不过都是指针方法。因此这两个类型的指针类型才是sync.Locker接口的实现类型。

初始化的过程
在为sendCond初始化的时候,把lock变量的指针作为参数。这里lock变量的Lock方法和Unlock方法分别用于对其中写锁的锁定和解锁。这里与实现接口的两个方法的名称是对应的。
在为recvCond初始化的时候,需要的是lock变量的读锁,并且还得是sync.Locker接口类型,就是要实现了Lock和Unlock方法的读锁。可是lock变量中用于读锁的方法却是RLock方法和RUnlock方法,这里名称不对应了。不过有一个RLocker方法可以实现这一需求,下面是源码里实现的部分,很简单:

// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
    return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

这里我有一些小疑惑,3个方法里面都是类型断言吧。RLocker方法把原来的读写锁类型转成一个新的类型然后返回。后面的两个方法,为了用新类型调用读写锁类型里的方法,先进行类型断言,转成读写锁原本的类型,然后调用它的方法。

使用条件变量
下面是截取的使用时的部分代码:

lock.Lock()
for !isEmpty {
    sendCond.Wait()
}
isEmpty = false
// 这里可以做写入的操作
lock.Unlock()
recvCond.Signal()

上面是一个写入的流程。之前的代码定义了一个状态变量isEmpty,只有状态为空的时候,才允许写入,写入后把状态设置为非空。
这里要先调用Lock方法,等待通知(wait)是要在互斥锁的保护下进行的。
然后再操作完之后,先调用Unlock方法,再发送通知,发送通知的操作要在互斥锁解锁之后。
这里等待的出sendCond的信号,而最后发送的是recvCond的信号。在另一个读取的流程里则正好相反。利用条件变量可以实现单向的通知,而这里要实现双向的通知,就需要两个条件变量。这是条件变量的基本使用原则。

示例代码

上面把关键的代码分析了一下,下面是完整的示例代码:

package main

import (
    "fmt"
    "sync"
    "time"
    "flag"
)

var useCond bool

func init() {
    flag.BoolVar(&useCond, "cond", false, "是否使用条件变量")
}

type msgBox struct {
    message  string
    isEmpty  bool
    sendCond *sync.Cond
    recvCond *sync.Cond
}

func main() {
    flag.Parse()
    fmt.Println("是否开启了条件变量保护:", useCond)

    var lock sync.RWMutex
    msgBox := msgBox{
        isEmpty:  true,  // 默认值是false,状态初始值应该为true
        sendCond: sync.NewCond(&lock),  // 不是开箱即用的,需要在使用前初始化
        recvCond: sync.NewCond(lock.RLocker()),
    }

    done := make(chan struct{})
    max := 5

    // 写操作的goroutine
    go func(max int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < max; i++ {
            time.Sleep(time.Millisecond * 200)
            // 先进行保护
            lock.Lock()
            // 再等待通知
            for useCond && !msgBox.isEmpty {
                msgBox.sendCond.Wait()
            }
            msgBox.isEmpty = false
            msg := fmt.Sprintf("第 %d 条消息", i)
            msgBox.message = msg
            fmt.Printf("发送消息[%d]: %s\n", i, msg)
            // 先解锁
            lock.Unlock()
            // 再发送通知
            msgBox.recvCond.Signal()
        }
    }(max)

    // 读操作的goroutine
    go func(max int) {
        defer func() {
            done <- struct{}{}
        }()
        for j := 0; j < max; j++ {
            time.Sleep(time.Millisecond * 500)
            lock.RLock()
            for useCond && msgBox.isEmpty {
                msgBox.recvCond.Wait()
            }
            msgBox.isEmpty = true
            msg := msgBox.message
            fmt.Printf("接收消息[%d]: %s\n", j, msg)
            lock.RUnlock()
            msgBox.sendCond.Signal()
        }
    }(max)
    <-done
    <-done
    fmt.Println("Over")
}

代码中条件变量的作用
在这个例子里,写的时候要获取到写锁,读的时候要获取到读锁,这个逻辑和之前互斥锁是一样的。但是只是获取到锁还不能做操作,这里还要再做一个限制,所以就用到了条件变量。
在这个例子里,写操作和读操作是需要成对出现的。写完一次之后,依然能获取到写锁,但是不能立刻写。而是要等待读操作把之前写入的数据读过之后,才能再次写入,把之前的内容覆盖掉。读操作也是一样。这里就需要两个goroutine之间传递信号了。
通过命令行参数分别在开启/关闭条件变量的环境下运行,可以看到其中的作用:

go run main.go
go run main.go -cond

Wait方法

条件变量的Wait方法主要做了4件事:

  1. 把调用它的goroutine加入到当前条件变量的通知队列中
  2. 解锁当前的条件变量基于的那个互斥锁
  3. 让当前的goroutine处于等待状态,等到通知来了再决定是否唤醒它。此时阻塞在调用Wait方法的那行代码上
  4. 如果通知来了并且决定唤醒当前goroutine,就在唤醒它之后重新锁定当前条件变量基于的互斥锁

先解锁,在阻塞
在Wait方法里,必须要先解锁,在阻塞当前goroutine。否则就违背了互斥锁要成对出现的原则。并且当前goroutine在解锁千就阻塞的话,当前goroutine就不可能在执行解锁了。即使不考虑原则,让别的goroutine来解锁,又会有重复解锁可能。

使用for语句
并且Wait方法建议是放在一个for循环里的。这里似乎也是可以用if语句的。但是if语句只能检查状态一次,而for的话可以进行多次检查。如果goroutine收到了通知而唤醒,但是此时检查时发现状态还是不对,那么就应该再次调用Wait方法。保险起见,在包裹条件变量的Wait方法总是应该使用for语句。

Signal方法和Broadcast方法

这2个方法都是用来发送通知的。Signal方法的通知只会唤醒一个goroutine,而Broadcast方法的通知会唤醒所有等待的goroutine。Wait方法会把当前的goroutine添加到通知队列的队尾,而Signal方法会从通知队列的队首开始查找可以被唤醒的goroutine。因此Signal方法唤醒的一般是最早等待的那个goroutine。

适用场景
这2个方法的行为决定他们的适用场景。确定只有一个goroutine在等待通知,或者值需要唤醒一个goroutine的时候,就使用Signal方法。否则,使用Broadcast方法总是没错的,Broadcast方法的适用场景更多。

通知的即时性
条件变量的通知具有即时性。如果发送通知的时候没有goroutine在等待,那么该次通知就会被直接丢弃。之后再开始等待的goroutine需要等待之后的通知。

示例代码2

还是前面那个示例,稍微改了改,把读写锁换成了互斥锁,通知方法把Signal换成了Broadcast:

package main

import (
    "fmt"
    "sync"
    "time"
)

var lock sync.Mutex

// 匿名结构体,定义并初始化赋值
// 嵌入式锁(Embedded lock)的场景适合使用匿名结构体
var msgBox = struct {
    message  string
    isEmpty  bool
    sendCond *sync.Cond
    recvCond *sync.Cond
}{
    isEmpty: true,
    sendCond: sync.NewCond(&lock),
    recvCond: sync.NewCond(&lock),
}

// 用于设置消息的函数
func send(id, index int) {
    lock.Lock()
    for !msgBox.isEmpty {
        msgBox.sendCond.Wait()
    }
    msg := fmt.Sprintf("msg: [%d-%d]", id, index)
    msgBox.message = msg
    fmt.Printf("发送消息[%d-%d]: %s\t", id, index, msg)
    msgBox.isEmpty = false
    lock.Unlock()
    msgBox.recvCond.Broadcast()
}

// 用于读取消息的函数
func recv(id, index int) {
    lock.Lock()
    for msgBox.isEmpty {
        msgBox.recvCond.Wait()
    }
    msg := msgBox.message
    msgBox.message = ""
    fmt.Printf("接收消息[%d-%d]: %s\n", id, index, msg)
    msgBox.isEmpty = true
    lock.Unlock()
    msgBox.sendCond.Broadcast()
}

func main() {
    done := make(chan struct{})
    count := 5

    // 启动一个goroutine用于发送
    go func(id, count int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < count; i++ {
            time.Sleep(time.Millisecond * 100)
            send(id, i)
        }
    }(0, count * 2)

    // 启动两个goroutine用于接收
    go func(id, count int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < count; i++ {
            time.Sleep(time.Millisecond * 300)
            recv(id, i)
        }
    }(1, count)
    go func(id, count int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < count; i++ {
            time.Sleep(time.Millisecond * 400)
            recv(id, i)
        }
    }(2, count)

    <- done
    <- done
    <- done
    fmt.Println("Over")
}

网站名称:Go36-27,28-条件变量
网页网址:http://cdxtjz.cn/article/psjidj.html

其他资讯