guava cache过期方案实践

过期机制 只要是缓存,就必定有过期机制,guava 缓存过期分为以下三种:

  • expireAfterAccess: 数据在指定时间内没有被访问(读或写),则为过期数据,当没有数据或者读到过期数据时,只允许一个线程更新新数据时,其他线程阻塞等待该线程更新完成后,取最新的数据。
构造函数:
public CacheBuilder expireAfterAccess(long duration, TimeUnit unit) { ... this.expireAfterAccessNanos = unit.toNanos(duration); return this; }

构造函数设置了变量expireAfterAccessNanos的值。
  • expireAfterWrite:数据在指定时间内没有被更新(写入),则为过期数据,当没有数据或者读到过期数据时,只允许一个线程更新新数据时,其他线程阻塞等待该线程更新完成后,取最新的数据。
构造函数:
public CacheBuilder expireAfterWrite(long duration, TimeUnit unit) { ... this.expireAfterWriteNanos = unit.toNanos(duration); return this; }

构造函数设置了变量expireAfterWriteNanos的值。
  • refreshAfterWrite:数据在指定时间内没有被更新(写入),则为过期数据,当有线程正在更新(写入)新数据时,其他线程返回旧数据。
构造函数:
public CacheBuilder refreshAfterWrite(long duration, TimeUnit unit) { ... this.refreshNanos = unit.toNanos(duration); return this; }

构造函数设置了变量refreshNanos的值。
问题
  • expireAfterAccess和expireAfterWrite:
    当数据达到过期时间,限制只能有1个线程去执行数据刷新,其他请求阻塞等待这个刷新操作完成,对性能会有的损耗。
  • refreshAfterWrite:
    当数据达到过期时间,限制只能有1个线程去执行新值的加载,其他线程取旧值返回(也可设置异步获取新值,所有线程都返回旧值)。这样有效地可以减少等待和锁争用,所以refreshAfterWrite会比expireAfterWrite性能好。但是还是会有一个线程需要去执行刷新任务,而guava cache支持异步刷新,如果开启异步刷新,在该线程在提交异步刷新任务之后,也会返回旧值,性能上更优异。
    但是由于guava cache并不会定时清理的功能(主动),而是在查询数据时,一并做了过期检查和清理(被动)。那就会出现以下问题:数据如果隔了很长一段时间再去查询,得到的这个旧值可能来自于很长时间之前,这将会引发问题,对时效性要求高的场景可能会造成非常大的错误。
当缓存里没有要访问的数据时,不管设置的是哪个模式,所有的线程都会阻塞,通过锁来控制只会有一个线程去加载数据
原理 首先要了解guava cache过期原理。
1. 整体方法 get方法:
class LocalCache extends AbstractMap implements ConcurrentMap {V get(K key, int hash, CacheLoader loader) throws ExecutionException { checkNotNull(key); checkNotNull(loader); try { if (count != 0) { // read-volatile // don't call getLiveEntry, which would ignore loading values ReferenceEntry e = getEntry(key, hash); if (e != null) { long now = map.ticker.read(); V value = https://www.it610.com/article/getLiveValue(e, now); if (value != null) { recordRead(e, now); statsCounter.recordHits(1); return scheduleRefresh(e, key, hash, value, now, loader); } ValueReference valueReference = e.getValueReference(); if (valueReference.isLoading()) { return waitForLoadingValue(e, key, valueReference); } } }// at this point e is either null or expired; return lockedGetOrLoad(key, hash, loader); } catch (ExecutionException ee) { Throwable cause = ee.getCause(); if (cause instanceof Error) { throw new ExecutionError((Error) cause); } else if (cause instanceof RuntimeException) { throw new UncheckedExecutionException(cause); } throw ee; } finally { postReadCleanup(); } }}

可以看到guava cache继承了ConcurrentHashMap,为了满足并发场景,核心的数据结构就是按照 ConcurrentHashMap 来的。
2. 简化方法 这里将方法简化为跟本次主题有关的几个关键步骤:
if (count != 0) {// 当前缓存是否有数据 ReferenceEntry e = getEntry(key, hash); // 取数据节点 if (e != null) { V value = https://www.it610.com/article/getLiveValue(e, now); // 判断是否过期,过滤已过期数据,仅对expireAfterAccess或expireAfterWrite模式下设置的时间做判断 if (value != null) { return scheduleRefresh(e, key, hash, value, now, loader); // 是否需要刷新数据,仅在refreshAfterWrite模式下生 } ValueReference valueReference = e.getValueReference(); if (valueReference.isLoading()) {// 如果有其他线程正在加载/刷新数据 return waitForLoadingValue(e, key, valueReference); // 等待其他线程完成加载/刷新数据 } } } return lockedGetOrLoad(key, hash, loader); // 加载/刷新数据

countcache的一个属性,被volatile修饰(volatile int count),保存的是当前缓存的数量。
  • 如果count == 0(没有缓存)或者根据key取不到Hash节点,则加锁并加载缓存lockedGetOrLoad)。
  • 如果取到Hash节点,则判断是否过期(getLiveValue),过滤掉已过期数据。
3. getLiveValue
V getLiveValue(ReferenceEntry entry, long now) { if (entry.getKey() == null) { tryDrainReferenceQueues(); return null; } V value = https://www.it610.com/article/entry.getValueReference().get(); if (value == null) { tryDrainReferenceQueues(); return null; }if (map.isExpired(entry, now)) { tryExpireEntries(now); return null; } return value; }

通过isExpired判断当前节点是否已过期:
boolean isExpired(ReferenceEntry entry, long now) { checkNotNull(entry); if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) { return true; } if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) { return true; } return false; }

isExpired只判断了expireAfterAccessNanos和,expireAfterWriteNanos两个时间,结合expireAfterAccessexpireAfterWriterefreshAfterWrite三个方法的构造函数,可以看到这方法并不去管refreshAfterWrite设置的时间,也就是如果过了expireAfterAccessexpireAfterWrite设置的时间,那数据就是过期了,否则就是没过期。
如果发现数据已过期,则会连带检查是否还有其他过期的数据(惰性删除):
void tryExpireEntries(long now) { if (tryLock()) { try { expireEntries(now); } finally { unlock(); // don't call postWriteCleanup as we're in a read } } }void expireEntries(long now) { drainRecencyQueue(); ReferenceEntry e; while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } }void drainRecencyQueue() { ReferenceEntry e; while ((e = recencyQueue.poll()) != null) { if (accessQueue.contains(e)) { accessQueue.add(e); } } }

取最近范围&写入的数据,一一检查是否过期。
4. scheduleRefresh
V value = https://www.it610.com/article/getLiveValue(e, now); if (value != null) { return scheduleRefresh(e, key, hash, value, now, loader); }

getLiveValue之后,如果结果不为null,则说明在expireAfterAccessexpireAfterWrite两个模式下没有过期(或者没有设置这两个模式的时间),但是不代表数据不会刷新,因为getLiveValue并没有判断refreshAfterWrite的过期时间,而是在scheduleRefresh方法里判断。
V scheduleRefresh( ReferenceEntry entry, K key, int hash, V oldValue, long now, CacheLoader loader) { if (map.refreshes() && (now - entry.getWriteTime() > map.refreshNanos) && !entry.getValueReference().isLoading()) { V newValue = https://www.it610.com/article/refresh(key, hash, loader, true); if (newValue != null) { return newValue; } } return oldValue; }

满足以下条件时,会刷新数据(刷新线程在同步刷新模式下,返回新值,异步刷新模式下可能会返回旧值),否则直接返回旧值:
  1. 设置了refreshAfterWrite时间refreshNanos
  2. 当前数据已过期。
  3. 没有其他线程正在刷新数据(!entry.getValueReference().isLoading())。
5. waitForLoadingValue 如果没有设置refreshAfterWrite,且数据已过期:
  1. 如果有其他线程正在刷新,则阻塞等待(通过future.get()阻塞)。
  2. 如果没有其他线程正在刷新,则加锁并刷新数据。
    ValueReference valueReference = e.getValueReference(); if (valueReference.isLoading()) { return waitForLoadingValue(e, key, valueReference); }V waitForLoadingValue(ReferenceEntry e, K key, ValueReference valueReference) throws ExecutionException { if (!valueReference.isLoading()) { throw new AssertionError(); }checkState(!Thread.holdsLock(e), "Recursive load of: %s", key); // don't consider expiration as we're concurrent with loading try { V value = https://www.it610.com/article/valueReference.waitForValue(); if (value == null) { throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + "."); } // re-read ticker now that loading has completed long now = map.ticker.read(); recordRead(e, now); return value; } finally { statsCounter.recordMisses(1); } }public V waitForValue() throws ExecutionException { return getUninterruptibly(futureValue); }public static V getUninterruptibly(Future future) throws ExecutionException { boolean interrupted = false; try { while (true) { try { return future.get(); } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }

6. 加载数据 加载数据,最终就是调用不管是lockedGetOrLoad方法,还是scheduleRefresh中的refresh方法,最终调用的是CacheLoaderload/reload方法。
当缓存里没有要访问的数据时,不管设置的是哪个模式,都会进入到lockedGetOrLoad方法中:
  1. 通过锁争抢拥有加载数据的权利
  2. 抢到锁的数据,将节点状态设置为loading,并加载数据。
  3. 没抢到锁的数据,进入跟上一步一样的waitForLoadingValue方法,阻塞等到数据加载完成。
lock(); try { LoadingValueReference loadingValueReference = new LoadingValueReference(valueReference); e.setValueReference(loadingValueReference); if (createNewEntry) { loadingValueReference = new LoadingValueReference(); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } } finally { unlock(); postWriteCleanup(); }if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); }

解决方案 通过以上分析,可以知道在判断缓存是否过期时,guava cache会分成两次独立的判断:
  1. 判断expireAfterAccessexpireAfterWrite
  2. 判断refreshAfterWrite
【guava cache过期方案实践】回到问题“refreshAfterWrite虽然提升了性能,但是除了同步加载模式下执行刷新的那个线程外,其他线程可能访问到过期已久的数据”。我们可以通过expireAfterWriterefreshAfterWrite结合的方案来解决:设置了refreshAfterWrite的过期时间的同时,可以再设置expireAfterWrite/expireAfterAccess的过期时间,expireAfterWrite/expireAfterAccess的时间要大于refreshAfterWrite的时间。
例如refreshAfterWrite的时间为5分钟,而expireAfterWrite的时间为30分钟,访问到过期数据时:
  1. 如果过期时间小于30分钟,则会进入scheduleRefresh方法,除刷新线程以外的其他线程直接返回旧值。
  2. 如果缓存数据长时间未被访问,过期时间超过30分钟,则数据会在getLiveValue方法中被过滤掉,除刷新线程以外,其他线程阻塞等待。

    推荐阅读