本地批量缓存

public abstract class AtomicBatchService implements Serializable {private static final long serialVersionUID = 2931723128262800986L; private static final Logger logger = LoggerFactory.getLogger(AtomicBatchService.class); private long LAST_BATCH_INSERT_TIME = System.currentTimeMillis(); public AtomicReference cacheMap = new AtomicReference<>(); public OUT out; private Context context; public RESULT doBatch(String key, INPUT dataObject) {return doBatch(key, dataObject, null); }public RESULT doBatch(String key, INPUT dataObject, OUT out) {this.setOut(out); cacheMap.updateAndGet(new UnaryOperator() { @Override public Map apply(Map map) { if (map == null) { map = new ConcurrentHashMap(); context = getConfig() == null ? new Context() : getConfig(); if (context.getPeriod() > 0) { addTimer(context); } System.out.println(System.identityHashCode(this) + "批量任务初始化成功:" + JSONObject.toJSONString(context)); } map.put(key, dataObject); return map; } }); return isExcuteBatch() ? excute() : null; }/** * 是否执行批量 * * @return */ public boolean isExcuteBatch() {return cacheMap.get().size() >= context.getBatchSize() || (System.currentTimeMillis() - LAST_BATCH_INSERT_TIME) > context.getMilliSeconds(); }/** * 定时任务是否执行 * * @return */ public boolean isExcuteTask() {return (System.currentTimeMillis() - LAST_BATCH_INSERT_TIME) > context.getMilliSeconds(); }/** * 批量处理数据 * * @return */ public RESULT excute() {LAST_BATCH_INSERT_TIME = System.currentTimeMillis(); long startTime = LAST_BATCH_INSERT_TIME; Map map = cacheMap.getAndSet(new ConcurrentHashMap<>()); if (MapUtils.isEmpty(map)) { return null; }Collection dataList = map.values(); if (CollectionUtils.isEmpty(dataList)) { return null; }RESULT result = batchOperation(new ArrayList<>(dataList)); if (ConstantsDefine.isLog) { System.out.println("批量处理数据: " + JSONObject.toJSONString(map)); } System.out.println(String.format("批量操作花费时间: %d|%dms", dataList.size(), (System.currentTimeMillis() - startTime))); return result; }/** * 批量处理本地缓存的数据 */ public void doSubmit() {long startTime = System.currentTimeMillis(); Map map = cacheMap.getAndSet(new ConcurrentHashMap<>()); if (MapUtils.isEmpty(map)) { return; }Collection dataList = map.values(); if (CollectionUtils.isEmpty(dataList)) { return; } batchOperation(new ArrayList<>(dataList)); logger.info("批量操作花费时间: {}|{}ms", dataList.size(), (System.currentTimeMillis() - startTime)); }/** * 增加定时器处理 * * @param context */ public void addTimer(Context context) {new Timer().schedule(new AtomicTimerTask(this), context.getPeriod(), context.getPeriod()); }/** * 批量操作 * * @param dataList */ public abstract RESULT batchOperation(List dataList); /** * 批量输出 * * @param out * @param resultList */ public abstract void out(OUT out, RESULT resultList); /** * 设置批量数量、间隔时间(单位秒) */ public abstract Context getConfig(); public static class Context {private long milliSeconds = 5000; // 间隔时间(单位毫秒)private int batchSize = 100; // 批量数量private long period = 0; // 定时器处理周期,为0表示不启动定时器public Context() {}public Context(int batchSize, int milliSeconds) {this.setMilliSeconds(milliSeconds); this.setBatchSize(batchSize); }public Context(int batchSize, int milliSeconds, long period) {this.setMilliSeconds(milliSeconds); this.setBatchSize(batchSize); this.setPeriod(period); }public long getMilliSeconds() { return milliSeconds; }public void setMilliSeconds(long milliSeconds) { this.milliSeconds = milliSeconds; }public int getBatchSize() { return batchSize; }public void setBatchSize(int batchSize) { this.batchSize = batchSize; }public long getPeriod() { return period; }public void setPeriod(long period) { this.period = period; } }/** * @Description 批量任务定时器 * @Author 01381119 * @CreateDate: 2019/3/18 17:19 */ public class AtomicTimerTask extends TimerTask {private AtomicBatchService atomicBatchService; public AtomicTimerTask(AtomicBatchService atomicBatchService) {this.atomicBatchService = atomicBatchService; }@Override public void run() {System.out.println(String.format("批量任务定时器开始处理:%s|%s|%sms", DateTimeUtils.formatDateToString(new Date(), DateTimeUtils.DATE_FORMAT_FULL), this.atomicBatchService.getClass().getName(), this.atomicBatchService.getConfig().getPeriod())); if (!atomicBatchService.isExcuteTask()) { System.out.println("批量任务执行正常,无需执行定时器任务!"); return; }if (atomicBatchService.out == null) {// 不需要再执行输出下一环节操作 atomicBatchService.excute(); } else { atomicBatchService.out(atomicBatchService.out, atomicBatchService.excute()); } System.out.println(String.format("批量任务定时器完成处理:%s", DateTimeUtils.formatDateToString(new Date(), DateTimeUtils.DATE_FORMAT_FULL))); } }public AtomicReference getCacheMap() { return cacheMap; }public void setCacheMap(AtomicReference cacheMap) { this.cacheMap = cacheMap; }public OUT getOut() { return out; }public void setOut(OUT out) { this.out = out; }public Context getContext() { return context; }public void setContext(Context context) { this.context = context; }





【本地批量缓存】
批量定时器:
public class AtomicTimerTask extends TimerTask {private AtomicBatchService atomicBatchService; public AtomicTimerTask(AtomicBatchService atomicBatchService) {this.atomicBatchService = atomicBatchService; }@Override public void run() {System.out.println(String.format("批量任务定时器开始处理:%s|%sms", this.atomicBatchService.getClass().getName(), this.atomicBatchService.getConfig().getPeriod())); if (!atomicBatchService.isExcuteTask()) { System.out.println("批量任务执行正常,无需执行定时器任务!"); return; }if (atomicBatchService.out == null) {// 不需要再执行输出下一环节操作 atomicBatchService.excute(); } else { atomicBatchService.out(atomicBatchService.out, atomicBatchService.excute()); } }




应用:(继承)
public class ChaifenService extends AtomicBatchService, Collector> {private static HighLevelRestDao highLevelRestDao = new HighLevelRestDaoImpl(); private static String ES_INDEX_NAME = "chaifen_vehicle"; private static String ES_DOC_TYPE = "chaifen_vehicle"; @Override public List batchOperation(List dataList) { List liRequestList = Lists.transform(dataList, new Function() { @Nullable @Override public String apply(@Nullable JSONObject jsonObject) { return jsonObject.getString("line_require_id"); } }); if (ListUtil.isEmpty(liRequestList)){ return dataList; }try { QueryCondition queryCondition = new QueryCondition(ES_INDEX_NAME,ES_DOC_TYPE); queryCondition.setField("line_require_id",liRequestList); highLevelRestDao.doSyncDelete(queryCondition); }catch (Exception e){ e.printStackTrace(); }return dataList; }@Override public void out(Collector stringCollector, List resultList) { if (ListUtil.isEmpty(resultList)) { return; }for (JSONObject dataJson : resultList) { out.collect(dataJson.toJSONString()); }}@Override public Context getConfig() { return new Context(300, 3 * 1000, 5 * 60 * 1000); }



chaifenService.setOut(out); List list = chaifenService.doBatch(json.getString("esKey"), json); if (ListUtil.isEmpty(list)) { return; }for (JSONObject result : list) { out.collect(result.toJSONString()); }

    推荐阅读