Web|WebSocket集群解决方案

一、问题起因 最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。
期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试。
以下是我的场景描述:

  • ?资源 :4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)
  • 应用发布限制条件 :由于场景需要,应用场所需要ssl认证的域名才能发布。因此ssl认证的域名服务器用来当api网关,负责https请求与wss(安全认证的ws)连接。俗称https卸载,用户请求https域名服务器(eg:https://oiscircle.com/xxx),但真实访问到的是http+ip地址的形式。只要网关配置高,能handle多个应用
  • 需求 :用户登录应用,需要与服务器建立wss连接,不同角色之间可以单发消息,也可以群发消息
  • 集群中的应用服务类型 :每个集群实例都负责http无状态请求服务与ws长连接服务

二、系统架构图 Web|WebSocket集群解决方案
文章图片

在我的实现里,每个应用服务器都负责http and ws请求,其实也可以将ws请求建立的聊天模型单独成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务http+ws请求的方式更为方便,下文会有解释。本文涉及的技术栈:
  • Eureka 服务发现与注册
  • Redis Session共享
  • Redis 消息订阅
  • Spring Boot
  • Zuul 网关
  • Spring Cloud Gateway 网关
  • Spring WebSocket 处理长连接
  • Ribbon 负载均衡
  • Netty 多协议NIO网络通信框架
  • Consistent Hash 一致性哈希算法

三、?技术可行性分析下面我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集群方案:WebSocketSession与HttpSession
【Web|WebSocket集群解决方案】在Spring所集成的WebSocket里面,每个ws连接都有一个对应的session:WebSocketSession,在Spring WebSocket中,我们建立ws连接之后可以通过类似这样的方式进行与客户端的通信:
protected void handleTextMessage(WebSocketSession session, TextMessage message) { System.out.println("服务器接收到的消息: "+ message ); //send message to client session.sendMessage(new TextMessage("message")); }

那么问题来了:ws的session无法序列化到redis,因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前没有websocket session共享的方案,因此走redis websocket session共享这条路是行不通的。
有的人可能会想:我可不可以将sessin关键信息缓存到redis,集群中的服务器从redis拿取session关键信息然后重新构建websocket session...我只想说这种方法如果有人能试出来,请告诉我一声...
以上便是websocket session与http session共享的区别:总的来说就是http session共享已经有解决方案了,而且很简单,只要引入相关依赖:spring-session-data-redisspring-boot-starter-redis。而websocket session共享的方案由于websocket底层实现的方式,我们无法做到真正的websocket session共享。

四、解决方案的演变 Netty与Spring WebSocket 刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty里面,并没有websocket session这样的概念,与其类似的是channel,每一个客户端连接都代表一个channel。前端的ws请求通过netty监听的端口,走websocket协议进行ws握手连接之后,通过一些列的handler(责链模式)进行消息处理。与websocket session类似地,服务端在连接建立后有一个channel,我们可以通过channel进行与客户端的通信。
/** * TODO 根据服务器传进来的id,分配到不同的group */ private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //retain增加引用计数,防止接下来的调用引用失效 System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text()); //将消息发送给group里面的所有channel,也就是发送消息给客户端 GROUP.writeAndFlush(msg.retain()); }

那么,服务端用netty还是用spring websocket?我将从几个方面列举这两种实现方式的优缺点。

使用netty实现websocket玩过netty的人都知道netty是的线程模型是nio模型,并发量非常高,spring5之前的网络线程模型是servlet实现的,而servlet不是nio模型,所以在spring5之后,spring的底层网络实现采用了netty。如果我们单独使用netty来开发websocket服务端,速度快是绝对的,但是可能会遇到下列问题:
  • 与系统的其他应用集成不方便,在rpc调用的时候,无法享受springcloud里feign服务调用的便利性
  • 业务逻辑可能要重复实现
  • 使用netty可能需要重复造轮子
  • 怎么连接上服务注册中心,也是一件麻烦的事情
  • restful服务与ws服务需要分开实现,如果在netty上实现restful服务,有多麻烦可想而知,用spring一站式restful开发相信很多人都习惯了。

使用spring websocket实现ws服务spring websocket已经被springboot很好地集成了,所以在springboot上开发ws服务非常方便,做法非常简单:
第一步:添加依赖
org.springframework.boot spring-boot-starter-websocket

第二步:添加配置类
@Configuration public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myHandler(), "/") .setAllowedOrigins("*"); } @Bean public WebSocketHandler myHandler() { return new MessageHandler(); } }

第三步:实现消息监听类
@Component @SuppressWarnings("unchecked") public class MessageHandler extends TextWebSocketHandler { private List clients = new ArrayList<>(); @Override public void afterConnectionEstablished(WebSocketSession session) { clients.add(session); System.out.println("uri :" + session.getUri()); System.out.println("连接建立: " + session.getId()); System.out.println("current seesion: " + clients.size()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { clients.remove(session); System.out.println("断开连接: " + session.getId()); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); Map map = JSONObject.parseObject(payload, HashMap.class); System.out.println("接受到的数据" + map); clients.forEach(s -> { try { System.out.println("发送消息给: " + session.getId()); s.sendMessage(new TextMessage("服务器返回收到的信息," + payload)); } catch (Exception e) { e.printStackTrace(); } }); } }


使用spring websocket更简单的写法 第一步:添加配置类
@Configuration public class WebSocketConfig { /** * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

第二步:实现消息监听类
@ServerEndpoint("/websocket/{sid}") @Component @Slf4j public class WebSocketServer {/** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收sid */ private String sid = ""; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("sid") String sid) throws IOException { printThread(); this.session = session; this.sid = sid; WebSocketManager.addWebSocketServer(sid, this); log.info("有新窗口开始监听: {}, 当前在线人数为: {}, sessionId={}", sid, WebSocketManager.getOnlineCount(), session.getId()); if (!session.isOpen()) { throw new RuntimeException(String.format("Session isn't opened: sid=%s, sessionId=%s", sid, session.getId())); } else { WebSocketMessageSender.sendMessage(sid, session, "连接成功"); } }/** * 连接关闭调用的方法 */ @OnClose public void onClose() { printThread(); //从set中删除 WebSocketManager.removeWebSocketServer(this.sid, this); log.info("有一连接关闭!当前在线人数为" + WebSocketManager.getOnlineCount()); }/** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) throws IOException { printThread(); log.info("收到消息来自窗口: sid={}, message={}, sessionId={}", this.sid, message, session.getId()); CopyOnWriteArraySet webSocketSet = WebSocketManager.getWebSocketServer(this.sid); if (Objects.isNull(webSocketSet) || webSocketSet.isEmpty()) { WebSocketMessageSender.sendMessage(this.sid, session, String.format("未找到可用的连接通道: sid=%s", this.sid)); return; }//群发消息 for (WebSocketServer item : webSocketSet) { WebSocketMessageSender.sendMessage(this.sid, item.session, message); } }@OnError public void onError(Session session, Throwable error) { printThread(); log.error("发生错误: sid={}, sessionId={}", this.sid, session.getId(), error); }public Session getSession() { return session; }public String getSid() { return sid; }private void printThread() { Thread thread = Thread.currentThread(); ThreadGroup threadGroup = thread.getThreadGroup(); log.info("=================[thread={},threadGroup={},threadActiveInGroup={}]==================", thread.getName(), threadGroup.getName(), threadGroup.activeCount()); } }


五、Session广播 这是最简单的websocket集群通讯解决方案。场景如下: (教师A想要群发消息给他的学生们)
  • 教师的消息请求发给网关,内容包含{我是教师A,我想把xxx消息发送我的学生们}
  • 网关接收到消息,获取集群所有ip地址,逐个调用教师的请求
  • 集群中的每台服务器获取请求,根据教师A的信息查找本地有没有与学生关联的session,有则调用sendMessage方法,没有则忽略请求
Web|WebSocket集群解决方案
文章图片

session广播实现很简单,但是有一个致命缺陷:计算力浪费现象,当服务器没有消息接收者session的时候,相当于浪费了一次循环遍历的计算力,该方案在并发需求不高的情况下可以优先考虑,实现很容易。

spring cloud中获取服务集群中每台服务器信息的方法如下:
@Resource private EurekaClient eurekaClient; Application app = eurekaClient.getApplication("service-name"); //instanceInfo包括了一台服务器ip,port等消息 InstanceInfo instanceInfo = app.getInstances().get(0); System.out.println("ip address: " + instanceInfo.getIPAddr());

服务器需要维护关系映射表,将用户的id与session做映射,session建立时在映射表中添加映射关系,session断开后要删除映射表内关联关系。

六、一致性哈希算法实现 首先,想要将一致性哈希算法的思想应用到我们的websocket集群,我们需要解决以下新问题:
  • 集群节点DOWN,会影响到哈希环映射到状态是DOWN的节点。
  • 集群节点UP,会影响到旧key映射不到对应的节点。
  • 哈希环读写共享。
在集群中,总会出现服务UP/DOWN的问题。

针对节点DOWN的问题 一个服务器DOWN的时候,其拥有的websocket session会自动关闭连接,并且前端会收到通知。此时会影响到哈希环的映射错误。我们只需要当监听到服务器DOWN的时候,删除哈希环上面对应的实际结点和虚结点,避免让网关转发到状态是DOWN的服务器上。
实现方法:在eureka治理中心监听集群服务DOWN事件,并及时更新哈希环。

针对节点UP的问题 现假设集群中有服务 CacheB上线了,该服务器的ip地址刚好被映射到key1和 cacheA之间。那么key1对应的用户每次要发消息时都跑去 CacheB发送消息,结果明显是发送不了消息,因为 CacheB没有key1对应的session。
Web|WebSocket集群解决方案
文章图片
此时我们有两种解决方案。

方案A简单,动作大:
eureka监听到节点UP事件之后,根据现有集群信息,更新哈希环。并且断开所有session连接,让客户端重新连接,此时客户端会连接到更新后的哈希环节点,以此避免消息无法送达的情况。
方案B复杂,动作小:
我们先看看没有虚拟节点的情况,假设 CacheC和CacheA之间上线了服务器 CacheB。所有映射在 CacheC到 CacheB的用户发消息时都会去 CacheB里面找session发消息。也就是说 CacheB一但上线,便会影响到 CacheC到 CacheB之间的用户发送消息。所以我们只需要将断开 CacheC到 CacheB的用户所对应的session,让客户端重连。
Web|WebSocket集群解决方案
文章图片

虚拟节点 接下来是有虚拟节点的情况,假设浅色的节点是虚拟节点。我们用长括号来代表某段区域映射的结果属于某个 Cache。首先是C节点未上线的情况。所有B的虚拟节点都会指向真实的B节点,所以所有B节点逆时针那一部分都会映射到B(因为我们规定哈希环顺时针查找)。
Web|WebSocket集群解决方案
文章图片


接下来是C节点上线的情况,可以看到某些区域被C占领了。
Web|WebSocket集群解决方案
文章图片

由以上情况我们可以知道:节点上线,会有许多对应虚拟节点也同时上线,因此我们需要将多段范围key对应的session断开连接(上图红色的部分)。具体算法有点复杂,实现的方式因人而异,大家可以尝试一下自己实现算法。

哈希环应该放在哪里?
  • gateway本地创建并维护哈希环。当ws请求进来的时候,本地获取哈希环并获取映射服务器信息,转发ws请求。这种方法看上去不错,但实际上是不太可取的,回想一下上面服务器DOWN的时候只能通过eureka监听,那么eureka监听到DOWN事件之后,需要通过io来通知gateway删除对应节点吗?显然太麻烦了,将eureka的职责分散到gateway,不建议这么做。
  • eureka创建,并放到redis共享读写。这个方案可行,当eureka监听到服务DOWN的时候,修改哈希环并推送到redis上。为了请求响应时间尽量地短,我们不可以让gateway每次转发ws请求的时候都去redis取一次哈希环。哈希环修改的概率的确很低,gateway只需要应用redis的消息订阅模式,订阅哈希环修改事件便可以解决此问题。

至此我们的spring websocket集群已经搭建的差不多了,最重要的地方还是一致性哈希算法。现在有最后一个技术瓶颈,网关如何根据ws请求转发到指定的集群服务器上?
答案在负载均衡。spring cloud gateway或zuul都默认集成了ribbon作为负载均衡,我们只需要根据建立ws请求时客户端发来的user id,重写ribbon负载均衡算法,根据user id进行hash,并在哈希环上寻找ip,并将ws请求转发到该ip便完事了。流程如下图所示:
Web|WebSocket集群解决方案
文章图片

接下来用户沟通的时候,只需要根据id进行hash,在哈希环上获取对应ip,便可以知道与该用户建立ws连接时的session存在哪台服务器上了!

七、部分源码 xiaxinyu/spring-websocketWeb|WebSocket集群解决方案
文章图片
https://gitee.com/xiaxinyu3_admin/spring-websocket.git

    推荐阅读