system|针对全量请求的缓存机制实现 - AOP

最近几天由于工作原因,需要设计实现一个线程安全的缓存机制,拿出来和大家分享交流一下。
应用背景: 缓存是在实际工作中经常用到的,主要作用呢?1. 提高响应速度 2. 降低cpu压力或者数据库压力。 在此,我的应用背景是拦截一些RPC请求( 不要求获取实时数据),且RPC请求无参数,即主要是应对一些数据全量同步的请求(那么缓存的key是请求的函数名,value是返回值)。提供缓存实现,以降低数据库及自身应用的访问压力。
目标: 高可扩展性:可以方便配置需要使用缓存的方法。 线程安全性:在并发情况下,要求线程安全,且尽可能高效。
使用技术:

  • 使用AOP的插件性质来降低缓存与原系统的耦合性,即在切面层做缓存的处理。
  • 使用Annotation来对需要做缓存处理的函数进行标记,并可以对缓存时间个性化
  • 针对缓存过期问题,对放入缓存的数据封装一层,并打上时间戳
示意图: system|针对全量请求的缓存机制实现 - AOP
文章图片
设计难点: 针对某一时刻并发数较多且缓存失效的情况下,我们应该保证的是只有一个线程会去执行数据的读取并设置的操作,那么其他线程应该是等待该线程完成操作再一起返回还是直接返回null值? 答:在并发数较多且数据准备时间过长的情况下,如果线程采取等待策略,那么将引起很大的资源浪费:占用RPC连接(一般数量是有限制的),占用服务器cpu时间等等问题。所以,最好是代码中提供两种策略,对于执行时间较长的读数据操作,我们应该将线程直接返回,而非一直等待。
代码实现:Annotation:
/** * 表示一个方法是否启用本地缓存,可以指定本地缓存的时间间隔,默认为一个小时 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface LocalCacheOperation {long localCacheInterval() default 1000 * 60 * 60; String localCacheKey() default ""; }

CacheObject:
//为缓存对象包上一层时间戳 public class CacheObject implements Serializable { private static final long serialVersionUID = 4873268779348802945L; private long timestamp; private Object object; ..... }

标记要使用缓存的方法:
public interface ConfigService { public BloomFilter getAllPassengerNames(); public BloomFilter getAllTrades(); }public class ConfigServiceImpl implements ConfigService { private PassengerManager passengerManager; private TradeManager tradeManager; @LocalCacheOperation(localCacheInterval=1000*60*60*24) public BloomFilter getAllPassengerNames() { return passengerManager.getAllPassengerNames(); } @LocalCacheOperation public BloomFilter getAllTrades() { return tradeManager.getAllTrades(); } }


核心:切面实现 我在这里使用了加锁和不加锁两种方式来实现对应的两种策略:缓存不存在线程直接返回或等待。
/** * 本地缓存切面实现 */ @Aspect public class LocalCacheAspect {private static final Log log = LogFactory.getLog(LocalCacheAspect.class); private final ConcurrentMap> localCache = new ConcurrentHashMap>(); //控制台 private ConsoleBean consoleBean; private final ConcurrentMap> concurrentLocalCache = new ConcurrentHashMap>(); /** * 对于执行时间较长的读数据操作,需要在这里相应的添加锁,对于操作添加锁后的函数的线程 * 如果本地缓存为空,且读数据的锁已被其他线程占据,将直接返回null */private final static Map localCacheLocks = new HashMap(); static { localCacheLocks.put("getAllTrades", new ReentrantLock()); }/** * Advice aound audit operations * * @param pjpParam * @return */ @Around("execution(@LocalCacheOperation * *(..))") public Object doCache(ProceedingJoinPoint pjpParam) throws Throwable { if(!getConsoleBean().isLocalCacheSwitchOn()) { log.warn("localCache not switch on, please pay attention"); return pjpParam.proceed(pjpParam.getArgs()); } final ProceedingJoinPoint pjp = pjpParam; Signature sig = pjp.getSignature(); if (sig instanceof MethodSignature) { MethodSignature mSig = (MethodSignature) sig; LocalCacheOperation co = mSig.getMethod().getAnnotation( LocalCacheOperation.class); long localCacheInterval = 0; String localCacheKey = null; /** * AOP在拦截子类的Annotataion时,无法获取该Annotation,导致co可能为空 */ if( co == null ){ localCacheInterval = consoleBean.getLocalCacheInterval(); localCacheKey = mSig.getName(); } else { localCacheInterval = co.localCacheInterval(); localCacheKey = StringUtils.isNotBlank(co.localCacheKey()) ? co.localCacheKey() : mSig.getName(); } if (localCacheLocks.containsKey(mSig.getMethod().getName())) { //使用本地互斥锁 return doConcurrentLocalCache(pjp, localCacheInterval, localCacheKey); }while (true) {// 等待某个线程将数据获取到本地缓存 Future f = localCache.get(localCacheKey); try { long currentTime = System.currentTimeMillis(); if (f != null && f.get() != null && currentTime - f.get().getTimestamp() > localCacheInterval) { localCache.remove(localCacheKey, f); f = null; }if (f == null) { Callable eval = new Callable() { public CacheObject call() throws InterruptedException { Object res; try { res = pjp.proceed(pjp.getArgs()); } catch (Throwable e) { log.error("Fail to process method", e); throw newServiceException(e.getMessage()); } CacheObject cacheObject = new CacheObject(); cacheObject.setObject(res); cacheObject.setTimestamp(System.currentTimeMillis()); return cacheObject; } }; FutureTask ft = new FutureTask(eval); f = localCache.putIfAbsent(localCacheKey, ft); if (f == null) { f = ft; ft.run(); } }CacheObject obj = f.get(); if (obj != null) return obj.getObject(); } catch (CancellationException e) { localCache.remove(localCacheKey, f); } catch (ExecutionException e) { throw new ServiceException(e.getMessage()); } } } return pjp.proceed(pjp.getArgs()); }@SuppressWarnings("static-access") public Object doConcurrentLocalCache(ProceedingJoinPoint pjp, long localCacheInterval, String localCacheKey) throws Throwable { try { long currentTime = System.currentTimeMillis(); SoftReference weakRefCacheObj = concurrentLocalCache.get(localCacheKey); if (weakRefCacheObj != null && weakRefCacheObj.get() != null && currentTime - weakRefCacheObj.get().getTimestamp() > localCacheInterval) { // 缓存过期 weakRefCacheObj.get().setObject(null); // 清空引用 concurrentLocalCache.remove(localCacheKey, weakRefCacheObj); weakRefCacheObj = null; } else if (weakRefCacheObj != null && weakRefCacheObj.get() != null) { return weakRefCacheObj.get().getObject(); }if (this.localCacheLocks.get(localCacheKey).tryLock()) { weakRefCacheObj = concurrentLocalCache.get(localCacheKey); if (weakRefCacheObj != null && weakRefCacheObj.get() != null) { // double check return weakRefCacheObj.get().getObject(); } try { Object res = pjp.proceed(pjp.getArgs()); CacheObject cacheObject = new CacheObject(); cacheObject.setObject(res); cacheObject.setTimestamp(System.currentTimeMillis()); weakRefCacheObj = new SoftReference( cacheObject); concurrentLocalCache.put(localCacheKey, weakRefCacheObj); return res; } finally { this.localCacheLocks.get(localCacheKey).unlock(); } } else { return null; // make the other part wait } } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } } }


    推荐阅读