文章目录
- Kafka构建TB级异步消息系统(发送系统通知)
-
- 1.封装事件对象
- 2.开发事件的生产者
- 3.开发事件的消费者
- 4.常量接口
- 5.生产者部署到三个对应的controller接口
-
- 5.1 CommentController
- 5.2 LikeController
- 5.3 FollowController
Kafka构建TB级异步消息系统(发送系统通知)
文章图片
1.封装事件对象 Event
/*对应kafka的事件——事件对象*/
@Data
@ToString
public class Event {private String topic;
//主题,就是事件的类型
private int userId;
//事件触发的人id
private int entityType;
//实体类型——事件触发什么操作(点赞、关注、回复),所以要知道事件发生在哪个实体身上,所以要实体类型
private int entityId;
//实体id
private int entityUserId;
//实体作者
//除此以外,事件对象要具有通用性;有可能在一些其他的业务中还要处理一些数据,所以用map来封装那些数据,从而
// 拥有一定扩展性(现在只处理这三种事件,以后可能会处理更多的事件(点赞、关注、评论))
private Map data = https://www.it610.com/article/new HashMap<>();
public String getTopic() {
return topic;
}public Event setTopic(String topic) {
this.topic = topic;
return this;
}public int getUserId() {
return userId;
}public Event setUserId(int userId) {
this.userId = userId;
return this;
}public int getEntityType() {
return entityType;
}public Event setEntityType(int entityType) {
this.entityType = entityType;
return this;
}public int getEntityId() {
return entityId;
}public Event setEntityId(int entityId) {
this.entityId = entityId;
return this;
}public int getEntityUserId() {
return entityUserId;
}public Event setEntityUserId(int entityUserId) {
this.entityUserId = entityUserId;
return this;
}public Map getData() {
return data;
}public Event setData(String key, Object value) {
this.data = https://www.it610.com/article/data;
return this;
}
}
2.开发事件的生产者 EventProducer
/*事件生产者*/
@Component
public class EventProducer {@Autowired
private KafkaTemplate kafkaTemplate;
//处理事件 :就是发送消息
public void fireEvent(Event event){
//将事件发布到指定的主题上
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
//以上第二个参数:将event转换为一个json字符串,消费者得到这个字符串后,能将其还原成event对象,这样就得到了event所有数据并对其进一步处理}
}
3.开发事件的消费者 EventConsumer
/*事件消费者:不用主动调,只要有数据,会自动的,只需调生产者在对应接口上即可!*/
@Component
public class EventConsumer implements CommunityConstant {//打印日志
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
/*处理一个事件,其实最终就是要给某一个人发送一条消息,发消息其实就是往message表插一条数据*/
@Autowired
private MessageService messageService;
/*一个方法可以消费一个或多个主题;一个主体也能被多个方法消费————多对多的关系*/
//当前我的业务是:点赞、关注、评论。这三个通知形式是很相近的,所以写到一个方法里处理这三个主题
@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
//定义消费者的方法
public void handleCommentMessage(ConsumerRecord record){
if (record == null || record.value()==null){
logger.error("消息内容为空!");
return;
}
//将传过来的json字符串转为对象。字符串从record中取,转为后面的class参数对象
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null){
logger.error("消息格式错误!必须为JSON");
return;
}/*上面内容和格式都判断通过,开始正式业务*/
//发送站内通知
Message message = new Message();
//构造一个Message对象
message.setFromId(SYSTEM_USER_ID);
//消息发布者:就是系统给用户发,系统在表里是id为1的用户,所以将1定义为常量1
message.setId(event.getEntityUserId());
message.setConversationId(event.getTopic());
//这里不是xxx_xxx的会话信息id,而是主题名
message.setCreateTime(new Date());
//内容content就存 事件触发拼接信息
Map content = new HashMap<>();
content.put("userId",event.getUserId());
//触发事件人的id
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
//对其他数据进行判断
if (!event.getData().isEmpty()){
for(Map.Entry entry : event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());
}
}
//将content以字符串格式存入
message.setContent(JSONObject.toJSONString(content));
messageService.addMessage(message);
}
}
4.常量接口
文章图片
5.生产者部署到三个对应的controller接口 5.1 CommentController
@Controller
@RequestMapping("/comment")
public class CommentController implements CommunityConstant {@Autowired
private CommentService commentService;
@Autowired
private HostHolder hostHolder;
//获取当前用户信息
@Autowired
private EventProducer eventProducer;
//注入生产者
@Autowired
private DiscussPostService discussPostService;
/*1.插入评论*/
//比如进入一个帖子,我发布完评论后,应该重定向到这个帖子接着查看详情页面;所以最终重定向的地方需要用到帖子id,所以我在添加的路径上把id也传过来
//@PathVariable("discussPostId") 用来接收参数
@RequestMapping(path = "/add/{discussPostId}",method = RequestMethod.POST)
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){
comment.setUserId(hostHolder.getUsers().getId());
//如果用户未登录,这里就会报错。不碍事,后面统一异常处理
comment.setStatus(0);
comment.setCreateTime(new Date());
//实现添加业务
commentService.addComment(comment);
/*触发评论事件-kafka*/
Event event = new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUsers().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId",discussPostId);
//被评论帖子的id
if (comment.getEntityType()==ENTITY_TYPE_POST){
//实体类型等于帖子,即对帖子的评论;
DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
event.setUserId(target.getUserId());
}else if (comment.getEntityType()==ENTITY_TYPE_USER){
//实体类型等于帖子,即对用户的评论;
Comment target = commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
//调生产者就完成了!
eventProducer.fireEvent(event);
return "redirect:/discuss/detail/"+discussPostId;
}}
文章图片
5.2 LikeController
@Controller
public class LikeController implements CommunityConstant {@Autowired
private LikeService likeService;
@Autowired
private HostHolder hostHolder;
@Autowired
private EventProducer eventProducer;
@RequestMapping(path ="/like",method = RequestMethod.POST)
@ResponseBody
public String like(int entityType,int entityId,int entityUserId,int postId){
//获取当前用户 _ 不用判断登录状态,因为有拦截器,添加一下即可。后期security也可以的。
User user = hostHolder.getUsers();
//点赞
likeService.like(user.getId(),entityType,entityId,entityUserId);
//统计点赞数量
long likeCount = likeService.findEntityLikeCount(entityType, entityId);
//统计点赞状态
int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);
//最终要把这两个值传给页面,封装一下再传
//返回的结果
Map map = new HashMap<>();
map.put("likeCount",likeCount);
map.put("likeStatus",likeStatus);
/*触发点赞事件-kafka*/
//这个点赞是有双重能力的方法,点一下喜欢,再点一下取消喜欢,所以只需要通知喜欢进行
if (likeStatus == 1){
Event event = new Event()
.setTopic(TOPIC_LIKE)
.setUserId(hostHolder.getUsers().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityUserId)
.setData("postId",postId);
eventProducer.fireEvent(event);
}return CommunityUtil.getJSONString(0,null,map);
}
}
【笔记|疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)】
文章图片
5.3 FollowController
//一、关注
@RequestMapping(path = "/follow",method = RequestMethod.POST)
@ResponseBody
//点击关注,应该是一个异步请求,局部刷新
//关注肯定是当前用户关注某一个实体,所以用户id不用传,取当前用户即可。
public String follow(int entityType,int entityId){
//1.获取当前用户(如果没登录,就不行。所以这个方法用拦截器做个检查,登录后才能访问)
User user = hostHolder.getUsers();
//2.关注
followService.follow(user.getId(),entityType,entityId);
/*触发关注事件——kafka*/
Event event = new Event()
.setTopic(TOPIC_FOLLOW)
.setUserId(hostHolder.getUsers().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityId);
eventProducer.fireEvent(event);
return CommunityUtil.getJSONString(0,"已关注!");
}
推荐阅读
- 零碎知识|springboot项目的zip发版转为jar包发版所得
- JAVA|线程池处理任务
- JAVA|多线程之定时器&&一些面试题
- JAVA|第一届AcWing Cup决赛两个闹钟问题
- JAVA|斗地主游戏基础版
- JAVA|特殊的四位十进制
- java基础|Java 多线程(超详细)
- Java基础|Java读写Properties配置文件(Properties类)
- Java基础|Java这些IO流你了解嘛