1. 统一配置中心
在分布式集群系统中,经常需要修改某个配置信息。如果逐台登录服务器进行修改,会非常浪费时间;当服务体量很大,需要修改几十台、上百台甚至上千台服务器时,这种做法几乎不可行。ZooKeeper 可以对节点数据进行监听:一旦配置信息发生变化,就会立即通知所有监听该节点的客户端重新读取数据。
下面通过Java代码的方式对统一配置中心进行简单的实现。
1.1 客户端
package cn.tianqb.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
 * @Description: Configuration-center client. Connects to ZooKeeper, prints the
 * values stored at the configured url/user/passwd znodes, and re-reads them
 * whenever {@link #pull()} is invoked (the watcher calls it on data changes).
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/21 20:26
 * @Version v1.0
 */
public class ZKClient {
    /** ZooKeeper handle; stays {@code null} if the connection could not be created. */
    public ZooKeeper zooKeeper;
    /** Counted down by the watcher once the session reports SyncConnected. */
    public CountDownLatch countDownLatch = new CountDownLatch(1);

    // ZooKeeper server address (host:port)
    private final String IP;
    // Session timeout in milliseconds
    private final Integer SESSION_TIME_OUT;
    // Path of the znode holding the url
    private final String URL_PATH;
    // Path of the znode holding the user name
    private final String USER_PATH;
    // Path of the znode holding the password
    private final String PASSWD_PATH;

    // Single watcher instance reused for the session and for every data watch.
    // ZooKeeper de-duplicates watches per (path, watcher object); creating a new
    // watcher on each pull() would leave the old ones registered on the unchanged
    // paths and multiply the notifications after every change.
    private ZKWatcher watcher;

    /**
     * Creates the client, connects to the server and performs the first pull.
     *
     * @param IP               ZooKeeper connect string
     * @param SESSION_TIME_OUT session timeout in milliseconds
     * @param URL_PATH         znode path of the url value
     * @param USER_PATH        znode path of the user value
     * @param PASSWD_PATH      znode path of the password value
     */
    public ZKClient(String IP, Integer SESSION_TIME_OUT, String URL_PATH, String USER_PATH, String PASSWD_PATH) {
        this.IP = IP;
        this.SESSION_TIME_OUT = SESSION_TIME_OUT;
        this.URL_PATH = URL_PATH;
        this.USER_PATH = USER_PATH;
        this.PASSWD_PATH = PASSWD_PATH;
        init();
    }

    /**
     * Connects to the server, waits for the connection latch and pulls the
     * configuration once. Failures are printed, not propagated.
     */
    private void init() {
        watcher = new ZKWatcher(countDownLatch, this);
        try {
            zooKeeper = new ZooKeeper(IP, SESSION_TIME_OUT, watcher);
            countDownLatch.await();
            pull();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the owning thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Best-effort start-up as before: report and leave the client unconnected.
            e.printStackTrace();
        }
    }

    /**
     * Closes the ZooKeeper session if one was opened.
     */
    public void close() {
        if (zooKeeper == null) {
            return;
        }
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Reads the three configuration znodes, prints them and re-arms the data
     * watch on each of them. Called on start-up and on every change notification.
     */
    public void pull() {
        try {
            // NOTE(review): the password is printed in clear text; acceptable for a
            // demo only.
            printNode("url", URL_PATH);
            printNode("user", USER_PATH);
            printNode("passwd", PASSWD_PATH);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Reads one znode (registering the shared watcher) and prints its content.
     * A ZooKeeper error on this path is reported without aborting the other reads.
     */
    private void printNode(String label, String path) throws InterruptedException {
        try {
            byte[] data = zooKeeper.getData(path, watcher, null);
            // A znode may carry no data at all; treat that as an empty value.
            String text = data == null ? "" : new String(data, java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("客户端" + zooKeeper.getSessionId() + "---" + label + ":" + text);
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}
1.2 常量
package cn.tianqb.zookeeper;
/**
 * @Description: Shared constants for the ZooKeeper examples (server address,
 * session timeout and the znode paths of the configuration values).
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/21 20:34
 * @Version v1.0
 */
public final class ZKConstant {
    // ZooKeeper server address (host:port)
    public static final String IP = "192.168.100.1:2181";
    // Session timeout in milliseconds
    public static final Integer SESSION_TIME_OUT = 5000;
    // Path of the znode holding the url
    public static final String URL_PATH = "/conf/url";
    // Path of the znode holding the user name
    public static final String USER_PATH = "/conf/user";
    // Path of the znode holding the password
    public static final String PASSWD_PATH = "/conf/passwd";

    // Constants holder: never instantiated.
    private ZKConstant() {
    }
}
1.3 监听器
package cn.tianqb.zookeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
 * @Description: Watcher that releases the connection latch once the session is
 * connected and asks the client to re-read its data when a node's data changes.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/21 20:28
 * @Version v1.0
 */
public class ZKWatcher implements Watcher {
    // Latch counted down when the session reaches SyncConnected
    private CountDownLatch countDownLatch;
    // Client whose pull() is invoked on data changes
    private ZKClient zkClient;

    // Not meant to be created without its collaborators.
    private ZKWatcher() {
    }

    /**
     * @param countDownLatch latch to count down on a successful connection
     * @param zkClient       client to notify when node data changes
     */
    public ZKWatcher(CountDownLatch countDownLatch, ZKClient zkClient) {
        this.countDownLatch = countDownLatch;
        this.zkClient = zkClient;
    }

    /**
     * Dispatches an event: a data change triggers a pull, a session event in
     * state SyncConnected releases the latch, anything else is ignored.
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        final Event.EventType eventType = watchedEvent.getType();

        if (eventType == Event.EventType.NodeDataChanged) {
            // Data changed: let the client fetch the new values.
            zkClient.pull();
            return;
        }
        if (eventType != Event.EventType.None) {
            return;
        }
        if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
            return;
        }
        System.out.println("服务连接成功");
        countDownLatch.countDown();
    }
}
1.4 测试
package cn.tianqb.zookeeper;
/**
 * @Description: Demo entry point that starts ten configuration-center clients,
 * keeps each one alive for a minute and then closes it.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/21 20:46
 * @Version v1.0
 */
public class ZKMain {
    /**
     * Starts ten threads; each creates its own {@link ZKClient}, sleeps for
     * 60 seconds and closes the client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                ZKClient zkClient = new ZKClient(ZKConstant.IP, ZKConstant.SESSION_TIME_OUT, ZKConstant.URL_PATH,
                        ZKConstant.USER_PATH, ZKConstant.PASSWD_PATH);
                boolean interrupted = false;
                try {
                    Thread.sleep(60000);
                } catch (InterruptedException e) {
                    interrupted = true;
                    e.printStackTrace();
                } finally {
                    // Close even when the sleep is interrupted, otherwise the session leaks.
                    zkClient.close();
                    // Restore the flag only after close(), which itself blocks on the server.
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            thread.start();
        }
    }
}
2. 分布式唯一ID
适用场景:数据库进行分库分表后,插入数据时无法再依赖单库的 auto_increment 生成主键,这时需要一种即使在不同服务器上也能保证全局唯一的 ID。
案例代码如下:
2.1 生成ID类
package cn.tianqb.zookeeper;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
/**
 * @Description: Distributed unique-ID generator. Each ID is the sequence suffix
 * ZooKeeper appends to an ephemeral sequential node created with a fixed prefix.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/22 13:13
 * @Version v1.0
 */
public class GeneralUniqueId implements Watcher {
    // Path prefix of the sequential nodes; the generated ID is whatever follows it
    private static final String BASE_URL = "/uniqueId_";

    // Counted down once the session reports SyncConnected
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private ZooKeeper zooKeeper;

    /**
     * Connects to the server configured in {@code ZKConstant} and waits for the
     * connection latch.
     */
    public GeneralUniqueId() {
        init();
    }

    /**
     * Releases the connection latch when the session reaches SyncConnected.
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType() == Event.EventType.None) {
            if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                System.out.println("连接成功!");
                countDownLatch.countDown();
            }
        }
    }

    // Opens the session and waits for the latch; failures are printed, not propagated.
    private void init() {
        try {
            zooKeeper = new ZooKeeper(ZKConstant.IP, ZKConstant.SESSION_TIME_OUT, this);
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the owning thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Generates one ID by creating an ephemeral sequential node.
     *
     * @return the part of the created path that follows the prefix, or
     *         {@code null} if the node could not be created
     */
    public String general() {
        try {
            String path = zooKeeper.create(BASE_URL, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            // Strip the prefix by its actual length instead of a hard-coded offset.
            return path.substring(BASE_URL.length());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Closes the ZooKeeper session if one was opened.
     */
    public void close() {
        if (zooKeeper != null) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Prints ten IDs generated over a single session.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        GeneralUniqueId generalUniqueId = new GeneralUniqueId();
        try {
            for (int i = 0; i < 10; i++) {
                String id = generalUniqueId.general();
                System.out.println(id);
            }
        } finally {
            generalUniqueId.close();
        }
    }
}
2.2 测试
package cn.tianqb.zookeeper;
/**
 * @Description: Demo entry point that requests one unique ID from each of ten
 * concurrently running threads.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/22 15:58
 * @Version v1.0
 */
public class GeneralUniqueIdMain {
    /**
     * Starts ten threads, each running {@link #printOneId()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int started = 0;
        while (started < 10) {
            new Thread(GeneralUniqueIdMain::printOneId).start();
            started++;
        }
    }

    // Work of one thread: open a generator, print a single ID, close the generator.
    private static void printOneId() {
        GeneralUniqueId generator = new GeneralUniqueId();
        System.out.println(generator.general());
        generator.close();
    }
}
3. 分布式锁
多个服务同时调用某个服务(或访问同一共享资源)时,需要对访问进行同步处理,保证同一时刻只有一个服务/事务能够持有锁并执行临界区操作。
案例代码如下:
3.1 分布式锁类
package cn.tianqb.zookeeper.lock;
import cn.tianqb.zookeeper.ZKConstant;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @Description: 分布式锁
* @Author tianqb
* @Mail tianqingbo@tianqb.cn
* @Date 2020/5/22 16:08
* @Version v1.0
*/
public class ZKLock {private CountDownLatch countDownLatch = new CountDownLatch(1);
private ZooKeeper zooKeeper;
private String BASE_URL = "/lock";
private String NODE_URL = "lock_";
private String lockId;
public void init() {
try {
zooKeeper = new ZooKeeper(ZKConstant.IP, ZKConstant.SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.None) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("客户端" + zooKeeper.getSessionId() + "成功连接服务器!");
countDownLatch.countDown();
}
}
}
});
countDownLatch.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}public ZKLock() {
init();
}/**
* 创建节点信息
*/
public void create() {
try {
// 判断根节点是否存在
Stat stat = zooKeeper.exists(BASE_URL, false);
if (stat == null) {
zooKeeper.create(BASE_URL, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
// 创建锁节点
lockId = zooKeeper.create(BASE_URL + "/" + NODE_URL, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
}public void acquireLock() {
create();
lock();
}Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
// 一旦监听到节点删除
// 通知所有客户端
notifyAll();
}
}
}
};
/**
* 获取锁
*/
public void lock() {
try {
// 获取子节点
List> children = zooKeeper.getChildren(BASE_URL, false);
Collections.sort(children);
int index = children.indexOf(lockId.substring(BASE_URL.length() + 1));
// 判断当前节点是否在第一位
if (index == 0) {
System.out.println("-------------------------------");
System.out.println("客户端" + zooKeeper.getSessionId() + "成功获取锁!");
return;
} else {
// 获取上一个节点的路径
String s = children.get(index - 1);
Stat stat = zooKeeper.exists(BASE_URL + "/" + s, watcher);
// 存在节点使用完已经删除的情况
if (stat == null) {
// 再次获取锁
lock();
} else {
// 阻塞当前线程
synchronized (watcher) {
watcher.wait();
}
lock();
}
}} catch (Exception e) {
e.printStackTrace();
}
}/**
* 释放锁
*/
public void unlock() {
try {
zooKeeper.delete(lockId, -1);
System.out.println("客户端" + zooKeeper.getSessionId() + "锁已经释放!");
System.out.println("-------------------------------");
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}}
3.2 测试
package cn.tianqb.zookeeper.lock;
/**
 * @Description: Demo entry point in which ten threads compete for the
 * distributed lock and each performs a simulated insert while holding it.
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/22 19:01
 * @Version v1.0
 */
public class ZKLockMain {
    /**
     * Starts ten threads, each running {@link #insertUnderLock()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int started = 0;
        while (started < 10) {
            new Thread(ZKLockMain::insertUnderLock).start();
            started++;
        }
    }

    // Work of one thread: acquire the lock, print, sleep two seconds, release the lock.
    private static void insertUnderLock() {
        ZKLock lock = new ZKLock();
        lock.acquireLock();
        try {
            System.out.println("插入数据!!!");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.unlock();
    }
}
推荐阅读
- 分布式集群|一起进阶学习JAVA(Zookeeper)
- zookeeper|zookeeper的简单例子带你认识zk的API
- zookeeper使用IDEA测试API出现KeeperException$ConnectionLossException解决思路
- Zookeeper集群与Curator客户端
- Java|乐观锁和悲观锁在zookeeper中的应用
- 拜占庭将军-分布式领域的幽灵
- java|应用 | Redis实现 主从,单例,集群,哨兵,配置应用
- spring-boot|zookeeper与grpc集成实现服务注册与发现
- zookeeper 启动报错 : KeeperErrorCode = NoNode for /XXX