Table of Contents
- 1. Setting up a ZooKeeper cluster
- 1.1 Place the ZooKeeper files on each cluster server
- 1.2 Edit the ZooKeeper configuration file on each server
- 1.3 Create a myid file in the dataDir directory containing the server's id
- 1.4 Start the cluster servers
- 2. ZAB (ZooKeeper Atomic Broadcast) protocol
- 2.1 Cluster roles
- 3. Leader election
- 3.1 At startup
- 3.2 At runtime
- 4. The observer role and its configuration
- 5. The Curator client
- 5.1 Maven dependencies
- 5.2 Creating a Curator connection, and retry policies
- 5.3 Creating nodes
- 5.4 Updating nodes
- 5.5 Deleting nodes
- 5.6 Reading node data
- 5.7 Listing child nodes
- 5.8 Checking whether a node exists
- 5.9 Watch mechanism
- 5.9.1 Watching node creation and modification
- 5.9.2 Watching child node changes
- 5.10 Transactions
- 6. Distributed locks
- 6.1 Exclusive lock
- 6.1.1 Test
- 6.2 Read-write lock
- 6.2.1 Test
1. Setting up a ZooKeeper cluster
1.1 Place the ZooKeeper files on each cluster server
1.2 Edit the ZooKeeper configuration file on each server
# The port ZooKeeper exposes to clients
clientPort=2181
# Snapshot directory, i.e. where the server persists its data
dataDir=/tmp/zookeeper
# server.A=B:C:D
# A: server id  B: server IP  C: quorum communication port  D: leader-election port
server.1=192.168.0.1:2287:3387
server.2=192.168.0.2:2287:3387
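To make the `server.A=B:C:D` layout concrete, here is a small plain-Java sketch that splits such a line into its four documented parts. The class and method names are made up for illustration; this is not code from ZooKeeper itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ServerLineParser {
    /** Splits e.g. "server.1=192.168.0.1:2287:3387" into id, ip, quorum port, election port. */
    static Map<String, String> parse(String line) {
        Map<String, String> parts = new LinkedHashMap<>();
        String[] keyValue = line.split("=", 2);
        // A: the server id that follows "server."
        parts.put("id", keyValue[0].substring("server.".length()));
        String[] addr = keyValue[1].split(":");
        parts.put("ip", addr[0]);           // B: server IP
        parts.put("quorumPort", addr[1]);   // C: quorum communication port
        parts.put("electionPort", addr[2]); // D: leader election port
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(parse("server.1=192.168.0.1:2287:3387"));
    }
}
```

The id parsed here is the same number you write into the `myid` file in the next step.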
1.3 In the directory specified by dataDir, create a myid file containing the server's id
echo "1" > myid
1.4 Start the cluster servers
./zkServer.sh start
2. ZAB (ZooKeeper Atomic Broadcast) protocol
2.1 Cluster roles
Role | Description |
---|---|
leader | The leader initiates and decides votes, and updates the system state |
follower | Followers handle client requests, return results to the client, and vote in leader elections |
observer | Observers accept client connections and forward write requests to the leader, but do not vote; they only sync the leader's state. Their purpose is to scale the cluster and improve read throughput |
- A client sends a write request to any server in the cluster
- If that server is not the leader, it forwards the request to the leader
- The leader sends a transaction proposal to the other follower nodes
- Each follower appends the proposal to its history queue and sends an ACK back to the leader
- Once the leader has received ACKs from more than half of the nodes, it sends a commit request to the followers
- On receiving the commit request, each follower commits the transaction from its history queue
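The "more than half" rule in the steps above comes down to a strict-majority check. A one-line plain-Java sketch (illustrative only; not ZooKeeper's actual implementation):

```java
public class QuorumSketch {
    /** True when ackCount is a strict majority of ensembleSize. */
    static boolean hasQuorum(int ackCount, int ensembleSize) {
        return ackCount > ensembleSize / 2;
    }

    public static void main(String[] args) {
        // In a 5-node ensemble the leader can commit once 3 ACKs arrive
        System.out.println(hasQuorum(2, 5)); // false
        System.out.println(hasQuorum(3, 5)); // true
    }
}
```

This is also why odd-sized ensembles are preferred: 5 nodes and 6 nodes both tolerate only 2 failures, since a 6-node cluster still needs 4 ACKs for a majority.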
3. Leader election
3.1 At startup
3.2 At runtime
4. The observer role and its configuration
- Add the following to the observer server's configuration file
peerType=observer
- Add the following to the ZooKeeper configuration file of every server
server.3=192.168.1.3:2289:3389:observer
- Restart the servers
5. The Curator client
5.1 Maven dependencies
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
5.2 Creating a Curator connection, and retry policies
package cn.tianqb.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.junit.Test;
/**
 * @Description:
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/24 20:48
 * @Version v1.0
 */
public class ConnDemo {
    @Test
    public void conn() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                // server address
                .connectString("192.168.0.1:2181")
                // connection timeout (ms)
                .connectionTimeoutMs(5000)
                // retry policy: retry once, 3000 ms after the connection is lost
                .retryPolicy(new RetryOneTime(3000))
                // namespace (root node for this client)
                .namespace("curator")
                .build();
        // open the connection
        client.start();
        System.out.println(client.isStarted());
        client.close();
    }

    /**
     * Retry policies
     */
    public void retryTime() {
        // Retry once; arg0: delay before the retry
        // RetryOneTime(int sleepMsBetweenRetry)

        // Retry N times; arg0: number of retries, arg1: delay between retries
        // RetryNTimes(int n, int sleepMsBetweenRetries)

        // Retry forever; arg0: delay between retries
        // RetryForever(int retryIntervalMs)

        // Stop retrying once the total elapsed time is exceeded
        // arg0: max total time, arg1: delay between retries
        // RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

        // Delay grows as the retry count increases:
        // long sleepMs = (long)(this.baseSleepTimeMs * Math.max(1, this.random.nextInt(1 << retryCount + 1)));
        // ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    }
}
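The exponential-backoff formula quoted in the last comment can be exercised on its own in plain Java. This is a sketch with a made-up class name; only the bound arithmetic mirrors the quoted Curator line.

```java
import java.util.Random;

public class BackoffSketch {
    // Mirrors the quoted formula:
    // sleepMs = baseSleepTimeMs * max(1, random.nextInt(1 << (retryCount + 1)))
    static long sleepMs(int baseSleepTimeMs, int retryCount, Random random) {
        // the upper bound doubles with every retry, so delays grow on average
        int bound = 1 << (retryCount + 1);
        return (long) baseSleepTimeMs * Math.max(1, random.nextInt(bound));
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed so the run is repeatable
        for (int retry = 0; retry < 5; retry++) {
            // each delay falls in [base, base * 2^(retry+1)) milliseconds
            System.out.println("retry " + retry + " -> sleep " + sleepMs(1000, retry, random) + " ms");
        }
    }
}
```

Note that the delay is randomized, not strictly doubling: early retries can still draw a small multiplier, which spreads out reconnect storms across many clients.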
5.3 Creating nodes
@Test
public void create1() throws Exception {
    String path = client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node1", "node1".getBytes());
    System.out.println(path);
}

@Test
public void create2() throws Exception {
    List<ACL> acl = new ArrayList<>();
    acl.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "192.168.1.105")));
    String path = client.create()
            .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
            .withACL(acl)
            .forPath("/node2", "node2".getBytes());
    System.out.println(path);
}

/**
 * Recursive creation, for when the parent node does not exist
 */
@Test
public void create3() throws Exception {
    List<ACL> acl = new ArrayList<>();
    acl.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "192.168.1.105")));
    String path = client.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
            .withACL(acl)
            .forPath("/node3/node33", "node3".getBytes());
    System.out.println(path);
}

/**
 * Asynchronous creation
 */
@Test
public void create4() throws Exception {
    List<ACL> acl = new ArrayList<>();
    acl.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "192.168.1.105")));
    client.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acl)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    System.out.println(curatorEvent.getPath());
                    System.out.println(curatorEvent.getType());
                }
            })
            .forPath("/node4/node44", "node44".getBytes());
    // keep the test alive so the background callback has time to fire
    Thread.sleep(5000);
}
5.4 Updating nodes
@Test
public void update() throws Exception {
    Stat stat = client.setData()
            // version to match (-1 matches any version)
            .withVersion(-1)
            // path and new data
            .forPath("/node1", "node1111111".getBytes());
    System.out.println(stat);
}
5.5 Deleting nodes
@Test
public void delete1() throws Exception {
    client.delete()
            // version to match (-1 matches any version)
            .withVersion(-1)
            // path to delete
            .forPath("/node4/node44");
    System.out.println("Done!");
}

/**
 * Deleting a non-empty node; by default a node with children cannot be deleted
 */
@Test
public void delete2() throws Exception {
    client.delete()
            // delete the children as well
            .deletingChildrenIfNeeded()
            // version to match (-1 matches any version)
            .withVersion(-1)
            // path to delete
            .forPath("/node1");
    System.out.println("Done!");
}
5.6 Reading node data
@Test
public void getData() throws Exception {
    byte[] bytes = client.getData()
            .forPath("/node4");
    System.out.println(new String(bytes));
}

/**
 * Read the node's Stat at the same time as its data
 */
@Test
public void getData2() throws Exception {
    Stat stat = new Stat();
    byte[] bytes = client.getData()
            // fill in the node's Stat
            .storingStatIn(stat)
            .forPath("/node4");
    System.out.println(new String(bytes));
    System.out.println(stat.getAversion());
}
5.7 Listing child nodes
@Test
public void getChildren() throws Exception {
    List<String> chs = client.getChildren()
            .forPath("/");
    chs.forEach(System.out::println);
}
5.8 Checking whether a node exists
@Test
public void exists() throws Exception {
    Stat stat = client.checkExists()
            .forPath("/node1");
    System.out.println(stat != null);
}
5.9 Watch mechanism
5.9.1 Watching node creation and modification
@Test
public void watch1() throws Exception {
    // watch a single node for creation and data changes
    NodeCache nodeCache = new NodeCache(client, "/create2");
    nodeCache.start();
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println(nodeCache.getCurrentData().getPath());
            System.out.println(new String(nodeCache.getCurrentData().getData()));
        }
    });
    // keep the test alive long enough to observe events
    Thread.sleep(60000);
    nodeCache.close();
}
5.9.2 Watching child node changes
@Test
public void watch2() throws Exception {
    // Watch for child node changes
    // arg0: client
    // arg1: path to watch
    // arg2: whether to cache the nodes' data
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/create2", true);
    pathChildrenCache.start();
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            System.out.println(pathChildrenCacheEvent.getData().getPath());
            System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
            System.out.println(pathChildrenCacheEvent.getType());
        }
    });
    // keep the test alive long enough to observe events
    Thread.sleep(60000);
    pathChildrenCache.close();
}
5.10 Transactions
@Test
public void transaction() throws Exception {
    // Transactions are atomic: if any one operation fails, none of them is committed
    try {
        client.inTransaction()
                .create()
                .forPath("/transaction")
                .and()
                .delete()
                .forPath("/xxx")
                .and()
                .commit();
    } catch (Exception e) {
        System.out.println("The transaction failed");
        e.printStackTrace();
    }
}
6. Distributed locks
6.1 Exclusive lock
package cn.tianqb.zookeeper.curator.lock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryOneTime;
/**
 * @Description:
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/26 13:51
 * @Version v1.0
 */
public class InterProcessMutexDemo {
    private CuratorFramework client;

    private void init() {
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.1:2181")
                .connectionTimeoutMs(5000)
                .retryPolicy(new RetryOneTime(3000))
                .build();
        client.start();
    }

    public InterProcessMutexDemo() {
        init();
    }

    public void action() {
        // exclusive (mutex) lock
        InterProcessLock interProcessLock = new InterProcessMutex(client, "/create");
        try {
            interProcessLock.acquire();
            System.out.println("Client " + Thread.currentThread().getName() + " acquired the lock");
            for (int i = 0; i < 5; i++) {
                Thread.sleep(2000);
                System.out.println(i);
            }
            interProcessLock.release();
            System.out.println("Client " + Thread.currentThread().getName() + " released the lock");
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void close() {
        client.close();
    }
}
6.1.1 Test
package cn.tianqb.zookeeper.curator.lock;
/**
 * @Description:
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/26 14:16
 * @Version v1.0
 */
public class Main {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                new InterProcessMutexDemo().action();
            }, i + "").start();
        }
    }
}
6.2 Read-write lock
package cn.tianqb.zookeeper.curator.lock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryOneTime;
/**
 * @Description:
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/26 15:49
 * @Version v1.0
 */
public class InterProcessReadWriteDemo {
    private CuratorFramework client;

    private void init() {
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.1:2181")
                .connectionTimeoutMs(5000)
                .retryPolicy(new RetryOneTime(3000))
                .build();
        client.start();
    }

    public InterProcessReadWriteDemo() {
        init();
    }

    public void readLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/create");
        InterProcessMutex interProcessMutex = interProcessReadWriteLock.readLock();
        interProcessMutex.acquire();
        System.out.println("Client " + Thread.currentThread().getName() + " acquired the read lock");
        for (int i = 0; i < 5; i++) {
            Thread.sleep(2000);
            System.out.println(i);
        }
        interProcessMutex.release();
        System.out.println("Client " + Thread.currentThread().getName() + " released the read lock");
        close();
    }

    public void writeLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/create");
        InterProcessMutex interProcessMutex = interProcessReadWriteLock.writeLock();
        interProcessMutex.acquire();
        System.out.println("Client " + Thread.currentThread().getName() + " acquired the write lock");
        for (int i = 0; i < 5; i++) {
            Thread.sleep(2000);
            System.out.println(i);
        }
        interProcessMutex.release();
        System.out.println("Client " + Thread.currentThread().getName() + " released the write lock");
        close();
    }

    private void close() {
        client.close();
    }
}
6.2.1 Test
package cn.tianqb.zookeeper.curator.lock;
/**
 * @Description:
 * @Author tianqb
 * @Mail tianqingbo@tianqb.cn
 * @Date 2020/5/26 15:54
 * @Version v1.0
 */
public class ReadWriteLockMain {
    public static void main(String[] args) {
        // read + read: concurrent
        // read + write: serialized
        // write + write: serialized
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                InterProcessReadWriteDemo interProcessReadWriteDemo = new InterProcessReadWriteDemo();
                try {
                    interProcessReadWriteDemo.writeLock();
                    //interProcessReadWriteDemo.readLock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "" + i).start();
            new Thread(() -> {
                InterProcessReadWriteDemo interProcessReadWriteDemo = new InterProcessReadWriteDemo();
                try {
                    //interProcessReadWriteDemo.readLock();
                    interProcessReadWriteDemo.writeLock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "" + (i + 5)).start();
        }
    }
}