zookeeper从小白到精通

【zookeeper从小白到精通】
目录

  • 1.介绍
    • 1.1概念
    • 1.2特点
    • 1.3主要的集群步骤
    • 1.4数据结构
    • 1.5应用场景
  • 2.本地安装
    • 2.1安装jdk
    • 2.2下载安装
    • 2.3配置文件修改
    • 2.4启动服务端
    • 2.5启动客户端
    • 2.6zookeeper常用命令
    • 2.7配置文件解读
  • 3.集群安装
    • 3.1集群规划
    • 3.2安装
    • 3.3配置
    • 3.4启动zookeeper集群
  • 4.选举机制
    • 4.1触发选举时机
    • 4.2zookeeper选举机制---第一次启动
    • 4.2zookeeper选举机制---非第一次启动
    • 4.3选举机制总结
  • 5.客户端命令行操作
    • 5.1常用命令整合
    • 5.2启动客户端
    • 5.3znode节点数据信息
    • 5.4节点类型(持久/短暂/有序号/无序号)
    • 5.5节点操作
      • 5.5.1创建/删除节点
      • 5.5.2获取/查看的值
    • 5.6节点监听
      • 5.6.1监听节点值改变
      • 5.6.2监听节点下子节点数量改变
  • 6.goang操作zookeeper
    • 6.1创建节点create(增)
    • 6.2查看节点get(查)
    • 6.3修改节点set(改)
    • 6.4删除节点delete(删)
    • 6.5查看节点的子节点Children
    • 6.6遍历节点Children后再get
    • 6.7判断节点是否存在-conn.Exists
  • 7.监听/watch
    • 7.1监听节点-全局监听
    • 7.2监听部分事件
  • 8.微服务动态上下线监听(服务注册/发现)
    • 8.1需求实现
    • 8.2服务端创建代码-(注册服务)
    • 8.3客户端监听代码-(服务发现)
  • 9.分布式锁
    • 9.1分布式锁案例
    • 9.2监控锁案例

1.介绍 1.1概念
zookeeper作用:用于维护配置信息、命名、提供分布式同步和提供组服务 zookeeper主要是文件系统和通知机制文件系统主要是用来存储数据 通知机制主要是服务器或者客户端进行通知,并且监督 基于观察者模式设计的分布式服务管理框架,开源的分布式框架

1.2特点
1):一个leader,多个follower的集群 2):集群只要有半数以上包括半数就可正常服务,一般安装奇数台服务器 3):全局数据一致,每个服务器都保存同样的数据,实时更新 4):更新的请求顺序保持顺序(来自同一个服务器) 5):数据更新的原子性,数据要么成功要么失败(大事务) 6):数据实时更新性很快(因为数据量很小)

1.3主要的集群步骤
1):服务端启动时去注册信息(创建都是临时节点) 2):获取到当前在线服务器列表,并且注册监听 3):服务器节点下线 4):服务器节点上下线事件通知 5):process(){重新再去获取服务器列表,并注册监听}

1.4数据结构
与 Unix 文件系统很类似,可看成树形结构,每个节点称做一个ZNode。每一个ZNode默认能够存储 1MB 的数据。也就是只能存储小数据(一般配置信息,注册信息等)

1.5应用场景
1):统一命名服务(域名服务) 在分布式环境下,经常需要对应用/服务进行统一命名,便于识别,eg:ip不容易记住,而域名容易记住2):统一配置管理(一个集群中的所有配置都一致,且也要实时更新同步)3):将配置信息写入ZooKeeper上的一个Znode,各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器4):统一集群管理(掌握实时状态)5):将节点信息写入ZooKeeper上的一个ZNode。监听ZNode获取实时状态变化6):服务器节点动态上下线7):软负载均衡(根据每个节点的访问数,让访问数最少的服务器处理最新的数据需求)

zookeeper从小白到精通
文章图片

2.本地安装 2.1安装jdk
1.查看是否已安装JDK yum list installed |grep java 2.卸载CentOS系统Java环境 # yum -y remove java-1.8.0-openjdk**表示卸载所有openjdk相关文件输入 # yum -y remove tzdata-java.noarch卸载tzdata-java3.查看JDK软件包版本 # yum -y list java*或者使用# yum searchjava | grep -i --color JDK 4.安装 yum install java-1.8.0-openjdk*//安装java1.8.0所有程序 java -version //查看java版本信息注意:使用yum安装环境变量自动就配好了

2.2下载安装
官网:https://zookeeper.apache.org/ 下载3.5.7稳定版本 下载:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/ wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz 解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/ 改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7 配置文件改名:mv zoo_sample.cfg zoo.cfg

bin目录 框架启动停止,客户端和服务端的 conf 配置文件信息 docs文档 lib 配置文档的依赖

2.3配置文件修改 配置文件:/opt/module/zookeeper-3.5.7/conf/zoo.cfg
修改: dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径

2.4启动服务端
cd /opt/module/zookeeper-3.5.7/bin [root@sg-15 bin]# ./zkServer.sh start// 启动服务端//查看进程 [root@sg-15 bin]# jps -l 21172 sun.tools.jps.Jps 21110 org.apache.zookeeper.server.quorum.QuorumPeerMain

2.5启动客户端
[root@sg-15 bin]# ./zkCli.sh// 启动客户端 [zk: localhost:2181(CONNECTED) 4] ls / [zookeeper][zk: localhost:2181(CONNECTED) 5] quit //退出客户端[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态 /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: standalone

2.6zookeeper常用命令
[root@sg-15 bin]# ./zkServer.sh start// 启动服务端 [root@sg-15 bin]# ./zkServer.sh stop// 停止服务端 [root@sg-15 bin]# jps -l// 查看进程 [root@sg-15 bin]# ./zkCli.sh// 启动客户端 [zk: localhost:2181(CONNECTED) 5] quit // 退出客户端 [root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态

2.7配置文件解读
配置文件的5大参数:tickTime = 2000//通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒 initLimit = 10//Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量) syncLimit = 5//Leader和Follower之间通信时间如果超过5个心跳时间,Leader认为Follwer死掉,从服务器列表中删除Follwer。 dataDir =/tmp/zookeeper //保存zookeeper的数据,这是默认值,会定时被系统清除 dataDir保存zookeeper的数据,默认是temp会被系统定期清除,通常改为自己的路径dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径clientPort = 2181//客户端的连接端口,一般不需要修改

3.集群安装 3.1集群规划
sg15,sg16,sg17三台机器部署Zookeeper

3.2安装
解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/ 改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7 配置文件改名:mv zoo_sample.cfg zoo.cfg

3.3配置
[root@sg-15 zookeeper-3.5.7]# mkdir zkData// 创建目录zkData [root@sg-15 zkData]# vi myid// 创建一个 myid 的文件 在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格) 5// 192.168.0.215,这里设置服务器尾号,其他的不重复就行[root@sg-15 conf]# mv zoo_sample.cfg zoo.cfg// 配置文件改名 [root@sg-15 conf]# vi zoo.cfg//修改配置文件 dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径 // 添加配置 #######################cluster########################## server.5=192.168.0.215:2888:3888 server.6=192.168.0.216:2888:3888 server.7=192.168.0.217:2888:3888##配置参数解读 server.A=B:C:D A 是一个数字,表示这个是第几号服务器,就是myid中的值 B 是这个服务器的地址 C 是这个服务器Follower 与集群中的 Leader 服务器交换信息的端口; D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。注意:一台机器弄好之后,打包发送到其他机器。其他机器修改myid值 tar -cf module.tar ./module// 打包 scp -r module.tar root@192.168.0.216:/opt///发送 tar -xf module.tar// 解包

3.4启动zookeeper集群
注意:集群只要有半数以上包括半数就可正常服务。三台机器启动: [root@sg-15 bin]# ./zkServer.sh start//启动 [root@sg-15 bin]# ./zkServer.sh restart//重启 [root@sg-17 bin]# ./zkServer.sh status//检查状态 /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leader//本机器为leader[root@sg-15 bin]# ./zkServer.sh status /usr/bin/java ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower//本机器为follower

4.选举机制
选举谁当leader 介绍: SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。 ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻, 集群中的每台机器的ZXID值不一定完全一样 Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是

4.1触发选举时机
1.服务器刚启动 2.服务器运行期间无法和leader保持连接当一台机器进入leader选举流程时,当前集群出现两种状态: 1.集群中已经存在一个leader(此机器和leader建立连接,并状态同步) 2.集群中不存在leader(触发选举),looking状态

4.2zookeeper选举机制---第一次启动
服务器1:myid=1 服务器2:myid=2 服务器3:myid=3 (1) 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING; (2) 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:服务器1和服务器2比较谁的myid大,更改选票为推举服务器myid大的。此时服务器1票数0票,服务器2票数2票,大于半数以上结果,选举完成。服务器1为follower,2状态为leader (3) 服务器3启动,发起一次选举。此时服务器1,2已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为1票,此时服务器3服从多数,更改选票信息为服务器2,并更改状态为FOLLOWING;

4.2zookeeper选举机制---非第一次启动
服务器1:myid=1 服务器2:myid=2 服务器3:myid=3 服务器运行期间无法和leader保持连接触发重新选举: 假设zookeeper由5台服务器组成,SID分别为1,2,3,4,5,ZXID分别为8,8,8,7,7,并且此时SID为3的服务器此时时leader。某时刻3和5出现故障,因此开始进行leader选举: SID为1的机器:(1,8,1) SID为2的机器:(1,8,2) SID为4的机器:(1,7,4)选举规则: 优先比较SID,再比较ZXID,其次比较Epoch。大的直接胜出当选leader

4.3选举机制总结
半数机制,超过半数的投票通过,即通过。 第一次启动选举规则:投票过半数时,服务器 myid 大的胜出当leader 第二次启动选举规则:①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,任期代号id大的胜出

5.客户端命令行操作 5.1常用命令整合
登陆客户端操作:很多命令和linux命令相似,比如ls,history等 jps : 查看zookeeper运行的进程 help :显示所有操作命令 ls / :查看当前znode中包含的内容 ls -s / :查看当前节点详细数据 create : 创建普通节点 create -s :创建带序号节点 create -e :创建临时普通节点 create -e -s :创建临时有序节点 delete:删除节点create /wangzhe "this is wangzhe"//新建节点(永久节点,不带序号) create /wangzhe/fashi "this is fashi" // //新建节点(永久节点,不带序号) get /wangzhe// this is wangzhe取值 get -s /wangzhe//节点详情 set set /wangzhe "this is wangzherongyao" //修改值get -w :监听值 ls -w /wangzhe :监听数量 监听注册一个生效一次

5.2启动客户端
[root@sg-15 bin]# ./zkCli.sh -server 192.168.0.215:2181 quit //退出

5.3znode节点数据信息
命令:ls -s / [zk: 192.168.0.215:2181(CONNECTED) 3] ls -s / [zookeeper]cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1################################## 1):czxid:创建节点的事务id zxid 每次修改ZooKeeper 状态都会产生一个ZooKeeper 事务 ID。事务 ID 是ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么zxid1 在 zxid2 之前发生。2):ctime:znode 被创建的毫秒数(从 1970 年开始) 3):mzxid:znode 最后更新的事务zxid 4):mtime:znode 最后更新的毫秒数(从 1970 年开始) 5):pZxid:znode 最后更新的子节点zxid 6):cversion:znode 子节点变化号,znode 子节点修改次数 7):dataversion:znode 数据变化号 8):aclVersion:znode 访问控制列表的变化号 9):ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。 10):dataLength:znode 的数据长度 11):numChildren:znode 子节点数量

5.4节点类型(持久/短暂/有序号/无序号)
持久:客户端和服务端端开连接后,创建的节点不删除 序号:在分布式系统中,顺序号可以被用于所有事件排序,这样客户端可以通过顺序号推断事件的顺序 { 持久有序号:客户端和服务端端开连接后,创建的节点不删除。且节点名称顺序编号 持久无序号:客户端和服务端端开连接后,创建的节点不删除。无序号 }短暂:客户端和服务端端开连接后,创建的节点自己删除 { 短暂有序号:客户端和服务端端开连接后,创建的节点自己删除。且节点名称顺序编号 短暂无序号:客户端和服务端端开连接后,创建的节点自己删除。无序号 }

5.5节点操作 5.5.1创建/删除节点
create : 创建普通节点 create -s :创建带序号节点 create -e :创建临时普通节点 create -e -s :创建临时有序节点 delete:删除一个节点(如果这个节点下有子节点,删除失败) deleteall:递归删除所有节点(如果这个节点下有子节点,全部删除) eg:delete /wangzhe/fashi //只删除fashi节点 deleteall /wangzhe//递归删除wangzhe所有节点//1.创建节点:普通永久节点 [zk: 192.168.0.215:2181(CONNECTED) 27] create /wangzhe "this is wangzhe" Created /wangzhe[zk: 192.168.0.215:2181(CONNECTED) 40] create /wangzhe/fashi "this is fashi" Created /wangzhe/fashi//2.创建节点:有序永久节点 [zk: 192.168.0.215:2181(CONNECTED) 66] create -s /wangzhe/fashi/daji "daji" Created /wangzhe/fashi/daji0000000000[zk: 192.168.0.215:2181(CONNECTED) 70] create -s /wangzhe/fashi/daji "daji" Created /wangzhe/fashi/daji0000000001 //再次创建daji,序号+1//3.创建节点:普通临时节点 [zk: 192.168.0.215:2181(CONNECTED) 71] create -e /wangzhe/fashi/ganjiang "ganjiang" Created /wangzhe/fashi/ganjiang//4.创建节点:有序临时节点 [zk: 192.168.0.215:2181(CONNECTED) 73] create -e -s /wangzhe/fashi/anqila "anqila" Created /wangzhe/fashi/anqila0000000003//查看 [zk: 192.168.0.215:2181(CONNECTED) 74] ls /wangzhe/fashi [anqila0000000003, daji0000000000, daji0000000001, ganjiang]//delete删除 [zk: 192.168.0.215:2181(CONNECTED) 19] delete /wangzhe/fashi/xiaoqiao xiaoqiaoxiaoqiao0000000007

5.5.2获取/查看的值
注意:get获取有序节点的值时:必须获取最后一个序号的值,比如获取下面ganjiang0000000004报错,获取ganjiang0000000005正常。 ls get get -s//查看 [zk: 192.168.0.215:2181(CONNECTED) 10] ls /wangzhe/fashi [daji0000000000, daji0000000001, ganjiang0000000004, ganjiang0000000005, xiaoqiao]//获取值 [zk: 192.168.0.215:2181(CONNECTED) 12] get /wangzhe/fashi/xiaoqiao0000000007 xiaoqiao// 获取详细 [zk: 192.168.0.215:2181(CONNECTED) 13] get -s /wangzhe/fashi/xiaoqiao0000000007 xiaoqiao cZxid = 0x30000002f ctime = Sat Mar 26 18:37:35 CST 2022 mZxid = 0x30000002f mtime = Sat Mar 26 18:37:35 CST 2022 pZxid = 0x30000002f cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 0[zk: 192.168.0.215:2181(CONNECTED) 14] get -s /wangzhe/fashi this is fashi cZxid = 0x300000020 ctime = Sat Mar 26 17:31:12 CST 2022 mZxid = 0x300000020 mtime = Sat Mar 26 17:31:12 CST 2022 pZxid = 0x30000002f cversion = 10 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 13 numChildren = 6

5.6节点监听
get -w :监听值 ls -w /wangzhe :监听数量 监听注册一个生效一次

5.6.1监听节点值改变
[zk: 192.168.0.215:2181(CONNECTED) 25] get -w /wangzhe //开启一次监听 this is wangzhe [zk: 192.168.0.215:2181(CONNECTED) 26] set /wangzhe "jianting:this is wangzhe"//修改值WATCHER:://监听值发生变化 WatchedEvent state:SyncConnected type:NodeDataChanged path:/wangzhe

5.6.2监听节点下子节点数量改变
[zk: 192.168.0.215:2181(CONNECTED) 27] ls -w /wangzhe////开启一次监听 [fashi, fashi0000000001] [zk: 192.168.0.215:2181(CONNECTED) 28] delete /wangzhe/fashi0000000001//删除一个节点WATCHER:://监听数量发生变化WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/wangzhe

6.goang操作zookeeper 6.1创建节点create(增)
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { // 任意一个ip都可以,但是建议放主节点ip conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { panic(err) } defer conn.Close() // 1.创建的普通永久节点 path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll)) if err != nil { fmt.Println("err:",err) } fmt.Println("创建的普通永久节点:", path)// 2.创建的普通临时节点,创建此节点的会话结束后立即清除此节点 ephemeral, err := conn.Create("/ephemeral", []byte("world"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) if err != nil { fmt.Println("err:",err) } fmt.Println("创建的普通临时节点:", ephemeral)// 3.创建的有序永久节点 sequence, err := conn.Create("/sequence", []byte("world"), zk.FlagSequence, zk.WorldACL(zk.PermAll)) if err != nil { panic(err) } fmt.Println("创建的有序永久节点:", sequence)// 4.创建的有序临时节点,创建此节点的会话结束后立即清除此节点 ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("world"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll)) if err != nil { panic(err) } fmt.Println("创建的有序临时节点:", ephemeralSequence) }

6.2查看节点get(查)
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() result, state, err := conn.Get("/wangzhe/fashi") if err != nil { fmt.Println("err:",err) return } fmt.Println("result: ", string(result)) // this is fashi fmt.Printf("%#v",state) //状态结果:&zk.Stat{Czxid:12884901920, Mzxid:12884901920, Ctime:1648287072539, Mtime:1648287072539, Version:0, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:13, NumChildren:5, Pzxid:12884901936} }

6.3修改节点set(改)
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() path := "/wangzhe/fashi" _, state, _ := conn.Get(path) // 先查询,拿到当前版本 state, err = conn.Set(path, []byte("hello"), state.Version) if err != nil { fmt.Println("err:",err) return } fmt.Printf("%#v",state)//结果:&zk.Stat{Czxid:12884901920, Mzxid:12884902012, Ctime:1648287072539, Mtime:1648305221598, Version:1, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:5, NumChildren:5, Pzxid:12884901936}2022/03/26 22:33:39 recv loop }

6.4删除节点delete(删) 注意:此方法不能递归删除节点
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() path := "/hello" exists, state, err := conn.Exists(path)//判断是否存在和查询版本 if exists{ err = conn.Delete(path, state.Version) if err != nil { fmt.Println("err:",err) return } fmt.Println("节点删除成功!!!") }else { fmt.Println("节点不存在!!!") } }

6.5查看节点的子节点Children
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() childrenList, state, err := conn.Children("/wangzhe/fashi") if err != nil { fmt.Println("err:",err) return } fmt.Println(childrenList) fmt.Printf("%#v",state) }

6.6遍历节点Children后再get
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() childrenList, _, err := conn.Children("/wangzhe/fashi") if err != nil { fmt.Println("err:",err) return } for _,children:=range childrenList{ childrenPath:= fmt.Sprintf("/wangzhe/fashi/%s",children) result, state, err := conn.Get(childrenPath) if err != nil { fmt.Println("err:",err) return } fmt.Println(string(result)) fmt.Println(state) } }

6.7判断节点是否存在-conn.Exists
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() path := "/hello" exists, _, err := conn.Exists(path) if err != nil { fmt.Println("err:",err) return }if exists{ fmt.Println("节点存在") }else{ fmt.Println("节点不存在") } }

7.监听/watch 7.1监听节点-全局监听 将监听器放到Connect函数中,如果有监听事件发生,会一直执行监听器的回调函数。监听执行了一次之后要重新注册监听。
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func callback(e zk.Event) { fmt.Println("========================") fmt.Println("path:", e.Path) fmt.Println("type:", e.Type.String()) fmt.Println("state:", e.State.String()) }func main() { eventCallbackOption := zk.WithEventCallback(callback) // 经过测试,在连接时会执行3次回调函数 conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second,eventCallbackOption) if err != nil { fmt.Println("err:",err) return } defer conn.Close() // 注册一个 watch exists, state, _, err := conn.ExistsW("/watch") if err != nil { fmt.Println("err:",err) return } fmt.Println("exists:",exists) if !exists { // 创建 /watch 时,触发监听事件,watch 失效 _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) if err != nil { fmt.Println("err:",err) return }// 再注册一个 watch _, state, _, err = conn.ExistsW("/watch") if err != nil { fmt.Println("err:",err) return } } // 删除 /watch 时,触发监听事件,watch 失效 err = conn.Delete("/watch", state.Version) if err != nil { fmt.Println("err:",err) return } }

结果:
======================== path: type: EventSession state: StateConnecting ======================== path: type: EventSession state: StateConnected 2022/03/27 11:09:27 connected to 192.168.0.215:2181 ======================== path: type: EventSession state: StateHasSession 2022/03/27 11:09:27 authenticated: id=360292329056632906, timeout=4000 2022/03/27 11:09:27 re-submitting `0` credentials after reconnect exists: false ======================== path: /watch type: EventNodeCreated state: Unknown ======================== path: /watch type: EventNodeDeleted state: Unknown

7.2监听部分事件
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() // 注册一个 watch exists, _, eventChannel, err := conn.ExistsW("/watch") if err != nil { fmt.Println("err:",err) return } go func() { // 从事件 channel 中取出事件 e := <-eventChannel fmt.Println("========================") fmt.Println("path:", e.Path) fmt.Println("type:", e.Type.String()) fmt.Println("state:", e.State.String()) }() if !exists { // 创建 临时节点/watch 时,触发监听事件,watch 失效 _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) if err != nil { fmt.Println("err:",err) return } } }

8.微服务动态上下线监听(服务注册/发现) 8.1需求实现
微服务分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。需求实现: 服务端:服务端启动时,在zookeeper中创建临时有序节点,服务关闭时,临时节点自动删除了(zookeeper临时节点机制) 客户端:监听节点的变化

8.2服务端创建代码-(注册服务)
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() //创建的有序临时节点,创建此节点的会话结束后立即清除此节点 create -e -s ephemeralSequence, err := conn.Create("/servers/bikesvc", []byte("bikesvc"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll)) if err != nil { fmt.Println("err:",err) return } fmt.Println("创建的有序临时节点:", ephemeralSequence) time.Sleep(time.Second*10) }

8.3客户端监听代码-(服务发现)
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func mirror(conn *zk.Conn, path string) (chan []string, chan error) { snapshots := make(chan []string) errors := make(chan error) go func() { for { snapshot, _, events, err := conn.ChildrenW(path) if err != nil { errors <- err return } snapshots <- snapshot evt := <-events if evt.Err != nil { errors <- evt.Err return } } }() return snapshots, errors }func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:", err) return } defer conn.Close() snapshots, errors := mirror(conn, "/servers") //监控的根节点,根节点不能删除 go func() { for { select { case snapshot := <-snapshots: fmt.Println("监控变化:", snapshot) case err := <-errors: fmt.Println("err:", err) } } }() for { } }

结果: 服务端: 创建的有序临时节点: /servers/bikesvc0000000010客户端: 监控变化: [] 监控变化: [bikesvc0000000009] 监控变化: []

9.分布式锁
加锁进行资源保护 go-zookeeper 添加分布式锁的方法为NewLock(c *Conn, path string, acl []ACL)。 锁的结构体为: type Lock struct { c*Conn pathstring acl[]ACL lockPath string seqint }这个结构体实现了三个方法:Lock(),LockWithData(data []byte)和Unlock()

9.1分布式锁案例
根节点“/root”判断是否存在,不存在则创建

package mainimport ( "fmt" "sync" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go func(n int) { defer wg.Done() lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁 err = lock.LockWithData([]byte("it is a lock")) if err != nil { panic(err) } fmt.Println("第", n, "个 goroutine 获取到了锁") time.Sleep(time.Second*1) // 1 秒后释放锁 lock.Unlock()//解锁 }(i) } wg.Wait() }

这里给了两个进程抢锁,ls查看一下锁: 解释:把所有进程按有序排列,当成节点放入lock节点中,按照最小的序号执行。解锁一个删除一个。直到节点为空,进程执行完毕。 [zk: localhost:2181(CONNECTED) 32] ls /root/lock [_c_1dbbc1ec75b285ef10a6d6154627335c-lock-0000000153, _c_793a837ded040d01608395e5eac65979-lock-0000000152]先执行152,再执行153

9.2监控锁案例 监控锁节点变化
监控代码
package mainimport ( "fmt" "time" "github.com/go-zookeeper/zk" )func mirror(conn *zk.Conn, path string) (chan []string, chan error) { snapshots := make(chan []string) errors := make(chan error) go func() { for { snapshot, _, events, err := conn.ChildrenW(path) if err != nil { errors <- err return } snapshots <- snapshot evt := <-events if evt.Err != nil { errors <- evt.Err return } } }() return snapshots, errors }func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:", err) return } defer conn.Close() snapshots, errors := mirror(conn, "/root/lock") //监控的根节点,根节点不能删除 go func() { for { select { case snapshot := <-snapshots: fmt.Println("监控变化:", snapshot) case err := <-errors: fmt.Println("err:", err) } } }() for { } }

分布式锁代码
package mainimport ( "fmt" "sync" "time" "github.com/go-zookeeper/zk" )func main() { conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) if err != nil { fmt.Println("err:",err) return } defer conn.Close() var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go func(n int) { defer wg.Done() lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁 err = lock.LockWithData([]byte("it is a lock")) if err != nil { panic(err) } fmt.Println("第", n, "个 goroutine 获取到了锁") time.Sleep(time.Second*1) // 1 秒后释放锁 lock.Unlock()//解锁 }(i) } wg.Wait() }

结果:
zookeeper从小白到精通
文章图片

zookeeper从小白到精通
文章图片

    推荐阅读