使用redis结合aop提取用户的行为数据

场景描述 在项目中有这样一个需求,用户下载app并打开进入首页之前,会让用户选择自己感兴趣的主题分类,后台根据用户的选择提取出用户的行为数据并作出统计,以图形或者表格的形式展现出来,后期就可以根据这些数据做去做一些类似个性化或精准化的推送了,例如在进入豆瓣app首页前提示用户先选择感兴趣的主题内容.

使用redis结合aop提取用户的行为数据
文章图片
image.png 具体实现 由于项目中主要使用了Spring框架,所以首先想到的最简单的方式就是使用aop切面拦截用户选择主题分类的接口,然后在数据库中建一张单独的表t_user_interest来存储用户的行为数据,由于是统计型数据所以实时性要求不高,允许有一定的延迟,并且为避免频繁的操作数据库,在切面拦截数据后起独立的线程推送到redis队列,后台再起调度任务每隔1小时异步持久化到数据库中.下面来看看实现步骤:
1.定义aop拦截器拦截方法调用,把用户行为数据推送到redis队列中,其中TaskPo为封装好的与用户相关的行为数据对象.

/** * aop拦截器拦截方法调用 */ @Component @Aspect public class UserInterestAspect {private final static Logger logger = LoggerFactory.getLogger(UserInterestAspect.class); private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); private final static ExecutorService pool = Executors.newFixedThreadPool(5); private final static String QUEUE_NAME = "USER_INTEREST"; /** * 前置通知:在目标方法开始之前执行 * * @param joinPoint * @throws Exception */ @Before("execution(* xxx.service.impl.xxxServiceImpl.xxx(..))") public void saveRequiresLog(JoinPoint joinPoint) throws Exception { try { TaskPo taskPo = getTaskPo(joinPoint); if (taskPo != null) { queue.add(taskPo); TaskPo taskMsge; while ((taskMsge = queue.poll()) != null) { pool.execute(new PushRedisWorker(QUEUE_NAME, taskMsge)); } } } catch (Exception e) { throw e; } }/** * 解析参数映射为Java对象 * * @param joinPoint * @return */ public TaskPo getTaskPo(JoinPoint joinPoint) { Object[] paramsValue = https://www.it610.com/article/joinPoint.getArgs(); String[] paramsName = ((CodeSignature) joinPoint.getStaticPart() .getSignature()).getParameterNames(); JSONObject entityJSON = new JSONObject(); for (int i = 0; i < paramsName.length; i++) { entityJSON.put(paramsName[i], paramsValue[i]); } if (entityJSON != null && !entityJSON.isEmpty()) { TaskPo taskPo = JSONObject.toJavaObject(entityJSON, TaskPo.class); taskPo.setOperatorDate(new Date()); return taskPo; } return null; } }

2.接着起一个任务线程接口worker继承Runnable.
public interface Worker extends Runnable {}

3.然后创建一个PushRedisWorker类实现Worker接口,并将封装好的数据写到redis队列中.
public class PushRedisWorker implements Worker {private String QUEUE_NAME; private TaskPo taskPo; public PushRedisWorker(String QUEUE_NAME, TaskPo taskPo) { this.QUEUE_NAME = QUEUE_NAME; this.taskPo = taskPo; }@Override public void run() { // TODO Auto-generated method stub try (Jedis jedis = JedisUtils.getJedis()) { jedis.lpush(QUEUE_NAME, JSON.toJSONString(taskPo)); } } }

4.然后再定义一个调度任务器类每隔一段时间间隔轮询redis任务队列,不断的从队列中消费数据,再结合线程池异步发送数据到mysql中.
/** * 任务调度定时到Redis队列中拉取对象持久化到数据库 */ @Component public class RedisTaskJob {private final static Logger logger = LoggerFactory.getLogger(RedisTaskJob.class); private final static String QUEUE_NAME = "USER_INTEREST"; private final static ExecutorService es = Executors.newFixedThreadPool(5); private volatile static boolean isRun = true; /** * 每隔一小时执行一次 */ @Scheduled(cron = "0 0 0/1 * * ? ") public void getRedisTask() { if (logger.isDebugEnabled()) { logger.debug("调度开始"); } //取出spring受管的业务service实例 IUserInterestService userInterestService = SpringContextHolder.getBean("userInterestService",IUserInterestService.class); try (Jedis jedis = JedisUtils.getJedis()) { if (jedis.exists(QUEUE_NAME)) { start(); } while (isRun) { if (!jedis.exists(QUEUE_NAME)) { stop(); break; } try { String task = jedis.lpop(QUEUE_NAME); if (StringHelpUtils.isNotBlank(task)) { TaskPo taskPo = JSONObject.toJavaObject(JSON.parseObject(task), TaskPo.class); //任务对象转化为实体model UserInterest model = EntityUtils.convert(taskPo,UserInterest.class); es.submit(() -> { //执行数据入库操作 userInterestService.save(model); }); } } catch (Exception e) { stop(); throw e; } } } }public static void stop() { isRun = false; }public static void start() { isRun = true; } }

【使用redis结合aop提取用户的行为数据】5.最后统计mysql中的数据并生成报表.

    推荐阅读