#|Zookeeper后端开发工具Curator的使用 | Curator对节点的增删改查 | ACL权限控制 | 分布式锁 | 分布式计数器 | 附带最新版本下载

前言

Curator是Apache开源的一个Java工具类,通过它操作Zookeeper会变得极度舒适!
前置条件:已掌握的基本操作,比如在后台可以增减节点、ACL权限设置等。
1.Zookeeper原生API
1.超时重连,不支持自动,需要手动操作
2.Watch注册一次后会失效
3.不支持递归创建节点
2.ZookeeperAPI 升级版 Curator
1.解决watcher的注册一次就失效
2.提供更多解决方案并且实现简单
3.提供常用的ZooKeeper工具类
4.编程风格更爽,点点点就可以了
5.可以递归创建节点等
3. 知识点
1.使用curator建立与zk的连接
2.使用curator添加/递归添加节点
3.使用curator删除/递归删除节点
4.使用curator创建/验证 ACL(访问权限列表)
5.使用curator监听 单个/父 节点的变化(watch事件)
6.基于curator实现Zookeeper分布式锁(需要掌握基本的多线程知识)
7.基于curator实现分布式计数器
由于代码量比较大,下文只会涉及到重点代码片段,从而突出重点。
准备工作 1.Maven的pom.xml中配置Zookeeper和Curator的依赖
org.apache.zookeeper zookeeper 3.7.0 org.apache.curator curator-framework 5.2.0 org.apache.curator curator-recipes 5.2.0

curator-recipes:封装了一些高级特性,如:Cache事件监听、 Elections选举、分布式锁、分布式计数器、分布式Barrier、Queues队列等
一、使用Curator建立与Zookeeper服务连接
该类会被频繁使用,故抽离为一个单独的Utils,里面只存放前后台Connect的代码。
public class ZkConnectCuratorUtil { final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class); public CuratorFramework zkClient = null; //zk的客户端工具Curator(在本类通过new实例化的是,自动start) private static final int MAX_RETRY_TIMES = 3; //定义失败重试次数 private static final int BASE_SLEEP_TIME_MS = 5000; //连接失败后,再次重试的间隔时间 单位:毫秒 private static final int SESSION_TIME_OUT = 1000000; //会话存活时间,根据业务灵活指定 单位:毫秒 private static final String ZK_SERVER_IP_PORT = "192.168.31.216:2181"; //Zookeeper服务所在的IP和客户端端口 private static final String NAMESPACE = "workspace"; //指定后,默认操作的所有的节点都会在该工作空间下进行 //本类通过new ZkCuratorUtil()时,自动连通zkClient public ZkConnectCuratorUtil() { RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS); //首次连接失败后,重试策略 zkClient = CuratorFrameworkFactory.builder() //.authorization("digest", "root:root".getBytes())//登录超级管理(需单独配) .connectString(ZK_SERVER_IP_PORT) .sessionTimeoutMs(SESSION_TIME_OUT) .retryPolicy(retryPolicy) .namespace(NAMESPACE).build(); zkClient.start(); } public void closeZKClient() { if (zkClient != null) { this.zkClient.close(); } } public static void main(String[] args) { ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil(); boolean ifStarted=zkUtil.zkClient.isStarted(); System.out.println("当前客户的状态:" + (ifStarted ? "连接中" : "已关闭")); zkUtil.closeZKClient(); boolean ifClose = zkUtil.zkClient.isStarted(); System.out.println("当前客户的状态:" + (ifClose ? "连接成功" : "已关闭")); } }

下方预告:
增删改查均属前后台交互的操作,故统一写在CuratorDao.java中,统一管理。
各方法第一个入参(CuratorFramework zkClient),使用时通过如下代码获取:
ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil(); //new的同时,zk也被启动
CuratorFramework zkClient=zkUtil.zkClient;
注: CuratorFramework相当于ZK原生API中的ZooKeeper类
二、 使用Curator来实现节点的增删改查 1.使用curator(递归)添加节点
//级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持) public static void createNodes(CuratorFramework zkClient,String nodePath,String nodeData) throws Exception { zkClient.create() .creatingParentContainersIfNeeded()//创建父节点,如果需要的话 .withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的 .withACL(Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限 .forPath(nodePath, nodeData.getBytes()); System.out.println(nodePath+"节点已成功创建…"); }

2.使用curator(递归)删除节点
//删除node节点及其子节点 public static void deleteNodeWithChild(CuratorFramework zkClient,String nodePath) throws Exception { zkClient.delete() .guaranteed()//保证删除:如果删除失败,那么在后端还是继续会删除,直到成功 .deletingChildrenIfNeeded()//级联删除子节点 //.withVersion(1)//版本号可以据需使用 .forPath(nodePath); System.out.println(nodePath+"节点已删除成功…"); }

3.使用curator更新节点数据
//更新节点data数据 public static void updateNodeData(CuratorFramework zkClient,String nodePath,String nodeNewData) throws Exception { zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes()); //版本号据需使用,默认可以不带 System.out.println(nodePath+"节点数据已修改成功…"); }

4.使用curator查询节点数据
//查询node节点数据 public static void getNodeData(CuratorFramework zkClient,String nodePath) throws Exception { Stat stat=new Stat(); byte [] data=https://www.it610.com/article/zkClient.getData().storingStatIn(stat).forPath(nodePath); System.out.println("节点"+nodePath+"的数据为"+new String(data)); System.out.println("节点的版本号为:"+stat.getVersion()); }

5.使用curator查询节点的子节点
//打印node子节点 public static void printChildNodes(CuratorFramework zkClient,String parentNodePath) throws Exception { List childNodes= zkClient.getChildren().forPath(parentNodePath); System.out.println("开始打印子节点"); for (String str : childNodes) { System.out.println(str); } }

6.使用curator判断节点是否存在
//判断node节点是否存在 public static void checkNodeExists(CuratorFramework zkClient,String nodePath) throws Exception { Stat stat=zkClient.checkExists().forPath(nodePath); System.out.println(null==stat?"节点不存在":"节点存在"); }

关于CuratorFramework 的更多用法,点击这里 。
三、 使用Curator高级API特性之Cache缓存监控节点变化
cache是一种缓存机制,可以借助cache实现监听。
简单来说,cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。
curator支持的cache种类有4种Path Cache,Node Cache,Tree Cache,Curator Cache
1)Path Cache
Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
它是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。
2)Node Cache
Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
它是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。
3)Tree Cache
Tree Cache是上两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。
它是通过TreeCache类来实现的,监听器对应的接口为TreeCacheListener。
4)Curator Cache ( requires ZooKeeper 3.6+)
Curator Cache,是在zk3.6新版本添加的特性,该版本的出现是为了逐步淘汰上面3监听。
它是通过CuratorCache类来实现的,监听器对应的接口为CuratorCacheListener。
Curator一次性的watch
import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent; public class MyCuratorWatcher implements CuratorWatcher { @Override public void process(WatchedEvent event) throws Exception { System.out.println("触发watcher,节点路径为:" + event.getPath()); switch (event.getType()) { case NodeCreated: break; default: break; } } }

//一次性的watch public static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception { zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath); }

1.NodeCache监听当前节点变化 通过NodeCacheListener接口持续监听节点的变化来实现
//持续监听的watch public static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception { final NodeCache nodeCache=new NodeCache(zkClient, nodePath); //把监听节点,转换为nodeCache nodeCache.start(false); //默认为false设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空 ChildData cacheData=https://www.it610.com/article/nodeCache.getCurrentData(); if(null==cacheData) { System.out.println("NodeCache节点的初始化数据为空……"); }else { System.out.println("NodeCache节点的初始化数据为"+new String(cacheData.getData())); }//设置循环监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData cdata=https://www.it610.com/article/nodeCache.getCurrentData(); if(null==cdata) { System.out.println("节点发生了变化,可能刚刚被删除!"); nodeCache.close(); //关闭监听 }else { String data=https://www.it610.com/article/new String(cdata.getData()); String path=nodeCache.getCurrentData().getPath(); System.out.println("节点路径"+path+"数据发生了变化,最新数据为:"+data); } } }); }

2.PathChildrenCache只监听子节点变化 通过PathChildrenCacheListener接口持续监听子节点来实现
//持续监听watch子节点的任何变化 public static void watchForeverByPathChildrenCache(CuratorFramework zkClient,String nodePath) throws Exception { final PathChildrenCache childrenCache=new PathChildrenCache(zkClient, nodePath,true); //把监听节点,转换为childrenCache /** * StartMode:初始化方式 * POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case) * NORMAL:异步初始化 (不会进入下面的第一个case) * BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中) */ childrenCache.start(StartMode.NORMAL); List childDataList=childrenCache.getCurrentData(); System.out.println("当前节点所有子节点的数据列表如下:"); for (ChildData childData : childDataList) { System.out.println(new String(childData.getData())); }childrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case INITIALIZED: System.out.println("子节点初始化OK…"); break; case CHILD_ADDED: System.out.println("子节点"+event.getData().getPath()+"已被成功添加,数据data="https://www.it610.com/article/+new String(event.getData().getData())); break; case CHILD_UPDATED: System.out.println("子节点"+event.getData().getPath()+"数据发生变化,新数据data="https://www.it610.com/article/+new String(event.getData().getData())); break; case CHILD_REMOVED: System.out.println("子节点"+event.getData().getPath()+"已被移除~"); break; case CONNECTION_RECONNECTED: System.out.println("正在尝试重新建立连接…"); break; case CONNECTION_SUSPENDED: System.out.println("连接状态被暂时停止…"); break; default: break; } } }); }

3.TreeCache是上两者的合体,既监听自身,也监听所有子节点变化 通过TreeCacheListener接口来实现
public static void treeCache(CuratorFramework zkClient) throws Exception { final String path = "/treeChildrenCache"; final TreeCache treeCache = new TreeCache(zkClient, path); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { switch (event.getType()){ case NODE_ADDED: System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath()); break; case NODE_REMOVED: System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath()); break; case NODE_UPDATED: System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath()); break; case CONNECTION_LOST: System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath()); break; case CONNECTION_RECONNECTED: System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath()); break; case CONNECTION_SUSPENDED: System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath()); break; case INITIALIZED: System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath()); break; default: break; } } }); //据需可以继续做一些其他的增删改操作 zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path); Thread.sleep(1000); zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1"); Thread.sleep(1000); zkClient.delete().forPath(path + "/c1"); Thread.sleep(1000); zkClient.delete().forPath(path); Thread.sleep(1000); zkClient.close(); }

4.Curator Cache,是在zk3.6新版本添加的特性,Curator需5.*+
它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache),它通过CuratorCacheListener.builder().for***来选择对应的监听。最后再通过curatorCache.listenable().addListener(listener); 注册监听。
public static void curatorCache1(CuratorFramework zkClient) { final String path = "/curatorCache"; CuratorCache curatorCache = CuratorCache.build(zkClient, path); curatorCache.listenable().addListener(new CuratorCacheListener() { @Override public void event(Type type, ChildData oldData, ChildData newdata) { switch (type) { case NODE_CREATED: //各种判断 break; default: break; } } }); }public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException { final String path = "/curatorCache"; CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build(); //构建监听器 //新旧对照: //1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} ); //2.path cache--> CuratorCacheListener.builder().forPathChildrenCache(); //3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache(); CuratorCacheListener listener = CuratorCacheListener.builder() .forNodeCache(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点改变了..."); } }) .build(); //添加监听 curatorCache.listenable().addListener(listener); //开启监听 curatorCache.start(); //让线程休眠30s(为了方便测试) Thread.sleep(1000 * 30); }

5.测试环节
public static void main(String[] args) throws Exception {ZkConnectCuratorUtil cto = new ZkConnectCuratorUtil(); CuratorFramework zkClient=cto.zkClient; //获取zk客户端 CuratorDao dao=new CuratorDao(); String nodePath="/super/succ"; dao.createNodes(zkClient, nodePath, "super"); //创建节点 //dao.updateNodeData(zkClient, nodePath, "hello"); //更新节点数据 //dao.deleteNodeWithChild(zkClient, nodePath); //dao.getNodeData(zkClient, nodePath); //dao.printChildNodes(zkClient, nodePath); //dao.checkNodeExists(zkClient, nodePath); //dao.watchOnce(zkClient, nodePath); //dao.watchForeverByNodeCache(zkClient, nodePath); //dao.watchForeverByPathChildrenCache(zkClient, nodePath); Thread.sleep(300000); //延迟sleep时间,便于后才修改节点,看前台是否会继续触发watch cto.closeZKClient(); }

四、使用Curator创建/验证ACL(访问权限列表)
为了更清晰的表示ACL的代码实现,下面代码与上面代码完全隔离,新建CuratorAcl.java
准备环节 1.连通Zk时,就指定登录权限
下面代码判断,比上面的ZkConnectCuratorUtil.java中的登录代码,多了一行账号密码。连通zk时,就指定账号密码,避免后面操作需要登录权限的节点,每次都输入账号密码。通常情况下,账号密码可以通过构造参数传入;也可以在操作节点时指定登录权限。
//本类代码,只涉及ACL操作 public class CuratorAcl { public CuratorFramework client = null; public static final String workspace="workspace"; public static final String zkServerPath = "192.168.31.216:2181"; public CuratorAcl() { RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情况下,登录账号、密码可以通过构造参数传入,暂时固定,据需修改 .connectString(zkServerPath) .sessionTimeoutMs(20000).retryPolicy(retryPolicy) .namespace(workspace).build(); client.start(); } public void closeZKClient() { if (client != null) { this.client.close(); } } }

2.写一个把明文的账号密码转换为加密后的密文的工具类
//把明文的账号密码转换为加密后的密文 public class AclUtils { public static String getDigestUserPwd(String loginId_Username_Passwd) { String digest = ""; try { digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } return digest; } public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception { String id = "mayun:mayun"; String idDigested = getDigestUserPwd(id); System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk= } }

下面各方法,均在CuratorAcl.java内部完成
1.使用自定义工具类AclUtils,一次性给多个用户赋Acl权限
public static List getAcls() throws NoSuchAlgorithmException{ List acls=new ArrayList(); Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun")); Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei")); acls.add(new ACL(Perms.ALL, mayun)); //给mayun一次性赋值所有权限 acls.add(new ACL(Perms.READ, lilei)); acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei)); //给lilei分两次赋权限(目的:看不同的赋权方式) return acls; }

2.级联创建节点,并赋予节点操作权限
//级联创建节点,并赋予节点操作权限 public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List acls) throws Exception { String result=cto.client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acls, true)//给节点赋权限 .forPath(nodePath, nodeData.getBytes()); System.out.println("创建成功,result="+result); }

3.读取节点数据
// 读取节点数据 publicvoid getNodeData(CuratorAcl cto,String nodePath) throws Exception { Stat stat = new Stat(); byte[] data = https://www.it610.com/article/cto.client.getData().storingStatIn(stat).forPath(nodePath); if(null!=stat) { System.out.println("节点" + nodePath + "的数据为: " + new String(data)); System.out.println("该节点的版本号为: " + stat.getVersion()); } }

4.修改具有ACL权限节点的data数据
//修改具有ACL权限节点的data public void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception { cto.getNodeData(cto, nodePath); System.out.println("节点修改后的数据为:"+nodeNewData); cto.client.setData().forPath(nodePath, nodeNewData.getBytes()); System.out.println("修改成功"); }

5.两种方法判断node节点是否存(优先使用第一种)
//两种方法判断node节点是否存 public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception { cto.getNodeData(cto, nodePath); System.out.println("-----------=================-------------"); //判断节点是否存在,方法一(路径前面会自动添加workspace) Stat stat=cto.client.checkExists().forPath(nodePath); System.out.println("======="+stat==null?"不存在":"存在"); //判断节点是否存在,方法二(路径前面需手动添加workspace) Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false); System.out.println("======="+stat2==null?"不存在":"存在"); }

6.ACL权限的main方法测试
通过java代码给某个节点添加ACL权限后,后台登陆zk客户端时,是无法直接操作该节点被ACL控制的权限的操作的,要想操作具有ACL权限的节点,方法只有两个。
1、知道该节点输入用户都有哪些,用这些用户的账号密码登录
2、使用超级用户登录(需要单独配置,如何配置超级用户(见:三、5))
#getAcl /succ/testDigest 查看都有哪些用户对该节点有操作权限
#addauth digest succ:succ 登录
public static void main(String[] args) throws Exception { CuratorAcl cto = new CuratorAcl(); boolean isZkCuratorStarted = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接成功" : "已关闭")); String nodePath1 = "/acl/tom/bin"; String nodePath2 = "/acl/father/child/sub"; //cto.createNodesCascade(cto, nodePath1, "aclTest", getAcls()); //首次创建,报错,只能创建父节点,子节点无法创建 //cto.client.setACL().withACL(getAcls()).forPath("/curatorNode"); //给节点创建权限 //cto.getNodeData(cto, "/super"); //cto.getNodeData(cto, "/acl"); cto.checkNodeExists(cto, nodePath2); cto.closeZKClient(); boolean isZkCuratorStarted2 = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接成功" : "已关闭")); }

五、分布式锁
假如某网上商城,在单线程单系统情况下,不会存在库存为负数的情况。
然后多线程、微服务的情况下,这种情况不加处理就难以避免了,例如:库存10,A进来订购6,在锁定这个6个商品的同时,在高并发情况下,就有可能有个B和A在同一时间点,也订购了7个商品,那么库存就有可能会变成 负3。
为了避免这个情况的出现,就需要有个分布式锁,监控这库存,锁定每笔交易……
Curator的5种分布式锁及其对应的核心类:
1.重入式排它锁 Shared Reentrant Lock,实现类:InterProcessMutex
2.不可重入排它锁 Shared Lock ,实现类:InterProcessSemaphoreMutex
3.可重入读写锁 Shared Reentrant Read Write Lock,实现类: InterProcessReadWriteLock 、InterProcessLock
4.多锁对象容器(多共享锁) Multi Shared Lock,将多个锁作为单个实体管理的容器,实现类:InterProcessMultiLock、InterProcessLock
5.共享信号锁Shared Semaphore ,实现类:InterProcessSemaphoreV2
跨 JVM 工作的计数信号量。使用相同锁路径的所有 JVM 中的所有进程将实现进程间有限的租用集。此外,这个信号量大多是“公平的”——每个用户将按照请求的顺序获得租用(从 ZK 的角度来看)。
有两种模式可用于确定信号量的最大租用。在第一种模式中,最大租用是由给定路径的用户维护的约定。在第二种模式中,SharedCountReader 用作给定路径的信号量的方法,以确定最大租用。
1.重入式排它锁InterProcessMutex
public InterProcessMutex(CuratorFramework client, String path)

获取/释放锁的API
public void acquire() throws Exception; //获取锁,获取不到锁一直阻塞,zk连接中断则抛异常 public boolean acquire(long time, TimeUnit unit) throws Exception; //获取锁,超过该时间后,直接返回false,zk连接中断则抛异常 public void release() throws Exception; //释放锁

通过release()方法释放锁。InterProcessMutex 实例可以重用。Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。为了撤销mutex, 调用下面的方法
/** * 将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。 * Parameters: * listener - the listener */ public void makeRevocable(RevocationListener listener)

2.不可重入排它锁InterProcessSemaphoreMutex
public InterProcessSemaphoreMutex(CuratorFramework client, String path)

使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入
3.可重入读写锁InterProcessReadWriteLock 、InterProcessLock
一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。从读锁升级成写锁是不成的。
4.多锁对象容器(多共享锁) ,将多个锁作为单个实体管理,InterProcessMultiLock、InterProcessLock
Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire(上锁),如果请求失败,所有的锁都会被release (释放锁)。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。主要涉及两个类:InterProcessMultiLock、InterProcessLock
它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。
public InterProcessMultiLock(List locks) public InterProcessMultiLock(CuratorFramework client, List paths)

5.分布式锁示例
public class ZkLock { final static Logger log = LoggerFactory.getLogger(ZkLock.class); public CuratorFramework zkClient = null; // zk的客户端工具Curator(在本类通过new实例化的是,自动start) private static final int BASE_SLEEP_TIME_MS = 1000; // 连接失败后,再次重试的间隔时间 单位:毫秒 private static final int MAX_RETRY_TIMES = 10; // 定义失败重试次数 private static final int SESSION_TIME_OUT = 1000000; // 会话存活时间,根据业务灵活指定 单位:毫秒 private static final String ZK_SERVER_IP_PORT = "192.168.31.216:2181"; // Zookeeper服务所在的IP和客户端端口 private static final String NAMESPACE = "workspace"; // 指定后,默认操作的所有的节点都会在该工作空间下进行 static int j = 10; //初始化zk客户端 public ZkLock() { // 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES); // 通过工厂建立连接 zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 连接地址 .sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重试策略 .build(); zkClient.start(); } }

下面是核心测试方法
public static void lockTest(CuratorFramework zkClient) throws InterruptedException { // 使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的 final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test"); final CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 10; i++) {//启动10个线程 new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await(); // 线程等待一起执行 lock.acquire(); // 分布式锁,数据同步 // 处理业务 j--; System.out.println(j); } catch (Exception e) { e.printStackTrace(); } finally { try {// 释放锁 lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }, "t" + i).start(); } Thread.sleep(1000); countDownLatch.countDown(); // 模拟十个线程一起并发.指定一起执行 } public static void main(String[] args) throws InterruptedException { ZkLock zkl=new ZkLock(); ZkLock.lockTest(zkl.zkClient); }

#|Zookeeper后端开发工具Curator的使用 | Curator对节点的增删改查 | ACL权限控制 | 分布式锁 | 分布式计数器 | 附带最新版本下载
文章图片

六、分布式计数器
利用Zookeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围(int、long)不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。
increment() //加1 decrement() //减1 compareAndSet(Integer expectedValue, Integer newValue) //cas操作 get() //获取当前值 add():增加特定的值 subtract(): 减去特定的值 trySet(): 尝试设置计数值

使用的时候,必须检查返回结果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
1.代码示例
public static void count(CuratorFramework zkClient) throws Exception { //分布式计数器 DistributedAtomicInteger counter=new DistributedAtomicInteger(zkClient,"/super",new RetryNTimes(3,100)); //初始化 counter.forceSet(0); AtomicValue value = https://www.it610.com/article/counter.increment(); //原子自增 System.out.println("原值为"+value.preValue()); System.out.println("更改后的值为"+value.postValue()); System.out.println("状态"+value.succeeded()); } public static void main(String[] args) throws Exception { ZkLock zkl=new ZkLock(); //ZkLock.lockTest(zkl.zkClient); ZkLock.count(zkl.zkClient); }

#|Zookeeper后端开发工具Curator的使用 | Curator对节点的增删改查 | ACL权限控制 | 分布式锁 | 分布式计数器 | 附带最新版本下载
文章图片

尾言
力求语言简洁,清晰描述出Curator的常规使用方法,如有不正之处请批评指正。另外Curator还有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式队列DistributedQueueDistributed Queue(数据量很小的话,酌情考虑可以使用,大的话不建议使用),后续持续更新!
附注 1、Zookeeper集群的快速搭建(图文详解)
2、Zookeeper常规命令 | Watch监控命令 | ACL权限操作 | 四字命令详解 | 配置super超级用户权限
3、Zookeeper 的计数器CountDownLatch 的使用(模拟火箭发射)
【#|Zookeeper后端开发工具Curator的使用 | Curator对节点的增删改查 | ACL权限控制 | 分布式锁 | 分布式计数器 | 附带最新版本下载】4、Zookeeper图形化的客户端(ZooInspector)| 图形化的监控工具(taoKeeper)的下载和使用

    推荐阅读