【Zookeeper】——服务器动态上下线监听案例&分布式锁案例

一、服务器动态上下线监听案例

1. 简介

在这里插入图片描述
<1> 启动Zookeeper集群
<2> 在服务器启动时在Zookeeper创建临时节点
<3> 获取当前在线服务器列表,并注册监听
<4> 服务器下线
<5> 服务器节点上的下线事件通知

2. 实现

<1>并创建/servers节点

create /servers "servers"

<2> DistributeServer

package com.zookeeper.case2;
import com.zookeeper.utils.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
/**
 * @author : hechaogao
 * @createTime : 2022/9/15 15:04
 */
public class DistributeServer {
    /**
     * zookeeper客户端
     */
    private static ZooKeeper zkClient;
    public static void main(String[] args) {
        //1. 获取连接
        zkClient = ZkUtils.getConnect();
        //2. 注册服务器(创建路径)
        regist(args[0]);
        //3. 启动具体业务(sleep)
        business();
    }
    public static void regist(String hostname) {
        /**
         * param1 创建的节点的目录
         * param2 创建的节点的值
         * param3 ACL访问权限
         * param4 创建的节点类型,这里选择的持久类型
         *
         */
        try {
            zkClient = ZkUtils.getConnect();
            zkClient.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void business() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

<3> DistributeClient

package com.zookeeper.case2;
import com.zookeeper.utils.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
/**
 * @author : hechaogao
 * @createTime : 2022/9/15 15:25
 */
public class DistributeClient {
    /**
     * zookeeper客户端
     */
    private static ZooKeeper zkClient;
    public static void main(String[] args) {
        //1. 获取连接
        zkClient = ZkUtils.getConnect();
        //2. 监听/servers 下面的子节点
        getServerList();
        //3. 业务逻辑 sleep
        business();
    }
    public static void getServerList() {
        try {
            List<String> children = zkClient.getChildren("/servers", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    getServerList();
                }
            });
            System.out.println("-------------------------------------");
            for (String child : children) {
                byte[] data = zkClient.getData("/servers/" + child, false, null);
                System.out.println("servser name : " + new String(data));
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void business() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

<4> ZkUtils

package com.zookeeper.utils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZkUtils {
    /**
     * 逗号前后不能有空格
     */
    private static final String CONNECTSTR = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    /**
     * 单位毫秒
     */
    private static final int SESSIONTIMEOUT = 2 * 1000;
    public static ZooKeeper getConnect() {
        try {
            ZooKeeper zkClient = new ZooKeeper(CONNECTSTR, SESSIONTIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                }
            });
            return zkClient;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

二、分布式锁案例

1. 简介

在这里插入图片描述
<1> 启动Zookeeper集群,并创建/locks节点
<2> 接收到请求后,在/locks节点下创建一个临时顺序节点
<3> 判断自己是不是当前节点下最小的节点,如果是,则获取到锁,如果不是则对前一个节点进行监听
<4> 获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复判断第二步判断

2. 实现

<1> DistributeLock

package com.zookeeper.case3;

import com.zookeeper.utils.ZkUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author : hechaogao
 * @createTime : 2022/9/15 21:16
 */
public class DistributeLock {


    /**
     * zookeeper客户端
     */
    private ZooKeeper zkClient;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    private String currentNode;

    public DistributeLock() {
        try {
            //1. 获取连接
            zkClient = ZkUtils.getConnect();

            //2. 判断根节点/locks是否存在
            Boolean exists = exists("/locks");

            //3. 不存在则创建
            if (!exists) {
                String nodeCreasted = zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 对zk加锁
     */
    public void zklock() throws InterruptedException, KeeperException {

        //1. 创建临时带序号的节点
        currentNode = zkClient.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        //2. 判断创建的节点是否是最小序号的节点  如果是 则获取到锁  如果不是 就监听比当前节点序号小的节点
        List<String> children = zkClient.getChildren("/locks", true);
        //2.1 进行排序
        Collections.sort(children);
        //2.2 获取节点名称
        String currentNodeName = currentNode.substring("/locks/".length());
        //2.3 获取当前节点 在children集合中的位置
        int index = children.indexOf(currentNodeName);
        if (index == -1) {
            //2.4 不存在则返回异常
            System.out.println("currentNode not exists on children list waring...");
        } else if (index == 0) {
            //2.5 就一个节点,可以直接获取锁
            return;
        } else {
            //2.6 需要监听当前节点的前一个节点
            String waitPath = "/locks/" + children.get(index - 1);
            zkClient.getData(waitPath, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //2.7 如果触发监听器,并且是删除了上一个节点 ,则获取锁
                    if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                        countDownLatch.countDown();
                    }
                }
            }, null);
            //2.7 等在监听结束再往下走
            countDownLatch.await();
            return;
        }
    }

    /**
     * 对zk解锁
     */
    public void unZklock() throws InterruptedException, KeeperException {
        //1. 删除节点
        zkClient.delete(currentNode, -1);
    }


    /**
     * 判断 路径是否存在
     *
     * @param path 要判断的路径
     * @return true 存在,false 不存在
     */
    public Boolean exists(String path) throws InterruptedException, KeeperException {
        /**
         * param1 路径
         * param2 是否监听
         */
        Stat stat = null;
        stat = zkClient.exists(path, false);
        return stat != null ? true : false;
    }
}

<2> DistributeLockTest

package com.zookeeper.case3;


/**
 * @author : hechaogao
 * @createTime : 2022/9/15 22:31
 */
public class DistributeLockTest {
    public static void main(String[] args) {
        final DistributeLock lock1 = new DistributeLock();
        final DistributeLock lock2 = new DistributeLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zklock();
                    System.out.println("lock1 get lock");
                    Thread.sleep(5 * 1000);
                    System.out.println("lock1 remove lock");
                    lock1.unZklock();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zklock();
                    System.out.println("lock2 get lock");
                    Thread.sleep(5 * 1000);
                    System.out.println("lock2 remove lock");
                    lock2.unZklock();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }).start();
    }
}

<3> ZkUtils

package com.zookeeper.utils;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

/**
 * @author : hechaogao
 * @createTime : 2022/9/15 15:06
 */
public class ZkUtils {
    /**
     * 逗号前后不能有空格
     */
    private static final String CONNECTSTR = "hadoop102:2181,hadoop103:2181,hadoop104:2181";

    /**
     * 单位毫秒
     */
    private static final int SESSIONTIMEOUT = 200 * 1000;


    public static ZooKeeper getConnect() {

        try {
            ZooKeeper zkClient = new ZooKeeper(CONNECTSTR, SESSIONTIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {

                }
            });

            return zkClient;
        } catch (IOException e) {
            e.printStackTrace();
        }

        return null;
    }

}


版权声明:本文为qq_42000631原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。