当前位置: 首页 > news >正文

Zookeeper 4 Zookeeper JavaAPI 操作 4.7 Curator API 常用操作【Watch 事件监听】

Zookeeper

【黑马程序员Zookeeper视频教程,快速入门zookeeper技术】

文章目录

      • Zookeeper
      • 4 Zookeeper JavaAPI 操作
        • 4.7 Curator API 常用操作
          • 4.7.1 Watch 事件监听

4 Zookeeper JavaAPI 操作

4.7 Curator API 常用操作

4.7.1 Watch 事件监听

【基本概念】

ZooKeeper 允许用户在指定节点上注册一些Watcher( 监听器),并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

[举个栗子]

在这里插入图片描述

OK, 这里是一个 服务端,三个客户端【3 个应用程序】,他们仨都可以通过 Curator 写代码来操作 服务端

在这里插入图片描述

而下面这张图 就是 Zookeeper 的数据模型中的节点 和信息

观察者模式

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者(即上面 图中的应用程序)。

ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper提供了三种Watcher:

  • NodeCache : 只是监听某一个特定的节点
  • PathChildrenCache : 监控一个ZNode的子节点.
  • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

【NodeCache 实现】

复制我们 之前那个测试类

在这里插入图片描述

保留建立连接 和 释放资源的代码

    private CuratorFramework client;

    /**
     * 建立连接
     * */
    @Before
    public void testConnect(){

        //1. 第一种方式
        /*
         * Create a new client
         *
         * @param connectString       list of servers to connect to【连接字符串,zk server的 地址和端口】
         * @param sessionTimeoutMs    session timeout【会话超时时间,单位ms】
         * @param connectionTimeoutMs connection timeout【连接超时时间,单位ms】
         * @param retryPolicy         retry policy to use【重试策略】
         */

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
//        CuratorFramework client = CuratorFrameworkFactory
//                .newClient("43.138.50.253:2181", 60 * 1000, 15 * 1000, retryPolicy);
//
//        //开启连接
//        client.start();

        //2. 第二种方式
        client = CuratorFrameworkFactory.builder()
                .connectString("自己的服务器IP:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("dingjiaxiong")
                .build();

        //第二种方式开启连接
        client.start();
    }

    @After
    public void close(){

        if (client != null){
            client.close();
        }
    }

编写测试方法

先来个节点

在这里插入图片描述

OK,对它进行监听

/**
 * 演示NodeCache:给指定的一个节点注册监听器
 * */
@Test
public void testNodeCache() throws Exception {

    //1. 创建NodeCache 对象
    NodeCache nodeCache = new NodeCache(client,"/app1");

    //2. 注册监听
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("节点变化了");
        }
    });

    //3. 开启监听【设置为true,则开启监听时加载 缓冲数据】
    nodeCache.start(true);

    while (true){
        
    }

}

运行

在这里插入图片描述

OK,现在就在 被监听了

现在去 修改一下 /app1 节点的值

在这里插入图片描述

OK, 查看控制台

在这里插入图片描述

没毛病,修改可以 感知到

删除app1 节点

在这里插入图片描述

OK, 删除也会被监听到

创建

在这里插入图片描述

创建也行

现在试试 “获取 变化”

@Test
public void testNodeCache() throws Exception {

    //1. 创建NodeCache 对象
    final NodeCache nodeCache = new NodeCache(client,"/app1");

    //2. 注册监听
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("节点变化了");

            //获取修改节点后的数据
            byte[] data = nodeCache.getCurrentData().getData();
            System.out.println(new String(data));
        }
    });

    //3. 开启监听【设置为true,则开启监听时加载 缓冲数据】
    nodeCache.start(true);

    while (true){

    }

}

OK, 重新运行这个方法

在这里插入图片描述

OK,这样就行 了

【这就是 对一个节点进行监听 的NodeCache】

【PathChildrenCache】:监控一个ZNode 的子节点

/**
 * 演示PathChildrenCache:监控一个ZNode的子节点.【监听某个节点的所有子节点们】
 * */
@Test
public void testPathChildrenCache() throws Exception {

    

    while (true){

    }

}

直接干

先来一个 app2 节点

在这里插入图片描述

OK

@Test
public void testPathChildrenCache() throws Exception {

    //1. 创建监听对象
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);

    //2. 绑定监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            System.out.println("子节点变化了");

            System.out.println(pathChildrenCacheEvent);
        }
    });

    //3. 开启监听
    pathChildrenCache.start();

    while (true){

    }

}

直接运行

在这里插入图片描述

可以看到刚启动,就来了一些东西【意思是 重新建立了 连接 】【这个不是我们 要关心的】

创建一个 app2 的子节点

在这里插入图片描述

OK,事件 类型是 “孩子_添加了”,然后就是 path ,状态信息,数据

给p1 来个值

在这里插入图片描述

这次的事件类型 就是 “孩子_ 修改了”

删除 p1

在这里插入图片描述

没毛病

试试给app2 自己来个值

在这里插入图片描述

OK, 它感知不到

[来个 小练习]

监听子节点的变更,并且拿到变更后的数据

@Test
public void testPathChildrenCache() throws Exception {

    //1. 创建监听对象
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);

    //2. 绑定监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            System.out.println("子节点变化了");
            System.out.println(pathChildrenCacheEvent);

            // 1. 获取类型
            PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();

            //2. 判断类型是否是update
            if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){

                byte[] data = pathChildrenCacheEvent.getData().getData();
                System.out.println(new String(data));
            }
        }
    });

    //3. 开启监听
    pathChildrenCache.start();

    while (true){

    }

}

OK, 直接启动测试

创建p2

在这里插入图片描述

OK, 监听到 子节点变化了, 但是并不是更新,所以没有通过if 判断,修改一下 p2 的数据

在这里插入图片描述

没毛病

【这就是PathChildrenCache,注意的是它只能感知到 “儿子“们 的变化,不能感知到 自己的】

【还有最后一个 TreeCahe】:可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

直接试试

/**
 * 演示TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合【监听某个节点自己和所有子节点们】
 * */
@Test
public void testTreeCache() throws Exception {

    

    while (true){

    }

}

OK

在这里插入图片描述

就监听 app2 吧

/**
 * 演示TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合【监听某个节点自己和所有子节点们】
 * */
@Test
public void testTreeCache() throws Exception {
    //1. 创建监听器
    TreeCache treeCache = new TreeCache(client,"/app2");

    //2. 注册监听
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
            System.out.println("节点变化了");

            System.out.println(treeCacheEvent);
        }
    });

    //3. 开启监听
    treeCache.start();

    while (true){

    }

}

直接启动

在这里插入图片描述

当然这些 都是缓存信息,不看’

修改 app2 自己的数据

在这里插入图片描述

修改儿子 p1

在这里插入图片描述

OK,也监听到了

【这就是 TreeCache】

相关文章:

  • 【发表案例】计算机科学类SCI,仅1个月零6天录用,涵盖软件、信息、数据云计算、网络、建模等研究方向
  • 【JavaScript】15_debug,立即执行函数 与 严格模式
  • 智能制造数字化转型难点有哪些?
  • 【UE4 第一人称射击游戏】06-设置动画角色2
  • U-net
  • pytest-需要模块相应的库
  • VSCode连GitHub的代理服务器配置和获取历史版本命令
  • 笔试训练(4)
  • “世界上最鸽派”的央行转鹰,透露了什么信号?
  • 从“小螺栓血案”谈装配体模型连接螺栓6个正确的处理方法
  • 讯飞听见SaaS服务迈入全新时代
  • mapstruct 无法生成字段映射code
  • 全志V853常用模型跑分数据
  • 迅为3A5000开发板龙芯自主指令集从里到外100%全国产设计方案
  • 安卓面经<15/30>之SharedPreference解析
  • LeetCode 10. 正则表达式匹配(C++)*
  • 《计算机体系结构量化研究方法》第2章-存储器层次结构设计 2.1 引言
  • android 9.0 屏蔽所有电话来电功能
  • 代替塞规的高精度孔径测量方法——泊肃叶压差法
  • Vue的父传子