Zookeeper 4 Zookeeper JavaAPI 操作 4.7 Curator API 常用操作【Watch 事件监听】
Zookeeper
【黑马程序员Zookeeper视频教程,快速入门zookeeper技术】
文章目录
- Zookeeper
- 4 Zookeeper JavaAPI 操作
- 4.7 Curator API 常用操作
- 4.7.1 Watch 事件监听
4 Zookeeper JavaAPI 操作
4.7 Curator API 常用操作
4.7.1 Watch 事件监听
【基本概念】
ZooKeeper 允许用户在指定节点上注册一些Watcher( 监听器),并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
[举个栗子]
OK, 这里是一个 服务端,三个客户端【3 个应用程序】,他们仨都可以通过 Curator 写代码来操作 服务端
而下面这张图 就是 Zookeeper 的数据模型中的节点 和信息
观察者模式
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者(即上面 图中的应用程序)。
ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。
Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper提供了三种Watcher:
- NodeCache : 只是监听某一个特定的节点
- PathChildrenCache : 监控一个ZNode的子节点.
- TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
【NodeCache 实现】
复制我们 之前那个测试类
保留建立连接 和 释放资源的代码
private CuratorFramework client;
/**
* 建立连接
* */
@Before
public void testConnect(){
//1. 第一种方式
/*
* Create a new client
*
* @param connectString list of servers to connect to【连接字符串,zk server的 地址和端口】
* @param sessionTimeoutMs session timeout【会话超时时间,单位ms】
* @param connectionTimeoutMs connection timeout【连接超时时间,单位ms】
* @param retryPolicy retry policy to use【重试策略】
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
// CuratorFramework client = CuratorFrameworkFactory
// .newClient("43.138.50.253:2181", 60 * 1000, 15 * 1000, retryPolicy);
//
// //开启连接
// client.start();
//2. 第二种方式
client = CuratorFrameworkFactory.builder()
.connectString("自己的服务器IP:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("dingjiaxiong")
.build();
//第二种方式开启连接
client.start();
}
@After
public void close(){
if (client != null){
client.close();
}
}
编写测试方法
先来个节点
OK,对它进行监听
/**
* 演示NodeCache:给指定的一个节点注册监听器
* */
@Test
public void testNodeCache() throws Exception {
//1. 创建NodeCache 对象
NodeCache nodeCache = new NodeCache(client,"/app1");
//2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
}
});
//3. 开启监听【设置为true,则开启监听时加载 缓冲数据】
nodeCache.start(true);
while (true){
}
}
运行
OK,现在就在 被监听了
现在去 修改一下 /app1 节点的值
OK, 查看控制台
没毛病,修改可以 感知到
删除app1 节点
OK, 删除也会被监听到
创建
创建也行
现在试试 “获取 变化”
@Test
public void testNodeCache() throws Exception {
//1. 创建NodeCache 对象
final NodeCache nodeCache = new NodeCache(client,"/app1");
//2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3. 开启监听【设置为true,则开启监听时加载 缓冲数据】
nodeCache.start(true);
while (true){
}
}
OK, 重新运行这个方法
OK,这样就行 了
【这就是 对一个节点进行监听 的NodeCache】
【PathChildrenCache】:监控一个ZNode 的子节点
/**
* 演示PathChildrenCache:监控一个ZNode的子节点.【监听某个节点的所有子节点们】
* */
@Test
public void testPathChildrenCache() throws Exception {
while (true){
}
}
直接干
先来一个 app2 节点
OK
@Test
public void testPathChildrenCache() throws Exception {
//1. 创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了");
System.out.println(pathChildrenCacheEvent);
}
});
//3. 开启监听
pathChildrenCache.start();
while (true){
}
}
直接运行
可以看到刚启动,就来了一些东西【意思是 重新建立了 连接 】【这个不是我们 要关心的】
创建一个 app2 的子节点
OK,事件 类型是 “孩子_添加了”,然后就是 path ,状态信息,数据
给p1 来个值
这次的事件类型 就是 “孩子_ 修改了”
删除 p1
没毛病
试试给app2 自己来个值
OK, 它感知不到
[来个 小练习]
监听子节点的变更,并且拿到变更后的数据
@Test
public void testPathChildrenCache() throws Exception {
//1. 创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了");
System.out.println(pathChildrenCacheEvent);
// 1. 获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//2. 判断类型是否是update
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
//3. 开启监听
pathChildrenCache.start();
while (true){
}
}
OK, 直接启动测试
创建p2
OK, 监听到 子节点变化了, 但是并不是更新,所以没有通过if 判断,修改一下 p2 的数据
没毛病
【这就是PathChildrenCache,注意的是它只能感知到 “儿子“们 的变化,不能感知到 自己的】
【还有最后一个 TreeCahe】:可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
直接试试
/**
* 演示TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合【监听某个节点自己和所有子节点们】
* */
@Test
public void testTreeCache() throws Exception {
while (true){
}
}
OK
就监听 app2 吧
/**
* 演示TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合【监听某个节点自己和所有子节点们】
* */
@Test
public void testTreeCache() throws Exception {
//1. 创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点变化了");
System.out.println(treeCacheEvent);
}
});
//3. 开启监听
treeCache.start();
while (true){
}
}
直接启动
’
当然这些 都是缓存信息,不看’
修改 app2 自己的数据
修改儿子 p1
OK,也监听到了
【这就是 TreeCache】