zookeeper|Zookeeper案例

1. 统一配置中心 在分布式集群系统中,经常需要修改某个配置信息。如果逐台登录服务器进行修改,非常浪费时间;当服务体量很大,需要修改几十台、上百台甚至上千台服务器时,这种做法几乎不现实。借助zookeeper,客户端可以对保存配置的节点数据注册监听,一旦配置信息发生变化,zookeeper会立即通知所有客户端重新读取数据。
下面通过Java代码的方式对统一配置中心进行简单的实现。
1.1 客户端

package cn.tianqb.zookeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @Description: 客户端 * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/21 20:26 * @Version v1.0 */ public class ZKClient {public ZooKeeper zooKeeper; public CountDownLatch countDownLatch = new CountDownLatch(1); // zookeeper服务器地址 private String IP; // 会话超时时间 private Integer SESSION_TIME_OUT; // url信息 private String URL_PATH; // 用户名信息 private String USER_PATH; // 密码信息 private String PASSWD_PATH; public ZKClient(String IP, Integer SESSION_TIME_OUT, String URL_PATH, String USER_PATH, String PASSWD_PATH) { this.IP = IP; this.SESSION_TIME_OUT = SESSION_TIME_OUT; this.URL_PATH = URL_PATH; this.USER_PATH = USER_PATH; this.PASSWD_PATH = PASSWD_PATH; init(); }/** * 连接至服务器 */ private void init() {ZKWatcher watcher = new ZKWatcher(countDownLatch, this); try { zooKeeper = new ZooKeeper(IP, SESSION_TIME_OUT, watcher); countDownLatch.await(); pull(); } catch (Exception e) { e.printStackTrace(); } }public void close() { try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }/** * 接到通知重新拉取数据 */ public void pull() {ZKWatcher watcher = new ZKWatcher(countDownLatch, this); try { System.out.println("客户端" + zooKeeper.getSessionId() + "---url:" + new String(zooKeeper.getData(URL_PATH, watcher, null))); System.out.println("客户端" + zooKeeper.getSessionId() + "---user:" + new String(zooKeeper.getData(USER_PATH, watcher, null))); System.out.println("客户端" + zooKeeper.getSessionId() + "---passwd:" + new String(zooKeeper.getData(PASSWD_PATH, watcher, null))); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }

1.2 常量
package cn.tianqb.zookeeper;

/**
 * @Description: Shared constants for the ZooKeeper examples: the server address,
 *               the session timeout and the paths of the configuration nodes.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/21 20:34
 * @Version v1.0
 */
public class ZKConstant {

    /** Parent node of the three configuration entries below. */
    private static final String CONF_ROOT = "/conf";

    /** ZooKeeper server address (host:port). */
    public static final String IP = "192.168.100.1:2181";

    /** Session timeout in milliseconds. */
    public static final Integer SESSION_TIME_OUT = 5000;

    /** Node holding the url value. */
    public static final String URL_PATH = CONF_ROOT + "/url";

    /** Node holding the user name. */
    public static final String USER_PATH = CONF_ROOT + "/user";

    /** Node holding the password. */
    public static final String PASSWD_PATH = CONF_ROOT + "/passwd";
}

1.3 监听器
package cn.tianqb.zookeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; /** * @Description: 监听器 * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/21 20:28 * @Version v1.0 */ public class ZKWatcher implements Watcher {private CountDownLatch countDownLatch; private ZKClient zkClient; private ZKWatcher() { }public ZKWatcher(CountDownLatch countDownLatch, ZKClient zkClient) { this.countDownLatch = countDownLatch; this.zkClient = zkClient; }@Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.None) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("服务连接成功"); countDownLatch.countDown(); } }else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { // 数据发生变化,客户端拉取数据 zkClient.pull(); } } }

1.4 测试
package cn.tianqb.zookeeper;

/**
 * @Description: Demo entry point for the configuration center: starts ten client
 *               threads, keeps each one alive for a minute so that configuration
 *               changes can be observed, then closes it.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/21 20:46
 * @Version v1.0
 */
public class ZKMain {

    /** Number of concurrent demo clients. */
    private static final int CLIENT_COUNT = 10;

    /** How long each client stays connected, in milliseconds. */
    private static final long KEEP_ALIVE_MILLIS = 60000;

    public static void main(String[] args) {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            Thread thread = new Thread(ZKMain::runClient);
            thread.start();
        }
    }

    /**
     * Runs one client: connect, wait, close. The client is closed in a
     * {@code finally} block so that an interrupt during the wait can no longer
     * skip the close and leave the session open.
     */
    private static void runClient() {
        ZKClient zkClient = new ZKClient(ZKConstant.IP, ZKConstant.SESSION_TIME_OUT,
                ZKConstant.URL_PATH, ZKConstant.USER_PATH, ZKConstant.PASSWD_PATH);
        boolean interrupted = false;
        try {
            Thread.sleep(KEEP_ALIVE_MILLIS);
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } finally {
            zkClient.close();
            if (interrupted) {
                // Restore the flag only after closing, so the close itself is not cut short.
                Thread.currentThread().interrupt();
            }
        }
    }
}

2. 分布式唯一ID 适用场景:数据库进行分库分表后,插入数据时无法再使用auto_increment生成主键,这时需要保证生成的ID全局唯一,即使数据分布在不同的服务器上!
案例代码如下:
2.1 生成ID类
package cn.tianqb.zookeeper; import org.apache.zookeeper.*; import java.util.concurrent.CountDownLatch; /** * @Description: 分布式唯一ID * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/22 13:13 * @Version v1.0 */ public class GeneralUniqueId implements Watcher {private CountDownLatch countDownLatch = new CountDownLatch(1); private ZooKeeper zooKeeper; private String BASE_URL = "/uniqueId_"; public GeneralUniqueId() { init(); }@Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.None) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接成功!"); countDownLatch.countDown(); } } }private void init() { try { zooKeeper = new ZooKeeper(ZKConstant.IP, ZKConstant.SESSION_TIME_OUT, this); countDownLatch.await(); //general(); } catch (Exception e) { e.printStackTrace(); } }public String general() { String path; try { path = zooKeeper.create(BASE_URL, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); return path.substring(10); } catch (Exception e) { e.printStackTrace(); return null; } }public void close(){ if (zooKeeper != null) { try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }public static void main(String[] args) { GeneralUniqueId generalUniqueId = new GeneralUniqueId(); for (int i = 0; i < 10; i++) { String id = generalUniqueId.general(); System.out.println(id); } generalUniqueId.close(); } }

2.2 测试
package cn.tianqb.zookeeper;

/**
 * @Description: Demo for {@link GeneralUniqueId}: ten threads each open their own
 *               session, request one ID, print it and close the session.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/22 15:58
 * @Version v1.0
 */
public class GeneralUniqueIdMain {

    public static void main(String[] args) {
        Runnable task = GeneralUniqueIdMain::printOneId;
        int started = 0;
        while (started < 10) {
            new Thread(task).start();
            started++;
        }
    }

    /** Work done by each thread: obtain a single ID, print it, then disconnect. */
    private static void printOneId() {
        GeneralUniqueId generator = new GeneralUniqueId();
        System.out.println(generator.general());
        generator.close();
    }
}

3. 分布式锁 多个服务调用同一个服务(或访问同一资源)时,需要对访问进行同步处理,保证同一时刻只允许一个服务/事务进行操作。
案例代码如下:
3.1 分布式锁类
package cn.tianqb.zookeeper.lock; import cn.tianqb.zookeeper.ZKConstant; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Test; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * @Description: 分布式锁 * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/22 16:08 * @Version v1.0 */ public class ZKLock {private CountDownLatch countDownLatch = new CountDownLatch(1); private ZooKeeper zooKeeper; private String BASE_URL = "/lock"; private String NODE_URL = "lock_"; private String lockId; public void init() { try { zooKeeper = new ZooKeeper(ZKConstant.IP, ZKConstant.SESSION_TIME_OUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.None) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("客户端" + zooKeeper.getSessionId() + "成功连接服务器!"); countDownLatch.countDown(); } } } }); countDownLatch.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }public ZKLock() { init(); }/** * 创建节点信息 */ public void create() { try { // 判断根节点是否存在 Stat stat = zooKeeper.exists(BASE_URL, false); if (stat == null) { zooKeeper.create(BASE_URL, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } // 创建锁节点 lockId = zooKeeper.create(BASE_URL + "/" + NODE_URL, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (Exception e) { e.printStackTrace(); } }public void acquireLock() { create(); lock(); }Watcher watcher = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted) { synchronized (this) { // 一旦监听到节点删除 // 通知所有客户端 notifyAll(); } } } }; /** * 获取锁 */ public void lock() { try { // 获取子节点 List> children = zooKeeper.getChildren(BASE_URL, false); 
Collections.sort(children); int index = children.indexOf(lockId.substring(BASE_URL.length() + 1)); // 判断当前节点是否在第一位 if (index == 0) { System.out.println("-------------------------------"); System.out.println("客户端" + zooKeeper.getSessionId() + "成功获取锁!"); return; } else { // 获取上一个节点的路径 String s = children.get(index - 1); Stat stat = zooKeeper.exists(BASE_URL + "/" + s, watcher); // 存在节点使用完已经删除的情况 if (stat == null) { // 再次获取锁 lock(); } else { // 阻塞当前线程 synchronized (watcher) { watcher.wait(); } lock(); } }} catch (Exception e) { e.printStackTrace(); } }/** * 释放锁 */ public void unlock() { try { zooKeeper.delete(lockId, -1); System.out.println("客户端" + zooKeeper.getSessionId() + "锁已经释放!"); System.out.println("-------------------------------"); if (zooKeeper != null) { zooKeeper.close(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }}

3.2 测试
package cn.tianqb.zookeeper.lock;

/**
 * @Description: Demo for {@link ZKLock}: ten threads compete for the lock and each
 *               holds it for two seconds while pretending to insert data.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/22 19:01
 * @Version v1.0
 */
public class ZKLockMain {

    public static void main(String[] args) {
        int remaining = 10;
        while (remaining > 0) {
            new Thread(ZKLockMain::insertUnderLock).start();
            remaining--;
        }
    }

    /** Work done by each thread: acquire the lock, simulate an insert, release the lock. */
    private static void insertUnderLock() {
        ZKLock zkLock = new ZKLock();
        zkLock.acquireLock();
        try {
            System.out.println("插入数据!!!");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        zkLock.unlock();
    }
}

    推荐阅读