大数据|zookeeper学习(六)ZooKeeper实现软负载均衡

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,提供的功能包括配置维护、名字服务、分布式同步、组服务等。
ZooKeeper会维护一个树形的数据结构,类似于Windows资源管理器目录,其中EPHEMERAL类型的节点会随着创建它的客户端断开而被删除,利用这个特性很容易实现软负载均衡。
一、获取服务器列表,通过随机数,客户端随机获取一台服务器进行访问
基本原理是,每个应用的Server启动时创建一个EPHEMERAL节点,应用客户端通过读取节点列表获得可用服务器列表,并订阅节点事件,有Server宕机断开时触发事件,客户端监测到后把该Server从可用列表中删除。
在项目中添加zkclient的maven依赖
[java]view plain copy

  1. com.101tec
  2. zkclient
  3. 0.8
Server代码

[java]view plain copy
  1. import java.io.BufferedReader;
  2. import java.io.IOException;
  3. import java.io.InputStreamReader;
  4. import java.io.PrintWriter;
  5. import java.net.ServerSocket;
  6. import java.net.Socket;
  7. import org.I0Itec.zkclient.ZkClient;
  8. public class SimpleServer implements Runnable {
  9. public static void main(String[] args) throws IOException {
  10. int port = 18080;
  11. SimpleServer server = new SimpleServer(port);
  12. Thread thread = new Thread(server);
  13. thread.start();
  14. }
  15. private int port;
  16. public SimpleServer(int port) {
  17. this.port = port;
  18. }
  19. private void regServer() {
  20. //向ZooKeeper注册当前服务器
  21. ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);
  22. String path = "/test/server" + port;
  23. if(client.exists(path))
  24. client.delete(path);
  25. client.createEphemeral(path, "127.0.0.1:" + port);
  26. }
  27. @Override
  28. public void run() {
  29. ServerSocket server = null;
  30. try {
  31. server = new ServerSocket(port);
  32. regServer();
  33. System.out.println("Server started at " + port);
  34. Socket socket = null;
  35. while (true) {
  36. socket = server.accept();
  37. new Thread(new SimpleServerHandler(socket)).start();
  38. }
  39. } catch(IOException ex) {
  40. ex.printStackTrace();
  41. } finally {
  42. if (server != null) {
  43. try {
  44. server.close();
  45. } catch (IOException e) {}
  46. }
  47. }
  48. }
  49. }
  50. //SimpleServerHandler略
客户端代码

[java]view plain copy
  1. import java.io.BufferedReader;
  2. import java.io.IOException;
  3. import java.io.InputStreamReader;
  4. import java.io.PrintWriter;
  5. import java.net.Socket;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. import java.util.Random;
  10. import org.I0Itec.zkclient.IZkChildListener;
  11. import org.I0Itec.zkclient.ZkClient;
  12. public class SimpleClient {
  13. private static List servers = new ArrayList<>();
  14. public static void main(String[] args) {
  15. initServerList();
  16. SimpleClient client = new SimpleClient();
  17. BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
  18. while (true) {
  19. String name;
  20. try {
  21. name = console.readLine();
  22. if("exit".equals(name)) {
  23. System.exit(0);
  24. }
  25. client.send(name);
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. private static void initServerList() {
  32. //启动时从ZooKeeper读取可用服务器
  33. String path = "/test";
  34. ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
  35. List childs = zkClient.getChildren(path);
  36. servers.clear();
  37. for(String p : childs) {
  38. servers.add(zkClient.readData(path + "/" + p));
  39. }
  40. //订阅节点变化事件
  41. zkClient.subscribeChildChanges("/test", new IZkChildListener() {
  42. @Override
  43. public void handleChildChange(String parentPath, List currentChilds)
  44. throws Exception {
  45. System.out.println(String.format("[ZookeeperRegistry] service list change: path=%s, currentChilds=%s", parentPath, currentChilds.toString()));
  46. servers.clear();
  47. for(String p : currentChilds) {
  48. servers.add(zkClient.readData(path + "/" + p));
  49. }
  50. System.out.println("Servers: " + servers.toString());
  51. }
  52. });
  53. }
  54. public static String getServer() {
  55. return servers.get(new Random().nextInt(servers.size()));
  56. }
  57. //其他无变化, 略
  58. }
分别启动Server和Client,然后修改Server的端口号,再启动一个实例,可以看到客户端检测到了这个新服务器的存在


二、采用计数器方式,连接上加一,断开减一,获取计数最少的连接

zookeeper本身是不提供负载均衡的策略,需要自己来实现,所以这里确切的说,是在负载均衡中应用到了zookeeper做集群的协调。
对于HTTP请求的负载均衡,成熟的解决方案是Nginx(或Haproxy) +keepalived。其中Niginx负责代理HTTP请求,通过某种均衡策略访问集群中的服务器,keepalived负责检测集群中的服务器运行情况(有故障的机器移除,机器恢复工作后重新加入)
而对于TCP层的负载均衡,比如用Apache Mina做的网络通信应用,上面那种方案明显不适合,因为网络通信客户端和服务端要保持长连接
所以要针对这种长连接做负载均衡,一般都是基于连接数这种均衡策略,也就是在第一次连接时,分配服务器IP时,取当前连接数最少的那台
集群中有几台服务器处于运行状态,每一台服务器当前连接的客户数量,最大连接数量,等等这些信息需要记录起来,然后每次做负载均衡时根据这些信息来做分配,一般首先想到的是把这些信息存放在数据库里
简单的做法就是服务器启动时,把数据库里相应的状态改为运行,有客户连接或断开时,把连接数做加数或减数运算。
当服务器关闭时,问题就来了:
1、服务器关闭,可能数据源也已经被关闭,没法操作数据库,该机器在数据库里一直处于运行状态
2、服务器宕机,这种问题就很致命,这是连关闭的程序都没有执行,更不用说能操作数据库了
解决的方式就是用zookeeper保存服务器的连接信息
1、当服务器启动时,往zookeeper的节点里写入数据(节点类型是临时节点)
【大数据|zookeeper学习(六)ZooKeeper实现软负载均衡】2、当服务器关闭时,从zookeeper移除相应的节点数据
3、当服务器宕机,zookeeper因为没有检测到心跳,自动把该节点移除,并通知其他服务器,其他服务器得知该机器已宕机,在分配连接时,不会分配到这台机器上,这点也是标题说的在负载均衡中用到zookeeper的原因。
对比了一下保存在数据库那种方式,zookeeper其实就是一个具有通知功能的数据库,也就是它底下节点数据有变化时,会通知它的所有客户端(这里的客户端指的连接到zookeeper的服务器)。
原理解析

架构图 大数据|zookeeper学习(六)ZooKeeper实现软负载均衡
文章图片
每台WorkServer启动的时候都会到Server创建临时节点。 每台ClientServer启动的时候,都会到Server节点下面取得所有WorksServer节点,并通过一定算法取得一台并与之连接。
服务端主体流程 大数据|zookeeper学习(六)ZooKeeper实现软负载均衡
文章图片
有ClientServer与之建立连接,这台WorksServer的负载计数器加一,断开连接负载计数器减一。负载计数器作为客户端负载均衡算法的依据,客户端会选择负载最轻的WorksServer建立连接。
客户端流程 大数据|zookeeper学习(六)ZooKeeper实现软负载均衡
文章图片

服务端核心类 大数据|zookeeper学习(六)ZooKeeper实现软负载均衡
文章图片
ServerRunner 调度类RegistProvider 服务端启动时的注册过程 ServerHander 处理与客户端之间的连接 DefaultBalanceUpdateProvider 连接建立与断开,修改负载信息 客户端核心类 大数据|zookeeper学习(六)ZooKeeper实现软负载均衡
文章图片
ClientRunner 调度类 ClientHander 处理与服务器之间的通信 BanceProvider 负载的算法 ServerData 服务器和客户端公用的类,计算负载等使用


    推荐阅读