做网站注册什么公司好,徐州网站建设网站制作,宁波有没有开发网站的公司,洞口建设局网站这章是我并发系列中最后的一章。这章主要讲的是锁。但是也会讲上一章channl遗留下的一些没有讲到的内容。select关键字的用法#xff0c;以及错误的一些channl用法。废话不多说。。。 文章目录 select多路复用通道错误示例并发安全和锁问题描述互斥锁读写互斥锁 syncsync.Wait…这章是我并发系列中最后的一章。这章主要讲的是锁。但是也会讲上一章channl遗留下的一些没有讲到的内容。select关键字的用法以及错误的一些channl用法。废话不多说。。。 文章目录 select多路复用通道错误示例并发安全和锁问题描述互斥锁读写互斥锁 syncsync.WaitGroup加载配置文件示例并发安全的单例模式 sync.Map 原子操作读取操作写入操作修改操作交换操作比较并交换操作 select多路复用
使用场景需要同时从多个通道接收数据
通道在接收数据时如果没有数据可以被接收那么当前 goroutine 将会发生阻塞。
当然办法不是没有。遍历呗。
for{// 尝试从ch1接收值data, ok : -ch1// 尝试从ch2接收值data, ok : -ch2…
}这种方式虽然可以实现从多个通道接收值的需求但是程序的运行性能会差很多。
Go 语言内置了select关键字使用它可以同时响应多个通道的操作。
Select 的使用方式类似于之前学到的 switch 语句它也有一系列 case 分支和一个默认的分支。
每个 case 分支会对应一个通道的通信接收或发送过程。select 会一直等待直到其中的某个case的通信操作完成时就会执行该 case 分支对应的语句
select {
case -ch1://...
case data : -ch2://...
case ch3 - 10://...
default://默认操作
}Select 语句具有以下特点:
可处理一个或多个 channel 的发送/接收操作。如果多个 case 同时满足select 会随机选择一个执行。对于没有case的 select 会一直阻塞可用于阻塞 main 函数防止退出。
package mainimport fmtfunc main() {ch : make(chan int, 1)for i : 1; i 10; i {select {case x : -ch:fmt.Println(x)case ch - i:}}
}代码首先是创建了一个缓冲区大小为1的通道 ch进入 for 循环后
第一次循环时 i 1select 语句中包含两个 case 分支此时由于通道中没有值可以接收所以x : -ch 这个 case 分支不满足而ch - i这个分支可以执行会把1发送到通道中结束本次 for 循环第二次 for 循环时i 2由于通道缓冲区已满所以ch - i这个分支不满足而x : -ch这个分支可以执行从通道接收值1并赋值给变量 x 所以会在终端打印出 1后续的 for 循环以此类推会依次打印出3、5、7、9
简单而言就是当i为偶数的时候执行的通道里输出。
通道错误示例
示例1
// demo1 通道误用导致的bug
func demo1() {wg : sync.WaitGroup{}ch : make(chan int, 10)for i : 0; i 10; i {ch - i}close(ch)wg.Add(3)for j : 0; j 3; j {go func() {for {task : -ch// 这里假设对接收的数据执行某些操作fmt.Println(task)}wg.Done()}()}wg.Wait()
}匿名函数所在的 goroutine 并不会按照预期在通道被关闭后退出。
因为task : - ch的接收操作在通道被关闭后会一直接收到零值而不会退出。此处的接收操作应该使用task, ok : - ch 通过判断布尔值ok为假时退出或者使用select 来处理通道。 修改后
for j : 0; j 3; j {go func() {for {task, ok : -chfmt.Println(task)if !ok {break}}wait.Done()}()}wait.Wait()其实不需要嵌套外循环的。不过为了方便观看就这样也了。
// demo2 通道误用导致的bug
func demo2() {ch : make(chan string)go func() {// 这里假设执行一些耗时的操作time.Sleep(3 * time.Second)ch - job result}()select {case result : -ch:fmt.Println(result)case -time.After(time.Second): // 设置的超时时间return}
}分析代码可以知道此时有两个goroutine 主方法走select而另一个 goroutine 会走给通道输入值的操作。此时就有一个问题。从协程goroutine会等待三秒而主协程指挥等待一秒然后按照超时操作弹出。
而这种问题的存在不是因为我们没有达到想要的结果而是可能导致 goroutine 泄露goroutine 并未按预期退出并销毁
由于 select 命中了超时逻辑导致通道没有消费者无接收操作而其定义的通道为无缓冲通道因此 goroutine 中的ch - job result操作会一直阻塞最终导致 goroutine 泄露。 上一章漏下的内容讲完 并发安全和锁
场景可能会存在多个 goroutine 同时操作一个资源临界区的情况这种情况下就会发生竞态问题数据竞态
问题描述
package mainimport (fmtsync
)var (x int64wg sync.WaitGroup // 等待组
)// add 对全局变量x执行5000次加1操作
func add() {for i : 0; i 5000; i {x x 1}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x)
}每次执行都会生成不同的结果 原因: 我们开启了两个 goroutine 分别执行 add 函数这两个 goroutine 在访问和修改全局的x变量时就会存在数据竞争某个 goroutine 中对全局变量x的修改可能会覆盖掉另一个 goroutine 中的操作所以导致最后的结果与预期不符。
互斥锁
互斥锁是一种常用的控制共享资源访问的方法它能够保证同一时间只有一个 goroutine 可以访问共享资源。Go 语言中使用sync包中提供的Mutex类型来实现互斥锁。
sync.Mutex提供了两个方法:
方法名功能func (m *Mutex) Lock()获取互斥锁func (m *Mutex) Unlock()释放互斥锁
通过锁修改
package mainimport (fmtsync
)// sync.Mutexvar (x int64wg sync.WaitGroup // 等待组m sync.Mutex // 互斥锁
)// add 对全局变量x执行5000次加1操作
func add() {for i : 0; i 5000; i {m.Lock() // 修改x前加锁x x 1m.Unlock() // 改完解锁}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x)
}此时就会达到我们的预期结果。
使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区其他的 goroutine 则在等待锁当互斥锁释放后等待的 goroutine 才可以获取锁进入临界区多个 goroutine 同时等待一个锁时唤醒的策略是随机的。
读写互斥锁
互斥锁是完全互斥的但是实际上有很多场景是读多写少的。
当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的这种场景下使用读写锁是更好的一种选择。
读写锁在 Go 语言中使用sync包中的RWMutex类型。
方法名功能func (rw *RWMutex) Lock()获取写锁func (rw *RWMutex) Unlock()释放写锁func (rw *RWMutex) RLock()获取读锁func (rw *RWMutex) RUnlock()释放读锁func (rw *RWMutex) RLocker() Locker返回一个实现Locker接口的读写锁
读写锁分为两种读锁和写锁
当一个 goroutine 获取到读锁之后其他的 goroutine 如果是获取读锁会继续获得锁如果是获取写锁就会等待。当一个 goroutine 获取写锁之后其他的 goroutine 无论是获取读锁还是写锁都会等待。
package mainimport (fmtsynctime
)var (x int64wg sync.WaitGroupmutex sync.MutexrwMutex sync.RWMutex
)// writeWithLock 使用互斥锁的写操作
func writeWithLock() {mutex.Lock() // 加互斥锁x x 1time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒mutex.Unlock() // 解互斥锁wg.Done()
}// readWithLock 使用互斥锁的读操作
func readWithLock() {mutex.Lock() // 加互斥锁time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒mutex.Unlock() // 释放互斥锁wg.Done()
}// writeWithLock 使用读写互斥锁的写操作
func writeWithRWLock() {rwMutex.Lock() // 加写锁x x 1time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒rwMutex.Unlock() // 释放写锁wg.Done()
}// readWithRWLock 使用读写互斥锁的读操作
func readWithRWLock() {rwMutex.RLock() // 加读锁time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒rwMutex.RUnlock() // 释放读锁wg.Done()
}func do(wf, rf func(), wc, rc int) {start : time.Now()// wc个并发写操作for i : 0; i wc; i {wg.Add(1)go wf()}// rc个并发读操作for i : 0; i rc; i {wg.Add(1)go rf()}wg.Wait()cost : time.Since(start)fmt.Printf(x:%v cost:%v\n, x, cost)}
func main() {// 使用互斥锁10并发写1000并发读do(writeWithLock, readWithLock, 10, 1000) // x:10 cost:1.466500951s// 使用读写互斥锁10并发写1000并发读do(writeWithRWLock, readWithRWLock, 10, 1000) // x:10 cost:117.207592ms
}
从最终的执行结果可以看出使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能。
不过需要注意的是如果一个程序中的读操作和写操作数量级差别不大那么读写互斥锁的优势就发挥不出来。
有一点要注意在在这个实验中要明确一个已经知道的属性那就是读操作一定比写操作快。
在并发的时候说了GO本体的标准库有个一个专门为并发实现的包sync。
sync
sync.WaitGroup
在代码中生硬的使用time.Sleep肯定是不合适的Go语言中可以使用sync.WaitGroup来实现并发任务的同步。在这里插入代码片
方法名功能func (wg * WaitGroup) Add(delta int)计数器 deltafunc (wg *WaitGroup) Done()计数器 -1 (这个要搭配defer使用)func (wg *WaitGroup) Wait()阻塞直到计数器变为 0
sync.WaitGroup内部维护着一个计数器计数器的值可以增加和减少。
当我们启动了 N 个并发任务时就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完当计数器值为 0 时表示所有并发任务已经完成。几乎同时人类看来输出结果
需要注意:sync.WaitGroup是一个结构体进行参数传递的时候要传递指针。## sync.Once
var wg sync.WaitGroupfunc hello() {defer wg.Done()fmt.Println(Hello Goroutine!)
}
func main() {wg.Add(1)go hello() // 启动另外一个goroutine去执行hello函数fmt.Println(main goroutine done!)wg.Wait()
}在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次例如只加载一次配置文件等。
Go语言中的sync包中提供了一个针对只执行一次场景的解决方案——sync.Oncesync.Once只有一个Do方法
func (o *Once) Do(f func())注意如果要执行的函数f需要传递参数就需要搭配闭包来使用。
加载配置文件示例
延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。
因为预先初始化一个变量比如在init函数中完成初始化会增加程序的启动耗时而且有可能实际执行过程中这个变量没有用上那么这个初始化操作就不是必须要做的。
var icons map[string]image.Imagefunc loadIcons() {icons map[string]image.Image{left: loadIcon(left.png),up: loadIcon(up.png),right: loadIcon(right.png),down: loadIcon(down.png),}
}func loadIcon(s string) image.Image {return nil
}// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {if icons nil {loadIcons()}return icons[name]
}多个 goroutine 并发调用Icon函数时不是并发安全的现代的编译器和CPU可能会在保证每个 goroutine 都满足串行一致的基础上自由地重排访问内存的顺序。(指令重排序)
loadIcons函数可能会被重排为以下结果
func loadIcons() {icons make(map[string]image.Image)icons[left] loadIcon(left.png)icons[up] loadIcon(up.png)icons[right] loadIcon(right.png)icons[down] loadIcon(down.png)
}千万别看这个顺序和前面定义的一样。那是每个goroutine的出来的结果。并不一个得出的。
在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况我们能想到的办法就是添加互斥锁保证初始化icons的时候不会被其他的 goroutine 操作但是这样做又会引发性能问题。
所以此时就考虑sync.Once
import (imagesync
)var icons map[string]image.Imagevar loadIconsOnce sync.Oncefunc loadIcons() {icons map[string]image.Image{left: loadIcon(left.png),up: loadIcon(up.png),right: loadIcon(right.png),down: loadIcon(down.png),}
}func loadIcon(s string) image.Image {return nil
}// Icon 是并发安全的
func Icon(name string) image.Image {loadIconsOnce.Do(loadIcons)return icons[name]
}
func main() {}并发安全的单例模式
package singletonimport (sync
)type singleton struct {}var instance *singleton
var once sync.Oncefunc GetInstance() *singleton {once.Do(func() {instance singleton{}})return instance
}sync.Once其实内部包含一个互斥锁和一个布尔值互斥锁保证布尔值和数据的安全而布尔值用来记录初始化是否完成。
这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
sync.Map
sync.Map
Go 语言中内置的 map 不是并发安全的. 错误例子
package mainimport (fmtstrconvsync
)var m make(map[string]int)func get(key string) int {return m[key]
}func set(key string, value int) {m[key] value
}func main() {wg : sync.WaitGroup{}for i : 0; i 10; i {wg.Add(1)go func(n int) {key : strconv.Itoa(n)set(key, n)fmt.Printf(k:%v,v:%v\n, key, get(key))wg.Done()}(i)}wg.Wait()
}将上面的代码编译后执行会报出fatal error: concurrent map writes错误。我们不能在多个 goroutine 中并发对内置的 map 进行读写操作否则会存在数据竞争问题。其实大家自己运行一下就知道了。其实不一定会出现这个错误但是有概率出现。所以能加锁就枷锁。
这种场景下就需要为map加锁来保证并发的安全性了Go语言的sync包中提供了一个开箱即用的并发安全版 map——sync.Map
开箱即用表示其不用像内置的 map 一样使用 make 函数初始化就能直接使用。
方法名功能func (m *Map) Store(key, value interface{})存储key-value数据func (m *Map) Load(key interface{}) (value interface{}, ok bool)查询key对应的valuefunc (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)查询或存储key对应的valuefunc (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool)查询并删除keyfunc (m *Map) Delete(key interface{})删除keyfunc (m *Map) Range(f func(key, value interface{}) bool)对map中的每个key-value依次调用f
package mainimport (fmtstrconvsync
)// 并发安全的map
var m sync.Map{}func main() {wg : sync.WaitGroup{}// 对m执行20个并发的读写操作for i : 0; i 20; i {wg.Add(1)go func(n int) {key : strconv.Itoa(n)m.Store(key, n) // 存储key-valuevalue, _ : m.Load(key) // 根据key取值fmt.Printf(k:%v,v:%v\n, key, value)wg.Done()}(i)}wg.Wait()
}此时就安全了
说到枷锁操作就不得不说一个东西。原子性操作
原子操作
针对整数数据类型int32、uint32、int64、uint64我们还可以使用原子操作来保证并发安全通常直接使用原子操作比使用锁操作效率更高。
Go语言中原子操作由内置的标准库sync/atomic提供。(具体需要的话可以去看相关文档)
读取操作
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)写入操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)修改操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)交换操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)比较并交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)atomic包提供了底层的原子级内存操作对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。
除了某些特殊的底层应用使用通道或者 sync 包的函数/类型实现同步更好。
例子
package mainimport (fmtsyncsync/atomictime
)type Counter interface {Inc()Load() int64
}// 普通版
type CommonCounter struct {counter int64
}func (c CommonCounter) Inc() {c.counter
}func (c CommonCounter) Load() int64 {return c.counter
}// 互斥锁版
type MutexCounter struct {counter int64lock sync.Mutex
}func (m *MutexCounter) Inc() {m.lock.Lock()defer m.lock.Unlock()m.counter
}func (m *MutexCounter) Load() int64 {m.lock.Lock()defer m.lock.Unlock()return m.counter
}// 原子操作版
type AtomicCounter struct {counter int64
}func (a *AtomicCounter) Inc() {atomic.AddInt64(a.counter, 1)
}func (a *AtomicCounter) Load() int64 {return atomic.LoadInt64(a.counter)
}func test(c Counter) {var wg sync.WaitGroupstart : time.Now()for i : 0; i 1000; i {wg.Add(1)go func() {c.Inc()wg.Done()}()}wg.Wait()end : time.Now()fmt.Println(c.Load(), end.Sub(start))
}func main() {c1 : CommonCounter{} // 非并发安全test(c1)c2 : MutexCounter{} // 使用互斥锁实现并发安全test(c2)c3 : AtomicCounter{} // 并发安全且比互斥锁效率更高test(c3)
}