ZooKeeper session notification mechanism

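Even when clients connect through different sessions, a change somewhere in the ZooKeeper cluster (for example, a node being added or deleted) is delivered as an event to every one of those sessions, provided the session has set a watch on the affected node. The following example shows this.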

Node operation class

package seessionTest;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

public class Test {
    private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private final int sessionTimeout = 10 * 1000;
    private ZooKeeper zk;

    public Test() throws IOException, InterruptedException, KeeperException {
        // Each Test instance opens its own session; this Watcher is that session's default watcher.
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                // React only to node deletion; other events, such as the initial connection event, are ignored.
                if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                    System.out.println();
                    // "收到" means "received"; the callback runs on the session's event thread.
                    System.out.println(Thread.currentThread().getName() + "收到");
                }
            }
        });

        // Passing true registers the default watcher on /server1.
        zk.getData("/server1", true, null);
    }
}

Thread class

package seessionTest;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class operation {
    public static void main(String[] args) throws InterruptedException {
        // Start four threads; each one creates its own Test instance and therefore its own ZooKeeper session.
        for (int i = 1; i <= 4; i++) {
            // Thread names are "线程1" to "线程4" ("thread 1" to "thread 4").
            new Thread("线程" + i) {
                public void run() {
                    try {
                        new Test();
                    } catch (IOException | InterruptedException | KeeperException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        // Keep the JVM alive so the sessions stay open and the watchers can fire.
        Thread.sleep(Long.MAX_VALUE);
    }
}

Result, printed after /server1 is deleted from another client (收到 means "received")

线程1-EventThread收到


线程4-EventThread收到

线程3-EventThread收到
线程2-EventThread收到

Process finished with exit code -1
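
The behaviour above can also be illustrated without a running cluster. The following is a minimal sketch of a toy in-memory model, not ZooKeeper's actual implementation: the class WatchModel, its methods, and the session names are all hypothetical and exist only for illustration. It assumes the standard ZooKeeper rule that a watch is a one-time trigger, so every session that registered a watch is notified once, and a later change notifies nobody until the watch is registered again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Toy in-memory model of one-shot watches (hypothetical; not ZooKeeper's implementation). */
public class WatchModel {
    // Paths of the nodes that currently exist.
    private final Set<String> nodes = new HashSet<>();
    // Pending one-shot watches: path -> one callback per watching session.
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    public void create(String path) {
        nodes.add(path);
    }

    /** A session registers interest in a path; the callback stands in for Watcher.process. */
    public void watch(String path, Consumer<String> callback) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    /** Deleting a node notifies every pending watch once and then discards the watches. */
    public void delete(String path) {
        if (!nodes.remove(path)) {
            return; // nothing to delete
        }
        List<Consumer<String>> pending = watches.remove(path);
        if (pending == null) {
            return; // no session was watching this path
        }
        for (Consumer<String> callback : pending) {
            callback.accept("NodeDeleted " + path);
        }
    }

    /** Replays the scenario from the article and returns the messages the sessions received. */
    public static List<String> demo() {
        WatchModel model = new WatchModel();
        List<String> received = new ArrayList<>();
        model.create("/server1");
        // Four independent sessions each set a watch on /server1.
        for (int i = 1; i <= 4; i++) {
            String session = "session" + i;
            model.watch("/server1", event -> received.add(session + " got " + event));
        }
        model.delete("/server1");
        // The watches were consumed, so a second create and delete notifies nobody.
        model.create("/server1");
        model.delete("/server1");
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main prints one line per session for the first deletion only. In the real client, the equivalent of calling watch again is to pass true (or a Watcher) to another getData or exists call from inside the callback.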


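Copyright notice: This is an original article by qq_26076091, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.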