最近几天由于工作原因,需要设计实现一个线程安全的缓存机制,拿出来和大家分享交流一下。
应用背景: 缓存是在实际工作中经常用到的,主要作用呢?1. 提高响应速度 2. 降低cpu压力或者数据库压力。 在此,我的应用背景是拦截一些RPC请求(
不要求获取实时数据),且RPC请求无参数,即主要是应对一些数据全量同步的请求(那么缓存的key是请求的函数名,value是返回值)。提供缓存实现,以降低数据库及自身应用的访问压力。
目标: 高可扩展性:可以方便配置需要使用缓存的方法。 线程安全性:在并发情况下,要求线程安全,且尽可能高效。
使用技术:
- 使用AOP的插件性质来降低缓存与原系统的耦合性,即在切面层做缓存的处理。
- 使用Annotation来对需要做缓存处理的函数进行标记,并可以对缓存时间个性化
- 针对缓存过期问题,对放入缓存的数据封装一层,并打上时间戳
文章图片
设计难点: 针对某一时刻并发数较多且缓存失效的情况下,我们应该保证的是只有一个线程会去执行数据的读取并设置的操作,那么其他线程应该是等待该线程完成操作再一起返回还是直接返回null值? 答:在并发数较多且数据准备时间过长的情况下,如果线程采取等待策略,那么将引起很大的资源浪费:占用RPC连接(一般数量是有限制的),占用服务器cpu时间等等问题。所以,最好是代码中提供两种策略,对于执行时间较长的读数据操作,我们应该将线程直接返回,而非一直等待。
代码实现:Annotation:
/**
* 表示一个方法是否启用本地缓存,可以指定本地缓存的时间间隔,默认为一个小时
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LocalCacheOperation {long localCacheInterval() default 1000 * 60 * 60;
String localCacheKey() default "";
}
CacheObject:
//为缓存对象包上一层时间戳
public class CacheObject implements Serializable {
private static final long serialVersionUID = 4873268779348802945L;
private long timestamp;
private Object object;
.....
}
标记要使用缓存的方法:
public interface ConfigService {
public BloomFilter getAllPassengerNames();
public BloomFilter getAllTrades();
}public class ConfigServiceImpl implements ConfigService {
private PassengerManager passengerManager;
private TradeManager tradeManager;
@LocalCacheOperation(localCacheInterval=1000*60*60*24)
public BloomFilter getAllPassengerNames() {
return passengerManager.getAllPassengerNames();
}
@LocalCacheOperation
public BloomFilter getAllTrades() {
return tradeManager.getAllTrades();
}
}
核心:切面实现 我在这里使用了加锁和不加锁两种方式来实现对应的两种策略:缓存不存在线程直接返回或等待。
/**
* 本地缓存切面实现
*/
@Aspect
public class LocalCacheAspect {private static final Log log = LogFactory.getLog(LocalCacheAspect.class);
private final ConcurrentMap>
localCache = new ConcurrentHashMap>();
//控制台
private ConsoleBean consoleBean;
private final ConcurrentMap>
concurrentLocalCache = new ConcurrentHashMap>();
/**
* 对于执行时间较长的读数据操作,需要在这里相应的添加锁,对于操作添加锁后的函数的线程
* 如果本地缓存为空,且读数据的锁已被其他线程占据,将直接返回null
*/private final static Map localCacheLocks = new HashMap();
static {
localCacheLocks.put("getAllTrades", new ReentrantLock());
}/**
* Advice aound audit operations
*
* @param pjpParam
* @return
*/
@Around("execution(@LocalCacheOperation * *(..))")
public Object doCache(ProceedingJoinPoint pjpParam) throws Throwable {
if(!getConsoleBean().isLocalCacheSwitchOn()) {
log.warn("localCache not switch on, please pay attention");
return pjpParam.proceed(pjpParam.getArgs());
}
final ProceedingJoinPoint pjp = pjpParam;
Signature sig = pjp.getSignature();
if (sig instanceof MethodSignature) {
MethodSignature mSig = (MethodSignature) sig;
LocalCacheOperation co = mSig.getMethod().getAnnotation(
LocalCacheOperation.class);
long localCacheInterval = 0;
String localCacheKey = null;
/**
* AOP在拦截子类的Annotataion时,无法获取该Annotation,导致co可能为空
*/
if( co == null ){
localCacheInterval = consoleBean.getLocalCacheInterval();
localCacheKey = mSig.getName();
} else {
localCacheInterval = co.localCacheInterval();
localCacheKey = StringUtils.isNotBlank(co.localCacheKey()) ?
co.localCacheKey() : mSig.getName();
}
if (localCacheLocks.containsKey(mSig.getMethod().getName())) { //使用本地互斥锁
return doConcurrentLocalCache(pjp, localCacheInterval, localCacheKey);
}while (true) {// 等待某个线程将数据获取到本地缓存
Future f = localCache.get(localCacheKey);
try {
long currentTime = System.currentTimeMillis();
if (f != null && f.get() != null && currentTime - f.get().getTimestamp()
> localCacheInterval) {
localCache.remove(localCacheKey, f);
f = null;
}if (f == null) {
Callable eval = new Callable() {
public CacheObject call() throws InterruptedException {
Object res;
try {
res = pjp.proceed(pjp.getArgs());
}
catch (Throwable e) {
log.error("Fail to process method", e);
throw newServiceException(e.getMessage());
}
CacheObject cacheObject = new CacheObject();
cacheObject.setObject(res);
cacheObject.setTimestamp(System.currentTimeMillis());
return cacheObject;
}
};
FutureTask ft =
new FutureTask(eval);
f = localCache.putIfAbsent(localCacheKey, ft);
if (f == null) {
f = ft;
ft.run();
}
}CacheObject obj = f.get();
if (obj != null)
return obj.getObject();
}
catch (CancellationException e) {
localCache.remove(localCacheKey, f);
}
catch (ExecutionException e) {
throw new ServiceException(e.getMessage());
}
}
}
return pjp.proceed(pjp.getArgs());
}@SuppressWarnings("static-access")
public Object doConcurrentLocalCache(ProceedingJoinPoint pjp,
long localCacheInterval, String localCacheKey) throws Throwable {
try {
long currentTime = System.currentTimeMillis();
SoftReference weakRefCacheObj =
concurrentLocalCache.get(localCacheKey);
if (weakRefCacheObj != null && weakRefCacheObj.get() != null &&
currentTime - weakRefCacheObj.get().getTimestamp() > localCacheInterval) {
// 缓存过期
weakRefCacheObj.get().setObject(null);
// 清空引用
concurrentLocalCache.remove(localCacheKey, weakRefCacheObj);
weakRefCacheObj = null;
} else if (weakRefCacheObj != null && weakRefCacheObj.get() != null) {
return weakRefCacheObj.get().getObject();
}if (this.localCacheLocks.get(localCacheKey).tryLock()) {
weakRefCacheObj = concurrentLocalCache.get(localCacheKey);
if (weakRefCacheObj != null && weakRefCacheObj.get() != null) {
// double check
return weakRefCacheObj.get().getObject();
}
try {
Object res = pjp.proceed(pjp.getArgs());
CacheObject cacheObject = new CacheObject();
cacheObject.setObject(res);
cacheObject.setTimestamp(System.currentTimeMillis());
weakRefCacheObj = new SoftReference(
cacheObject);
concurrentLocalCache.put(localCacheKey, weakRefCacheObj);
return res;
} finally {
this.localCacheLocks.get(localCacheKey).unlock();
}
} else {
return null;
// make the other part wait
}
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
}
}
}
推荐阅读
- Ant|antd pro 4.0 动态菜单
- ant|ant design pro 轮播图加prev()和next()
- sketch|How to create a flat styled icon in Sketch 3
- list|自己动手写 printf函数
- android|Design Support Library
- Java|sitemesh&freemarker
- ARM|Keil IAR - Cortex M3 调试问题及解决方法(1)
- Linux|使用Syslogd...klogd...
- TcpIp|利用TCP/IP堆栈进行远程操作系统判别的方法
- Linux|利用tcpip堆栈处理不同判断远程操作系统. ..