Sync包源码解析(Map)
版本
- go version 1.10.1
var m sync.mapvalue,ok:=m.Load(key)m.Store(key,value)value,loaded,ok := m.LoadOrStore(key,value)m.Delete(key)m.Range(func(key,value) bool)
数据结构
/*
package: sync
file: map.go
line: 27
*/
type Map struct {
mu Mutex
read atomic.Value
dirty map[interface{}]*entry
misses int
}/*
package: sync
file: map.go
line: 27
*/
type readOnly struct {
mmap[interface{}]*entry
amended bool
}/*
package: sync
file: map.go
line: 73
*/type entry struct {
p unsafe.Pointer
}/*
package: sync
file: map.go
line: 70
*/
var expunged = unsafe.Pointer(new(interface{}))
- mu: Mutex锁,在对dirty进行操作的时候,需要上锁
- read: atomic.Value对象,对readOnly对象进行原子读取和存储,对map进行操作时,优先操作read中的数据。
- readOnly:维护一个只读的map和一个判断dirty是否有新值的字段。
- dirty:保存最新数据的map,当从read中多次读取不到数据时,会将dirty提升成read。
- misses:从read读取不到数据的次数,当dirty提升为read的时候置零。
- entry:指针p保存传入value的地址
- expunged:随机地址,当entry从dirty中删除的时候,将entry的p指向expunged
/*
package: sync
file: map.go
line: 95
*/
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
newEntry传入一个interface对象,返回保存了interface对象指针的entry指针。
Load
/*
package: sync
file: map.go
line: 102
*/
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly) //step 1
e, ok := read.m[key]
if !ok && read.amended { //step 2
m.mu.Lock()
read, _ = m.read.Load().(readOnly) //step 3
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key] //step 4
m.missLocked()
}
m.mu.Unlock()
}
if !ok { //step 5
return nil, false
}
return e.load() //step 6
}
- 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry。
- 若在readOnly中不存在这个key,但是dirty中存在新的entry(不一定是key对应的entry,只是表示dirty中存在read中不存在的entry),则获取Mutex锁。
- 双检查,重复step1和step2。若获得锁之前key在dirty中存在,并且dirty被提升read,则这个时候key在read中存在,读取read中的entry并解锁。
- 若在read中还是不存在,则直接从dirty中读取entry,并且将miss数加1。若miss数大于等于dirty中的数据,则将dirty提升为read,将readOnly中的amended设为false,dirty设为nil,miss置零,然后解锁。
/*
package: sync
file: map.go
line: 343
*/
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
- 若key在dirty和read中都不存在,则返回空值和false。
- 从entry中读取存储的数据,若entry的p为空或者被标记为expunged,则表示该值已经被删除,则返回nil和false,否则返回对应的值和true。
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
Store
/*
package: sync
file: map.go
line: 136
*/
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)//step 1
if e, ok := read.m[key];
ok && e.tryStore(&value) {
return
}m.mu.Lock()//step 2
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key];
ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key];
ok {//step 3
e.storeLocked(&value)
} else {//step 4
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
- 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry,若存在则尝试直接将entry的p指向value。
func (e *entry) tryStore(i *interface{}) bool { p := atomic.LoadPointer(&e.p)//step a if p == expunged { return false } for { //step b if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } p = atomic.LoadPointer(&e.p) if p == expunged { return false } } }
a. 判断p是否被标记为expunged,若是则表示key已经在dirty中删除,不能直接在read中存储。
b. 循环尝试进行CAS操作将当前Value的值存储到entry中,除非key在dirty中被删除。
- 上锁然后进行双检查,若dirty被提升为read,并且key在新的read中存在,则将新的值存入entry中。中间会判断p是否被标记为expunged,若是则将key存到dirty中。
- 若在dirty中存在,则直接将值存到entry中。
- 若在dirty中也不存在,则创建新的entry存到dirty中。并且将read的amended设为ture,表示dirty中有新的数据。若dirty为nil,会将read的值赋值给新的dirty。
/* package: sync file: map.go line: 353 */ func (m *Map) dirtyLocked() { if m.dirty != nil { //step a return }read, _ := m.read.Load().(readOnly)//step b m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { //step c m.dirty[k] = e } } }/* package: sync file: map.go line: 367 */ func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
a. 若dirty不为nil,则直接返回。
b. 读出read,创建长度同read的map赋值给dirty。
c. 循环遍历read,若read中的entry的值为nil,则将p标记为expunged,若不为nil,则将entry赋值给dirty。因此,p只有在dirty在重新创建的时候才会被标记为expunged,并且对应的key不会在dirty中出现。所以,在step2的时候,若entry的p被标记为expunged了,那么dirty一定不为nil,并且key在dirty中不存在,需要将key加入到dirty中。
/*
package: sync
file: map.go
line: 203
*/
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key];
ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key];
ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key];
ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()return actual, loaded
}/*
package: sync
file: map.go
line: 243
*/
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p) //step 1
if p == expunged {
return nil, false, false
}
if p != nil { //step 2
return *(*interface{})(p), true, true
}ic := i
for { //step 3
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
基本逻辑同Load方法,在read中找对应的entry,若存在则尝试读并写入,若不存在则上锁后依次在read、dirty中进行读取和写入。唯一区别则是若key在read中不存在,而在dirty中存在,会像Store方法一样,进行miss数增加和判断是否需要提升dirty的操作。
而在entry的tryLoadOrStore方法中
- 判断key是否被删除,若是则返回nil
- 若不为nil,则返回对应的值
- 若为nil,则循环尝试原子CAS操作将新的值存到entry中和进行step 1、step 2的判断
/*
package: sync
file: map.go
line: 271
*/
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly) //step 1
e, ok := read.m[key]
if !ok && read.amended {//step 2
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok { //step 3
e.delete()
}
}/*
package: sync
file: map.go
line: 288
*/
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
- 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry。
- 若在read中不存在,并且dirty中有新的数据,则上锁并进行双检查,若dirty中依然有新的数据,则直接调用delete方法删除dirty中的对应的key,无论在dirty中是否存在这个key。
- 若entry存在,则将entry中的p标记为nil。
/*
package: sync
file: map.go
line: 310
*/
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly) //step 1
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}for k, e := range read.m { //step 2
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
- 从read中读取readOnly,若dirty中有新数据,则立即提升dirty为read。
- 使用原生的range方法遍历read,若数据不为nil,则调用回调方法传入key和value,若回调方法返回false,则跳出循环。
- sync.map通过使用一个只读的read和一只写的dirty来保证读写时候的并发效率,只有在对read中不存在的key进行读写的时候,才会去上锁写dirty。
- read和dirty通过使用entry进行数据共享,若一个key在read中存在,则只修改read中这个key对应的entry的值,就可以保证在dirty中数据同步。而不需要对map进行修改,保证了不会对read造成并发读写异常。比如delete操作,只是将read中的entry的p设置为nil,而不是将这个key删除。
- 在读取操作的时候,若key在read中多次不存在,而在dirty中有新数据(不管在dirty中是否存在),则会将dirty提升为read,保证读取的命中率,减少上锁次数。
- 在写操作的时候,若read不存在这个key,才会对dirty进行写,并且若dirty为空,会从read中复制不为空的数据。
- 【Sync包源码解析(Map)】sync.map适合在对大量重复key的读写操作的场景下使用,这个时候约等于都在对read进行读写,不需要上锁,效率非常高。但是不适合在对大量新增key的读写场景下使用,这个时候大部分操作都是在对dirty进行操作,几乎都需要上锁,而且由于大量miss和大量新增key的写,导致频繁的dirty提升和dirty复制,效率反而可能低于读写锁。
推荐阅读
- 喂,你结婚我给你随了个红包
- CET4听力微技能一
- 放下心中的偶像包袱吧
- 社保代缴公司服务费包含哪些
- Beego打包部署到Linux
- 世界之大,包罗万象--|世界之大,包罗万象-- 读《我不过低配的人生》
- 用npm发布一个包的教程并编写一个vue的插件发布
- CocoaAsyncSocket|CocoaAsyncSocket (GCDAsyncSocket)适配IPv6
- 积极探索|积极探索 绽放生命 ???——心心相印计划:青少年心理工作研讨小组全国大型公益行动第二次活动包头市青山区分校圆满成功
- 那个喝大了的女人在群里发了一晚上的红包