websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载

其实很不想写这一篇文章的,但是,怕后面真有同学看我的博客踩坑了。我觉得不好意思。
还是接上一篇博客,上面我们完成了基本的群聊功能。相信大家发现能玩以后肯定也是很高兴的。今天这一篇博客就是要推翻我前面的02后端实现的功能,当然,如果你的企业结构比较小,这样也是完全满足要求的。
接着第二篇博客,我们来测试一下,这样的即时通讯可以链接多少个用户。希望不要亮瞎你的眼睛。
首先我们会实现websocket,其次我们会使用脚本触发链接。直到链接到瓶颈。
1.后端demo 0):pom

【websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载】4.0.0com.hannan.ehu test-websocket 0.0.1-SNAPSHOTjartest-websocket springbootWebsocket并发链接测试org.springframework.boot spring-boot-starter-parent 2.1.0.RELEASE UTF-8UTF-8 1.8 1.2.47 org.springframework.boot spring-boot-starter-websocket com.alibaba fastjson ${fastjson.version} org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin true

a): ChatController
package com.hannan.ehu.websocket.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.hannan.ehu.websocket.utils.MsgUtils; import com.hannan.ehu.websocket.utils.WebSocketUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.List; @ServerEndpoint(value = "https://www.it610.com/chat/{userId}") @Component public class ChatController {private Logger log = LoggerFactory.getLogger(ChatController.class); @OnOpen public void onOpen(@PathParam("userId") String userId, Session session) { log.info("连接开启:{}-----"+userId); WebSocketUtils.put(userId, session); log.info("当前并发个数为:"+WebSocketUtils.getAllOnlineUser().size()); MsgUtils.onOpen(userId,session); }@OnMessage public void onMessage(String msgStr,Session session) { log.info(msgStr); JSONObject msg = JSON.parseObject(msgStr); Object content = msg.get("content"); String from = msg.getString("from"); String to = msg.getString("to"); if(content instanceof String){ content = from.substring(0,5) + ":" + content; } broadcast(MsgUtils.handChatMsg(content), from, to,session); }@OnError public void onError(@PathParam("userId") String userId,Throwable r) { WebSocketUtils.remove(userId); }@OnClose public void onClose(@PathParam("userId") String userId) { log.info("连接关闭了:{}-----"+userId); log.info("当前并发个数为:"+WebSocketUtils.getAllOnlineUser().size()); WebSocketUtils.remove(userId); }/** * 发送消息(该处理逻辑步骤) * * @param message 消息内容 * @param from * @param to */ private void broadcast(String message, String from, String to,Session fromSession) { if ("-1".equals(to)) { List sessions = WebSocketUtils.getOtherSession(from); if (sessions.size() > 0) { for (Session s : sessions) { s.getAsyncRemote().sendText(message); } } } else { Session toSession = WebSocketUtils.get(to); if (null != toSession && toSession.isOpen()) { toSession.getAsyncRemote().sendText(message); } else { Session mySession = WebSocketUtils.get(from); if(mySession.equals(fromSession)){ String msg =MsgUtils.handNoticeMsg(MsgUtils.NOT_LINE_NOTICE,"对方已经下线了"); mySession.getAsyncRemote().sendText(msg); }else{ String msg =MsgUtils.handNoticeMsg(MsgUtils.EXCEPTION_NOTICE,"你的链接出现异常"); fromSession.getAsyncRemote().sendText(msg); } } } } }

b): UUidController
package com.hannan.ehu.websocket.controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RestController public class UUidController { @GetMapping("/uid") public String getUUid(){ return UUID.randomUUID().toString().replaceAll("-",""); } }

c):WebSocketUtils
package com.hannan.ehu.websocket.utils; import javax.websocket.Session; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class WebSocketUtils { private static Map chat = new ConcurrentHashMap<>(); private static final String PREFIX = "mws_"; public static void put(String userid, Session session) { chat.put(getKey(userid), session); }public static Session get(String userid) { return chat.get(getKey(userid)); }public static List getOtherSession(String userid) { List result = new ArrayList<>(); Set set = chat.entrySet(); for (Map.Entry s : set) { if (!s.getKey().equals(getKey(userid))) { result.add(s.getValue()); } } return result; }public static void remove(String userid) { chat.remove(getKey(userid)); }public static boolean hasConnection(String userid) { return chat.containsKey(getKey(userid)); }private static String getKey(String userid) { return PREFIX + userid; }public static Set getAllOnlineUser(){ return chat.keySet(); } }

d):
package com.hannan.ehu.websocket.utils; import com.alibaba.fastjson.JSONObject; import javax.websocket.Session; import java.util.*; public class MsgUtils { /** * 聊天类消息处理 */ private static IntegerTEXT_MSG = 1; /** * 系统级别消息处理 */ private static IntegerOPEN_CONNECT = 11; private static IntegerADD_USER = 12; public static IntegerNOT_LINE_NOTICE = 13; public static IntegerEXCEPTION_NOTICE = 14; /** * 处理接收到的聊天类消息 */ public static String handChatMsg(Object msg){ if (msg instanceof String){ return getTextMsg((String) msg); }else{ if(msg instanceof JSONObject){ JSONObject message = (JSONObject) msg; if(message.getInteger("type")!=null&&message.getString("content")!=null){ message.put("createTime",new Date()); return JSONObject.toJSONString(message); }else{ return getTextMsg(JSONObject.toJSONString(msg)); } }else{ return getTextMsg(JSONObject.toJSONString(msg)); } } }/** * 处理后台生成的消息 * @param type 消息类型 * @param content 消息对象 * @return */ public static String handNoticeMsg(Integer type,Object content){ return JSONObject.toJSONString(getEhuMessage(content,type)); }/** * 打开链接返回数据逻辑处理 */ public static void onOpen(String userId, Session session){ /** * 本人获取所有的用户 */ List uids = new ArrayList<>(); for (String uid : WebSocketUtils.getAllOnlineUser()) { uids.add(uid.replaceAll("mws_","")); } session.getAsyncRemote().sendText(getOpenMsg(uids)); List sessions = WebSocketUtils.getOtherSession(userId); if (sessions.size() > 0) { for (Session s : sessions) { s.getAsyncRemote().sendText(getAddUserMsg(userId)); } } }/** * 获取所有用户消息体 * @param obj * @return */ private static String getOpenMsg(Object obj){ Map model = new HashMap(); model.put("type",OPEN_CONNECT); model.put("content",obj); model.put("createTime",new Date()); return JSONObject.toJSONString(model); }/** * 新用户加入消息体 * @param uid * @return */ private static String getAddUserMsg(String uid){ Map model = getEhuMessage(uid, ADD_USER); return JSONObject.toJSONString(model); }/** * 普通文本消息体 * @param content * @return */ private static String getTextMsg(String content){ Map model = getEhuMessage(content, TEXT_MSG); return JSONObject.toJSONString(model); }private static Map getEhuMessage(Object content, Integer textMsg) { Map model = new HashMap(); model.put("type", textMsg); model.put("content", content); model.put("createTime", new Date()); return model; } }

前端demo concurrency.html
并发测试连接数 - 锐客网 当前并发个数为:0

进行测试 运行后端demo;使用谷歌浏览器打开concurrency.html。之后会不停的创建链接websocket。如下图
websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载
文章图片

当链接超过256个之后,我们查看后台控制台:
websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载
文章图片

我们发现链接竟然不打印了。这说明,其实现在后台仅仅能处理256个链接,
让我们把页面调回前端:
websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载
文章图片

我们发现请求在256个之后就不再创建websocket通道了,但是数字仍在继续增长。
当请求到达512以后,开始出现错误,错误如下:
websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载
文章图片

从错误中我们看见了醒目的:ERROR_INSUFFICIENT_RESOURCES
这是啥意思呢,意思就是tomcat搞不过来了。服务器资源耗尽了。
此时我们关闭websocket页面,查看控制台。
websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载
文章图片

看见没有,关闭页面本应该是websocket通道全部关闭了。但是从控制台我们得到的结果却是没有全部关闭,还剩下一部分。
因此我们从这里可以得出两个问题:
1.再默认的tomcat下,能够处理的websocket请求有256个
2.当不超过512个之前tomcat将256-512的请求进行了就收,但没有处理
3.当链接超过512个以后 tomcat资源被耗尽
4.并发下关闭请求,会造成服务端websocket通道小概率不会关闭。
哦,忘了。在网上看见这样一篇博客。Tomcat下WebSocket最大连接数测试,上面提到默认情况下,Tomcat的WebSocket最大连接数为200。感觉我的测试结果跟他的不一样。而且我修改了以后也没有啥效果,不知道博主怎么实现的。
websoket实战|使用websocket做及时通讯功能————04tomcat版本做即时通讯的问题记载
文章图片

后面我会找时间继续更新处理的方法…

    推荐阅读