Zookeeper集群与Curator客户端


目录

    • 1. Zookeeper集群搭建
        • 1.1 将集群服务器上分别放置zookeeper文件
        • 1.2 修改对应服务器上的zookeeper配置文件
        • 1.3 在dataDir的指定目录创建myid文件,文件内写入服务器的编号
        • 1.4 启动集群服务器
    • 2. zab(zookeeper atomic broadcast)原子广播协议
        • 2.1 集群角色
    • 3. Leader选举
        • 3.1 启动时期
        • 3.2 运行时期
    • 4. observer角色及配置
    • 5. Curator客户端
        • 5.1 maven依赖
        • 5.2 curator连接对象的创建以及重连机制
        • 5.3 创建节点
        • 5.4 更新节点
        • 5.5 删除节点
        • 5.6 查看节点数据
        • 5.7 查看子节点
        • 5.8 查看节点是否存在
        • 5.9 监听机制
            • 5.9.1 监听节点的新增和修改
            • 5.9.2 监听子节点变化
        • 5.10 事务操作
    • 6. 分布式锁
        • 6.1 排他锁
            • 6.1.2 测试
        • 6.2 读写锁
            • 6.2.1 测试

1. Zookeeper集群搭建 1.1 将集群服务器上分别放置zookeeper文件 1.2 修改对应服务器上的zookeeper配置文件
# zookeeper对外公布的端口 clientPort = 2181# 快照文件,即服务器对数据的持久化文件 dataDir=/tmp/zookeeper# server.A=B:C:D # A为zookeeper服务器编号 B为zookeeper服务器IP C为zookeeper服务器通信端口 D为zookeeperleader选举端口 server.1=192.168.0.1:2287:3387 server.2=192.168.0.2:2287:3387

1.3 在dataDir的指定目录创建myid文件,文件内写入服务器的编号
echo "1" > myid

1.4 启动集群服务器
./zkServer.sh start

2. zab(zookeeper atomic broadcast)原子广播协议 2.1 集群角色
角色 描述
leader 领导者,负责投票的发起和决议,更新系统状态
fallower 跟随者,接收客户端请求并向客户端返回结果,在leader选举中进行投票
observer 观察者,可以接收客户端连接,将写请求转发给leader,但observer不参与投票,只同步leader状态,observer的主要目的是扩展系统,提高读取速度
Zookeeper集群与Curator客户端
文章图片

  1. 客户端向集群任意一个服务器发送一个写请求
  2. 非leader节点将请求转发到leader节点
  3. leader节点向其他follower节点发送事务提议
  4. follower节点收到事务提议添加到历史队列中,向服务器发送响应
  5. 当leader收到超过一半以上节点的反馈后,leader向follower发送提交请求
  6. follower节点收到提交请求后将历史队列进行commit操作
3. Leader选举 3.1 启动时期 Zookeeper集群与Curator客户端
文章图片

3.2 运行时期 Zookeeper集群与Curator客户端
文章图片

4. observer角色及配置
  1. 在观察者服务器的配置文件中加以下配置
peerType=observer

  1. 在所有的服务器zookeeper配置文件中添加以下配置
server.3=192.168.1.3:2289:3389:observer

  1. 重启
5. Curator客户端 【Zookeeper集群与Curator客户端】Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Curator对原生的zookeeper框架进行了再次的封装,解决了很多底层的操作,实现了Fluent风格的API接口。
5.1 maven依赖
org.apache.curator curator-framework 2.12.0 org.apache.curator curator-recipes 2.12.0

5.2 curator连接对象的创建以及重连机制
package cn.tianqb.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.junit.Test; /** * @Description: * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/24 20:48 * @Version v1.0 */ public class ConnDemo {@Test public void conn() { // 美[kj??re?t?r] Curator CuratorFramework client = CuratorFrameworkFactory.builder() // 服务器地址 .connectString("192.168..0.1:2181") // 会话超时时间 .connectionTimeoutMs(5000) // 会话断开连接后的3000ms尝试再次连接(重连机制) .retryPolicy(new RetryOneTime(3000)) // 命名空间(根节点) .namespace("curator") .build(); // 启动连接 client.start(); System.out.println(client.isStarted()); client.close(); }/** * 重连机制 */ public void retryTime() { // 重连一次 // arg0:重连间隔 // RetryOneTime(int sleepMsBetweenRetry)// 重连N次 // arg0:连接次数arg1:间隔 // RetryNTimes(int n, int sleepMsBetweenRetries)// 一直重连 // arg0:重连间隔 // RetryForever(int retryIntervalMs)// 超过重连总时长,不再重连 // arg0:重连总时长arg1:重连间隔 // RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)// 随着次数的增加,重连间隔变长 // long sleepMs = (long)(this.baseSleepTimeMs * Math.max(1, this.random.nextInt(1 << retryCount + 1))); // ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) }}

5.3 创建节点
@Test public void create1() throws Exception { String path = client.create() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node1", "node1".getBytes()); System.out.println(path); }@Test public void create2() throws Exception { List acl = new ArrayList<>(); acl.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "192.168.1.105"))); String path = client.create() .withMode(CreateMode.PERSISTENT_SEQUENTIAL) .withACL(acl) .forPath("/node2", "node2".getBytes()); System.out.println(path); }/** * 递归创建,若父节点不存在 * * @throws Exception */ @Test public void create3() throws Exception { List acl = new ArrayList<>(); acl.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "192.168.1.105"))); String path = client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT_SEQUENTIAL) .withACL(acl) .forPath("/node3/node33", "node3".getBytes()); System.out.println(path); }/** * 异步创建 * * @throws Exception */ @Test public void create4() throws Exception { List acl = new ArrayList<>(); acl.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "192.168.1.105"))); client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acl) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(curatorEvent.getPath()); System.out.println(curatorEvent.getType()); } }) .forPath("/node4/node44", "node44".getBytes()); Thread.sleep(5000);

5.4 更新节点
@Test public void update() throws Exception { Stat stat = client.setData() // 指定版本 .withVersion(-1) // 指定路径和数据 .forPath("/node1", "node1111111".getBytes()); System.out.println(stat); }

5.5 删除节点
@Test public void delete1() throws Exception { client.delete() // 指定版本 .withVersion(-1) // 指定路径和数据 .forPath("/node4/node44"); System.out.println("结束!"); }/** * 删除不为空节点,若当前节点不为空,默认无法删除 * @throws Exception */ @Test public void delete2() throws Exception { client.delete() // 包含子节点 .deletingChildrenIfNeeded() // 指定版本 .withVersion(-1) // 指定路径和数据 .forPath("/node1"); System.out.println("结束!"); }

5.6 查看节点数据
@Test public void getData() throws Exception { byte[] bytes = client.getData() .forPath("/node4"); System.out.println(new String(bytes)); }/** * 同时获取属性 * @throws Exception */ @Test public void getData2() throws Exception { Stat stat = new Stat(); byte[] bytes = client.getData() // 获取属性 .storingStatIn(stat) .forPath("/node4"); System.out.println(new String(bytes)); System.out.println(stat.getAversion()); }

5.7 查看子节点
@Test public void getChildren() throws Exception { List> chs = client.getChildren() .forPath("/"); chs.forEach(System.out::println); }

5.8 查看节点是否存在
@Test public void update() throws Exception { Stat stat = client.checkExists() .forPath("/node1"); System.out.println(stat != null); }

5.9 监听机制 5.9.1 监听节点的新增和修改
@Test public void watch1() throws Exception { /** * 监听节点的新增和修改 */ NodeCache nodeCache = new NodeCache(client, "/create2"); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println(nodeCache.getCurrentData().getPath()); System.out.println(new String(nodeCache.getCurrentData().getData())); } }); Thread.sleep(60000); nodeCache.close(); }

5.9.2 监听子节点变化
@Test public void watch2() throws Exception { /** * 监听子节点变化 * arg0 : 连接对象 * arg1 : 监听路径 * arg2 : 是否可以获取节点数据 */ PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/create2", true); pathChildrenCache.start(); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println(pathChildrenCacheEvent.getData().getPath()); System.out.println(new String(pathChildrenCacheEvent.getData().getData())); System.out.println(pathChildrenCacheEvent.getType()); } }); Thread.sleep(60000); pathChildrenCache.close(); }

5.10 事务操作
@Test public void transaction() throws Exception {/** * 事务操作,一旦其中一个操作发生错误,则全部操作都无法提交 * 原子性 */ try{ client.inTransaction() .create() .forPath("/transaction") .and() .delete() .forPath("/xxx") .and() .commit(); }catch (Exception e) { System.out.println("事务操作发生错误"); e.printStackTrace(); } }

6. 分布式锁 6.1 排他锁
package cn.tianqb.zookeeper.curator.lock; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryOneTime; /** * @Description: * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/26 13:51 * @Version v1.0 */ public class InterProcessMutexDemo {private CuratorFramework client; private void init() { client = CuratorFrameworkFactory.builder() .connectString("192.168.0.1:2181") .connectionTimeoutMs(5000) .retryPolicy(new RetryOneTime(3000)) .build(); client.start(); }public InterProcessMutexDemo() { init(); }public void action() { // 排他锁 InterProcessLock interProcessLock = new InterProcessMutex(client, "/create"); try { interProcessLock.acquire(); System.out.println("客户端" + Thread.currentThread().getName() + "获取锁成功"); for (int i = 0; i < 5; i++) { Thread.sleep(2000); System.out.println(i); } interProcessLock.release(); System.out.println("客户端" + Thread.currentThread().getName() + "释放锁成功"); client.close(); } catch (Exception e) { e.printStackTrace(); } }private void close() { client.close(); } }

6.1.2 测试
package cn.tianqb.zookeeper.curator.lock; /** * @Description: * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/26 14:16 * @Version v1.0 */ public class Main {public static void main(String[] args) { for (int i = 0; i < 5; i++) { new Thread(() -> { new InterProcessMutexDemo().action(); }, i + "").start(); } } }

6.2 读写锁
package cn.tianqb.zookeeper.curator.lock; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.curator.retry.RetryOneTime; /** * @Description: * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/26 15:49 * @Version v1.0 */ public class InterProcessReadWriteDemo {private CuratorFramework client; private void init() { client = CuratorFrameworkFactory.builder() .connectString("192.168.0.1:2181") .connectionTimeoutMs(5000) .retryPolicy(new RetryOneTime(3000)) .build(); client.start(); }public InterProcessReadWriteDemo() { init(); }public void readLock() throws Exception { InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/create"); InterProcessMutex interProcessMutex = interProcessReadWriteLock.readLock(); interProcessMutex.acquire(); System.out.println("客户端" + Thread.currentThread().getName() + "获取锁成功"); for (int i = 0; i < 5; i++) { Thread.sleep(2000); System.out.println(i); } interProcessMutex.release(); System.out.println("客户端" + Thread.currentThread().getName() + "释放锁成功"); close(); }public void writeLock() throws Exception { InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/create"); InterProcessMutex interProcessMutex = interProcessReadWriteLock.writeLock(); interProcessMutex.acquire(); System.out.println("客户端" + Thread.currentThread().getName() + "获取锁成功"); for (int i = 0; i < 5; i++) { Thread.sleep(2000); System.out.println(i); } interProcessMutex.release(); System.out.println("客户端" + Thread.currentThread().getName() + "释放锁成功"); close(); }private void close() { client.close(); } }

6.2.1 测试
package cn.tianqb.zookeeper.curator.lock; /** * @Description: * @Author tianqb * @Mail tianqingbo@tianqb.cn * @Date 2020/5/26 15:54 * @Version v1.0 */ public class ReadWriteLockMain { public static void main(String[] args) { // 读读异步 // 读写同步 // 写写同步for (int i = 0; i < 5; i++) { new Thread(() -> { InterProcessReadWriteDemo interProcessReadWriteDemo = new InterProcessReadWriteDemo(); try { interProcessReadWriteDemo.writeLock(); //interProcessReadWriteDemo.readLock(); } catch (Exception e) { e.printStackTrace(); } }, "" + i).start(); new Thread(() -> { InterProcessReadWriteDemo interProcessReadWriteDemo = new InterProcessReadWriteDemo(); try { //interProcessReadWriteDemo.readLock(); interProcessReadWriteDemo.writeLock(); } catch (Exception e) {e.printStackTrace(); } }, "" + (i + 5)).start(); }} }

    推荐阅读