Sync包源码解析(Map)

版本

  • go version 1.10.1
使用方法
var m sync.mapvalue,ok:=m.Load(key)m.Store(key,value)value,loaded,ok := m.LoadOrStore(key,value)m.Delete(key)m.Range(func(key,value) bool)

数据结构
/* package: sync file: map.go line: 27 */ type Map struct { mu Mutex read atomic.Value dirty map[interface{}]*entry misses int }/* package: sync file: map.go line: 27 */ type readOnly struct { mmap[interface{}]*entry amended bool }/* package: sync file: map.go line: 73 */type entry struct { p unsafe.Pointer }/* package: sync file: map.go line: 70 */ var expunged = unsafe.Pointer(new(interface{}))

  • mu: Mutex锁,在对dirty进行操作的时候,需要上锁
  • read: atomic.Value对象,对readOnly对象进行原子读取和存储,对map进行操作时,优先操作read中的数据。
  • readOnly:维护一个只读的map和一个判断dirty是否有新值的字段。
  • dirty:保存最新数据的map,当从read中多次读取不到数据时,会将dirty提升成read。
  • misses:从read读取不到数据的次数,当dirty提升为read的时候置零。
  • entry:指针p保存传入value的地址
  • expunged:随机地址,当entry从dirty中删除的时候,将entry的p指向expunged
方法 Nwomap
/* package: sync file: map.go line: 95 */ func newEntry(i interface{}) *entry { return &entry{p: unsafe.Pointer(&i)} }

newEntry传入一个interface对象,返回保存了interface对象指针的entry指针。
Load
/* package: sync file: map.go line: 102 */ func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) //step 1 e, ok := read.m[key] if !ok && read.amended { //step 2 m.mu.Lock() read, _ = m.read.Load().(readOnly) //step 3 e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] //step 4 m.missLocked() } m.mu.Unlock() } if !ok { //step 5 return nil, false } return e.load() //step 6 }

  1. 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry。
  2. 若在readOnly中不存在这个key,但是dirty中存在新的entry(不一定是key对应的entry,只是表示dirty中存在read中不存在的entry),则获取Mutex锁。
  3. 双检查,重复step1和step2。若获得锁之前key在dirty中存在,并且dirty被提升read,则这个时候key在read中存在,读取read中的entry并解锁。
  4. 若在read中还是不存在,则直接从dirty中读取entry,并且将miss数加1。若miss数大于等于dirty中的数据,则将dirty提升为read,将readOnly中的amended设为false,dirty设为nil,miss置零,然后解锁。
/* package: sync file: map.go line: 343 */ func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }

  1. 若key在dirty和read中都不存在,则返回空值和false。
  2. 从entry中读取存储的数据,若entry的p为空或者被标记为expunged,则表示该值已经被删除,则返回nil和false,否则返回对应的值和true。
func (e *entry) load() (value interface{}, ok bool) { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return nil, false } return *(*interface{})(p), true }

Store
/* package: sync file: map.go line: 136 */ func (m *Map) Store(key, value interface{}) { read, _ := m.read.Load().(readOnly)//step 1 if e, ok := read.m[key]; ok && e.tryStore(&value) { return }m.mu.Lock()//step 2 read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { m.dirty[key] = e } e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok {//step 3 e.storeLocked(&value) } else {//step 4 if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() }

  1. 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry,若存在则尝试直接将entry的p指向value。
    func (e *entry) tryStore(i *interface{}) bool { p := atomic.LoadPointer(&e.p)//step a if p == expunged { return false } for { //step b if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } p = atomic.LoadPointer(&e.p) if p == expunged { return false } } }

    a. 判断p是否被标记为expunged,若是则表示key已经在dirty中删除,不能直接在read中存储。
    b. 循环尝试进行CAS操作将当前Value的值存储到entry中,除非key在dirty中被删除。
  2. 上锁然后进行双检查,若dirty被提升为read,并且key在新的read中存在,则将新的值存入entry中。中间会判断p是否被标记为expunged,若是则将key存到dirty中。
  3. 若在dirty中存在,则直接将值存到entry中。
  4. 若在dirty中也不存在,则创建新的entry存到dirty中。并且将read的amended设为ture,表示dirty中有新的数据。若dirty为nil,会将read的值赋值给新的dirty。
    /* package: sync file: map.go line: 353 */ func (m *Map) dirtyLocked() { if m.dirty != nil { //step a return }read, _ := m.read.Load().(readOnly)//step b m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { //step c m.dirty[k] = e } } }/* package: sync file: map.go line: 367 */ func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }

    a. 若dirty不为nil,则直接返回。
    b. 读出read,创建长度同read的map赋值给dirty。
    c. 循环遍历read,若read中的entry的值为nil,则将p标记为expunged,若不为nil,则将entry赋值给dirty。因此,p只有在dirty在重新创建的时候才会被标记为expunged,并且对应的key不会在dirty中出现。所以,在step2的时候,若entry的p被标记为expunged了,那么dirty一定不为nil,并且key在dirty中不存在,需要将key加入到dirty中。
LoadOrStore
/* package: sync file: map.go line: 203 */ func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok { actual, loaded, ok := e.tryLoadOrStore(value) if ok { return actual, loaded } }m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { m.dirty[key] = e } actual, loaded, _ = e.tryLoadOrStore(value) } else if e, ok := m.dirty[key]; ok { actual, loaded, _ = e.tryLoadOrStore(value) m.missLocked() } else { if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) actual, loaded = value, false } m.mu.Unlock()return actual, loaded }/* package: sync file: map.go line: 243 */ func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { p := atomic.LoadPointer(&e.p) //step 1 if p == expunged { return nil, false, false } if p != nil { //step 2 return *(*interface{})(p), true, true }ic := i for { //step 3 if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { return i, false, true } p = atomic.LoadPointer(&e.p) if p == expunged { return nil, false, false } if p != nil { return *(*interface{})(p), true, true } } }

基本逻辑同Load方法,在read中找对应的entry,若存在则尝试读并写入,若不存在则上锁后依次在read、dirty中进行读取和写入。唯一区别则是若key在read中不存在,而在dirty中存在,会像Store方法一样,进行miss数增加和判断是否需要提升dirty的操作。
而在entry的tryLoadOrStore方法中
  1. 判断key是否被删除,若是则返回nil
  2. 若不为nil,则返回对应的值
  3. 若为nil,则循环尝试原子CAS操作将新的值存到entry中和进行step 1、step 2的判断
Delete
/* package: sync file: map.go line: 271 */ func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) //step 1 e, ok := read.m[key] if !ok && read.amended {//step 2 m.mu.Lock() read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { delete(m.dirty, key) } m.mu.Unlock() } if ok { //step 3 e.delete() } }/* package: sync file: map.go line: 288 */ func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }

  1. 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry。
  2. 若在read中不存在,并且dirty中有新的数据,则上锁并进行双检查,若dirty中依然有新的数据,则直接调用delete方法删除dirty中的对应的key,无论在dirty中是否存在这个key。
  3. 若entry存在,则将entry中的p标记为nil。
Range
/* package: sync file: map.go line: 310 */ func (m *Map) Range(f func(key, value interface{}) bool) { read, _ := m.read.Load().(readOnly) //step 1 if read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended { read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() }for k, e := range read.m { //step 2 v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }

  1. 从read中读取readOnly,若dirty中有新数据,则立即提升dirty为read。
  2. 使用原生的range方法遍历read,若数据不为nil,则调用回调方法传入key和value,若回调方法返回false,则跳出循环。
总结
  • sync.map通过使用一个只读的read和一只写的dirty来保证读写时候的并发效率,只有在对read中不存在的key进行读写的时候,才会去上锁写dirty。
  • read和dirty通过使用entry进行数据共享,若一个key在read中存在,则只修改read中这个key对应的entry的值,就可以保证在dirty中数据同步。而不需要对map进行修改,保证了不会对read造成并发读写异常。比如delete操作,只是将read中的entry的p设置为nil,而不是将这个key删除。
  • 在读取操作的时候,若key在read中多次不存在,而在dirty中有新数据(不管在dirty中是否存在),则会将dirty提升为read,保证读取的命中率,减少上锁次数。
  • 在写操作的时候,若read不存在这个key,才会对dirty进行写,并且若dirty为空,会从read中复制不为空的数据。
  • 【Sync包源码解析(Map)】sync.map适合在对大量重复key的读写操作的场景下使用,这个时候约等于都在对read进行读写,不需要上锁,效率非常高。但是不适合在对大量新增key的读写场景下使用,这个时候大部分操作都是在对dirty进行操作,几乎都需要上锁,而且由于大量miss和大量新增key的写,导致频繁的dirty提升和dirty复制,效率反而可能低于读写锁。

    推荐阅读