笔记|疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)


文章目录

  • Kafka构建TB级异步消息系统(发送系统通知)
    • 1.封装事件对象
    • 2.开发事件的生产者
    • 3.开发事件的消费者
    • 4.常量接口
    • 5.生产者部署到三个对应的controller接口
      • 5.1 CommentController
      • 5.2 LikeController
      • 5.3 FollowController

Kafka构建TB级异步消息系统(发送系统通知) 笔记|疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)
文章图片

1.封装事件对象 Event
/*对应kafka的事件——事件对象*/ @Data @ToString public class Event {private String topic; //主题,就是事件的类型 private int userId; //事件触发的人id private int entityType; //实体类型——事件触发什么操作(点赞、关注、回复),所以要知道事件发生在哪个实体身上,所以要实体类型 private int entityId; //实体id private int entityUserId; //实体作者 //除此以外,事件对象要具有通用性;有可能在一些其他的业务中还要处理一些数据,所以用map来封装那些数据,从而 // 拥有一定扩展性(现在只处理这三种事件,以后可能会处理更多的事件(点赞、关注、评论)) private Map data = https://www.it610.com/article/new HashMap<>(); public String getTopic() { return topic; }public Event setTopic(String topic) { this.topic = topic; return this; }public int getUserId() { return userId; }public Event setUserId(int userId) { this.userId = userId; return this; }public int getEntityType() { return entityType; }public Event setEntityType(int entityType) { this.entityType = entityType; return this; }public int getEntityId() { return entityId; }public Event setEntityId(int entityId) { this.entityId = entityId; return this; }public int getEntityUserId() { return entityUserId; }public Event setEntityUserId(int entityUserId) { this.entityUserId = entityUserId; return this; }public Map getData() { return data; }public Event setData(String key, Object value) { this.data = https://www.it610.com/article/data; return this; } }

2.开发事件的生产者 EventProducer
/*事件生产者*/ @Component public class EventProducer {@Autowired private KafkaTemplate kafkaTemplate; //处理事件 :就是发送消息 public void fireEvent(Event event){ //将事件发布到指定的主题上 kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event)); //以上第二个参数:将event转换为一个json字符串,消费者得到这个字符串后,能将其还原成event对象,这样就得到了event所有数据并对其进一步处理} }

3.开发事件的消费者 EventConsumer
/*事件消费者:不用主动调,只要有数据,会自动的,只需调生产者在对应接口上即可!*/ @Component public class EventConsumer implements CommunityConstant {//打印日志 private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); /*处理一个事件,其实最终就是要给某一个人发送一条消息,发消息其实就是往message表插一条数据*/ @Autowired private MessageService messageService; /*一个方法可以消费一个或多个主题;一个主体也能被多个方法消费————多对多的关系*/ //当前我的业务是:点赞、关注、评论。这三个通知形式是很相近的,所以写到一个方法里处理这三个主题 @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW}) //定义消费者的方法 public void handleCommentMessage(ConsumerRecord record){ if (record == null || record.value()==null){ logger.error("消息内容为空!"); return; } //将传过来的json字符串转为对象。字符串从record中取,转为后面的class参数对象 Event event = JSONObject.parseObject(record.value().toString(), Event.class); if (event == null){ logger.error("消息格式错误!必须为JSON"); return; }/*上面内容和格式都判断通过,开始正式业务*/ //发送站内通知 Message message = new Message(); //构造一个Message对象 message.setFromId(SYSTEM_USER_ID); //消息发布者:就是系统给用户发,系统在表里是id为1的用户,所以将1定义为常量1 message.setId(event.getEntityUserId()); message.setConversationId(event.getTopic()); //这里不是xxx_xxx的会话信息id,而是主题名 message.setCreateTime(new Date()); //内容content就存 事件触发拼接信息 Map content = new HashMap<>(); content.put("userId",event.getUserId()); //触发事件人的id content.put("entityType",event.getEntityType()); content.put("entityId",event.getEntityId()); //对其他数据进行判断 if (!event.getData().isEmpty()){ for(Map.Entry entry : event.getData().entrySet()){ content.put(entry.getKey(),entry.getValue()); } } //将content以字符串格式存入 message.setContent(JSONObject.toJSONString(content)); messageService.addMessage(message); } }

4.常量接口 笔记|疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)
文章图片

5.生产者部署到三个对应的controller接口 5.1 CommentController
@Controller @RequestMapping("/comment") public class CommentController implements CommunityConstant {@Autowired private CommentService commentService; @Autowired private HostHolder hostHolder; //获取当前用户信息 @Autowired private EventProducer eventProducer; //注入生产者 @Autowired private DiscussPostService discussPostService; /*1.插入评论*/ //比如进入一个帖子,我发布完评论后,应该重定向到这个帖子接着查看详情页面;所以最终重定向的地方需要用到帖子id,所以我在添加的路径上把id也传过来 //@PathVariable("discussPostId") 用来接收参数 @RequestMapping(path = "/add/{discussPostId}",method = RequestMethod.POST) public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){ comment.setUserId(hostHolder.getUsers().getId()); //如果用户未登录,这里就会报错。不碍事,后面统一异常处理 comment.setStatus(0); comment.setCreateTime(new Date()); //实现添加业务 commentService.addComment(comment); /*触发评论事件-kafka*/ Event event = new Event() .setTopic(TOPIC_COMMENT) .setUserId(hostHolder.getUsers().getId()) .setEntityType(comment.getEntityType()) .setEntityId(comment.getEntityId()) .setData("postId",discussPostId); //被评论帖子的id if (comment.getEntityType()==ENTITY_TYPE_POST){ //实体类型等于帖子,即对帖子的评论; DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId()); event.setUserId(target.getUserId()); }else if (comment.getEntityType()==ENTITY_TYPE_USER){ //实体类型等于帖子,即对用户的评论; Comment target = commentService.findCommentById(comment.getEntityId()); event.setEntityUserId(target.getUserId()); } //调生产者就完成了! eventProducer.fireEvent(event); return "redirect:/discuss/detail/"+discussPostId; }}

笔记|疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)
文章图片

5.2 LikeController
@Controller public class LikeController implements CommunityConstant {@Autowired private LikeService likeService; @Autowired private HostHolder hostHolder; @Autowired private EventProducer eventProducer; @RequestMapping(path ="/like",method = RequestMethod.POST) @ResponseBody public String like(int entityType,int entityId,int entityUserId,int postId){ //获取当前用户 _ 不用判断登录状态,因为有拦截器,添加一下即可。后期security也可以的。 User user = hostHolder.getUsers(); //点赞 likeService.like(user.getId(),entityType,entityId,entityUserId); //统计点赞数量 long likeCount = likeService.findEntityLikeCount(entityType, entityId); //统计点赞状态 int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId); //最终要把这两个值传给页面,封装一下再传 //返回的结果 Map map = new HashMap<>(); map.put("likeCount",likeCount); map.put("likeStatus",likeStatus); /*触发点赞事件-kafka*/ //这个点赞是有双重能力的方法,点一下喜欢,再点一下取消喜欢,所以只需要通知喜欢进行 if (likeStatus == 1){ Event event = new Event() .setTopic(TOPIC_LIKE) .setUserId(hostHolder.getUsers().getId()) .setEntityType(entityType) .setEntityId(entityId) .setEntityUserId(entityUserId) .setData("postId",postId); eventProducer.fireEvent(event); }return CommunityUtil.getJSONString(0,null,map); } }

【笔记|疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)】笔记|疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)
文章图片

5.3 FollowController
//一、关注 @RequestMapping(path = "/follow",method = RequestMethod.POST) @ResponseBody //点击关注,应该是一个异步请求,局部刷新 //关注肯定是当前用户关注某一个实体,所以用户id不用传,取当前用户即可。 public String follow(int entityType,int entityId){ //1.获取当前用户(如果没登录,就不行。所以这个方法用拦截器做个检查,登录后才能访问) User user = hostHolder.getUsers(); //2.关注 followService.follow(user.getId(),entityType,entityId); /*触发关注事件——kafka*/ Event event = new Event() .setTopic(TOPIC_FOLLOW) .setUserId(hostHolder.getUsers().getId()) .setEntityType(entityType) .setEntityId(entityId) .setEntityUserId(entityId); eventProducer.fireEvent(event); return CommunityUtil.getJSONString(0,"已关注!"); }

    推荐阅读