动手实现一个localcache - 实现篇

第一步:抽象接口 第一步很重要,以面向接口编程为原则,我们先抽象出来要暴露给用户的方法,给用户提供简单易懂的方法,因此我抽象出来的结果如下:
// ICache abstract interface type ICache interface { // Set value use default expire time. default does not expire. Set(key string, value []byte) error // Get value if find it. if value already expire will delete. Get(key string) ([]byte, error) // SetWithTime set value with expire time SetWithTime(key string, value []byte, expired time.Duration) error // Delete manual removes the key Delete(key string) error // Len computes number of entries in cache Len() int // Capacity returns amount of bytes store in the cache. Capacity() int // Close is used to signal a shutdown of the cache when you are done with it. // This allows the cleaning goroutines to exit and ensures references are not // kept to the cache preventing GC of the entire cache. Close() error // Stats returns cache's statistics Stats() Stats // GetKeyHit returns key hit GetKeyHit(key string) int64 }

  • Set(key string, value []byte):使用该方法存储的数据使用默认的过期时间,如果清除过期的异步任务没有enable,那么就永不过期,否则默认过期时间为10min。
  • Get(key string) ([]byte, error):根据key获取对象内容,如果数据过期了会在这一步删除。
  • SetWithTime(key string, value []byte, expired time.Duration):存储对象是使用自定义过期时间
  • Delete(key string) error:根据key删除对应的缓存数据
  • Len() int:获取缓存的对象数量
  • Capacity() int:获取当前缓存的容量
  • Close() error:关闭缓存
  • Stats() Stats:缓存监控数据
  • GetKeyHit(key string) int64:获取key的命中率数据
第二步:定义缓存对象 第一步我们抽象好了接口,下面就要定义一个缓存对象实例实现接口,先看定义结构:
type cache struct { // hashFunc represents used hash func hashFunc HashFunc // bucketCount represents the number of segments within a cache instance. value must be a power of two. bucketCount uint64 // bucketMask is bitwise AND applied to the hashVal to find the segment id. bucketMask uint64 // segment is shard segments []*segment // segment lock locks[]sync.RWMutex // close cache close chan struct{} }

  • hashFunc:分片要用的哈希函数,用户可以自行定义,实现HashFunc接口即可,默认使用fnv算法。
  • bucketCount:分片的数量,一定要是偶数,默认分片数为256
  • bucketMask:因为分片数是偶数,所以可以分片时可以使用位运算代替取余提升性能效率,hashValue % bucketCount == hashValue & bucketCount - 1
  • segments:分片对象,每个分片的对象结构我们在后面介绍。
  • locks:每个分片的读写锁
  • close:关闭缓存对象时通知其他goroutine暂停
// NewCache constructor cache instance func NewCache(opts ...Opt) (ICache, error) { options := &options{ hashFunc: NewDefaultHashFunc(), bucketCount: defaultBucketCount, maxBytes: defaultMaxBytes, cleanTime: defaultCleanTIme, statsEnabled: defaultStatsEnabled, cleanupEnabled: defaultCleanupEnabled, } for _, each := range opts{ each(options) }if !isPowerOfTwo(options.bucketCount){ return nil, errShardCount }if options.maxBytes <= 0 { return nil, ErrBytes }segments := make([]*segment, options.bucketCount) locks := make([]sync.RWMutex, options.bucketCount)maxSegmentBytes := (options.maxBytes + options.bucketCount - 1) / options.bucketCount for index := range segments{ segments[index] = newSegment(maxSegmentBytes, options.statsEnabled) }c := &cache{ hashFunc: options.hashFunc, bucketCount: options.bucketCount, bucketMask: options.bucketCount - 1, segments: segments, locks: locks, close: make(chan struct{}), } if options.cleanupEnabled { go c.cleanup(options.cleanTime) }return c, nil }

  • 前置参数检查,对于外部传入的参数,我们还是要做基本的校验
  • 分片对象初始化
  • 构造缓存对象
第三步:定义分片结构 每个分片结构如下:
type segment struct { hashmap map[uint64]uint32 entries buffer.IBuffer clockclock evictList*list.List stats IStats }

  • hashmp:存储key所对应的存储索引
  • entries:存储key/value的底层结构,我们在第四步的时候介绍,也是代码的核心部分。
  • clock:定义时间方法
  • evicList:这里我们使用一个队列来记录old索引,当容量不足时进行删除(临时解决方案,当前存储结构不适合使用LRU淘汰算法)
  • stats:缓存的监控数据。
func newSegment(bytes uint64, statsEnabled bool) *segment { if bytes == 0 { panic(fmt.Errorf("bytes cannot be zero")) } if bytes >= maxSegmentSize{ panic(fmt.Errorf("too big bytes=%d; should be smaller than %d", bytes, maxSegmentSize)) } capacity := (bytes + segmentSize - 1) / segmentSize entries := buffer.NewBuffer(int(capacity)) entries.Reset() return &segment{ entries: entries, hashmap: make(map[uint64]uint32), clock:&systemClock{}, evictList: list.New(), stats: newStats(statsEnabled), } }

第四步:定义缓存结构 缓存对象现在也构造好了,接下来就是本地缓存的核心:定义缓存结构。
type Buffer struct { array [][]byte capacity int index int // maxCount = capacity - 1 count int // availableSpace If any objects are removed after the buffer is full, the idle index is logged. // Avoid array "wormhole" availableSpace map[int]struct{} // placeholder record the index that buffer has stored. placeholder map[int]struct{} }

  • array [][]byte:存储缓存对象的二维切片
  • capacity:缓存结构的最大容量
  • index:索引,记录缓存所在的位置的索引
  • count:记录缓存数量
  • availableSpace:记录"虫洞",当缓存对象被删除时记录下空闲位置的索引,方便后面容量满了后使用"虫洞"
  • placeholder:记录缓存对象的索引,迭代清除过期缓存可以用上。
第五步:完善向缓存写入数据方法 上面我们定义好了所有需要的结构,接下来就是填充我们的写入缓存方法就可以了:
func (c *cache) Set(key string, value []byte) error{ hashKey := c.hashFunc.Sum64(key) bucketIndex := hashKey&c.bucketMask c.locks[bucketIndex].Lock() defer c.locks[bucketIndex].Unlock() err := c.segments[bucketIndex].set(key, hashKey, value, defaultExpireTime) return err }func (s *segment) set(key string, hashKey uint64, value []byte, expireTime time.Duration) error { if expireTime <= 0{ return ErrExpireTimeInvalid } expireAt := uint64(s.clock.Epoch(expireTime))if previousIndex, ok := s.hashmap[hashKey]; ok { if err := s.entries.Remove(int(previousIndex)); err != nil{ return err } delete(s.hashmap, hashKey) }entry := wrapEntry(expireAt, key, hashKey, value) for { index, err := s.entries.Push(entry) if err == nil { s.hashmap[hashKey] = uint32(index) s.evictList.PushFront(index) return nil } ele := s.evictList.Back() if err := s.entries.Remove(ele.Value.(int)); err != nil{ return err } s.evictList.Remove(ele) } }

  • 根据key计算哈希值,然后根据分片数获取对应分片位置
  • 如果当前缓存中存在相同的key,则先删除,在重新插入,会刷新过期时间
  • 封装存储结构,根据过期时间戳、key长度、哈希大小、缓存对象进行封装
  • 将数据存入缓存,如果缓存失败,移除最老的数据后再次重试
第六步:完善从缓存读取数据方法 第一步根据key计算哈希值,再根据分片数获取对应的分片位置:
func (c *cache) Get(key string) ([]byte, error){ hashKey := c.hashFunc.Sum64(key) bucketIndex := hashKey&c.bucketMask c.locks[bucketIndex].RLock() defer c.locks[hashKey&c.bucketMask].RUnlock() entry, err := c.segments[bucketIndex].get(key, hashKey) if err != nil{ return nil, err } return entry,nil }

  • 先根据哈希值判断key是否存在于缓存中,不存返回key没有找到
  • 从缓存中读取数据得到缓存中的key判断是否发生哈希冲突
  • 判断缓存对象是否过期,过期删除缓存数据(可以根据业务优化需要是否返回当前过期数据)
  • 在每个记录缓存监控数据
func (s *segment) getWarpEntry(key string, hashKey uint64) ([]byte,error) { index, ok := s.hashmap[hashKey] if !ok { s.stats.miss() return nil, ErrEntryNotFound } entry, err := s.entries.Get(int(index)) if err != nil{ s.stats.miss() return nil, err } if entry == nil{ s.stats.miss() return nil, ErrEntryNotFound }if entryKey := readKeyFromEntry(entry); key != entryKey { s.stats.collision() return nil, ErrEntryNotFound } return entry, nil }func (s *segment) get(key string, hashKey uint64) ([]byte, error) { currentTimestamp := s.clock.TimeStamp() entry, err := s.getWarpEntry(key, hashKey) if err != nil{ return nil, err } res := readEntry(entry)expireAt := int64(readExpireAtFromEntry(entry)) if currentTimestamp - expireAt >= 0{ _ = s.entries.Remove(int(s.hashmap[hashKey])) delete(s.hashmap, hashKey) return nil, ErrEntryNotFound } s.stats.hit(key)return res, nil }

第七步:来个测试用例体验一下 先来个简单的测试用例测试一下:
func (h *cacheTestSuite) TestSetAndGet() { cache, err := NewCache() assert.Equal(h.T(), nil, err) key := "asong" value := []byte("公众号:Golang梦工厂")err = cache.Set(key, value) assert.Equal(h.T(), nil, err)res, err := cache.Get(key) assert.Equal(h.T(), nil, err) assert.Equal(h.T(), value, res) h.T().Logf("get value is %s", string(res)) }

=== RUNTestCacheTestSuite === RUNTestCacheTestSuite/TestSetAndGet cache_test.go:33: get value is 公众号:Golang梦工厂 --- PASS: TestCacheTestSuite (0.00s) --- PASS: TestCacheTestSuite/TestSetAndGet (0.00s) PASS

总结 实现篇到这里就结束了,但是这个项目的编码仍未结束,我会继续以此版本为基础不断迭代优化,该本地缓存的优点:
  • 实现简单、提供给用户的方法简单易懂
  • 使用二维切片作为存储结构,避免了不能删除底层数据的缺点,也在一定程度上避免了"虫洞"问题。
  • 测试用例齐全,适合作为小白的入门项目
  • 没有使用高效的缓存淘汰算法,可能会导致热点数据被频繁删除
  • 定时删除过期数据会导致锁持有时间过长,需要优化
  • 关闭缓存实例需要优化处理方式
  • 根据业务场景进行优化(特定业务场景)
  • 添加异步加载缓存功能
  • ...... (思考中)
