zookeeper详解

概念: zookeeper是apache hadoop项目下的一个子项目,是一个树形目录服务,是一个开源的分布式的应用程序协调服务

主要功能:

一、配置管理

二、分布式锁

三、集群管理

数据模型: zookeeper是一个树形的目录结构,其数据模型和linux的文件系统目录树很类似,拥有一个层次结构,每个节点被称为znode,每个节点上都会保存自己的数据和节点信息,节点可以拥有自己的子节点,同时也允许少量的数据(1m)存储在该节点下。

节点可以分为四大类:

PERSISTENT 持久化节点

 EPHEMERAL 临时节点 -e

 PERSISTENT SEQUENTIAL 持久化顺序节点 -s

EPHEMERAL SEQUENTIAL 临时顺序节点 -es

zookeeper服务端常用命令:

./zkServer.sh start                                启动

./zkServer.sh status                             查看服务状态

./zkServer.sh stop                                停止

 ./zkServer.sh restart                            重启

zookeeper客户端命令:

连接到服务       ./zkCli.sh -server ip:port

退出          quit

查看根节点    ls /      【ls /dubbo/node】

创建节点   create   /app1  数据

获取数据  get /app1

设置或者修改数据   set /app1  数据

删除节点   delete /app1    【有子节点不能删除】

删除有子节点的节点    deleteall /app1   【有子节点可以删除】

 创建顺序节点        create -s  /app1  数据

 创建临时节点        create -e /app1 数据

 创建临时顺序节点   create -es /app1 数据

JAVA客户端Curator

package com.wangxk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * zookeeper的Java客户端Curator
 */
public class Demo {
    public CuratorFramework client = null;
    /**
     * 创建连接
     */
    @BeforeEach
    public void getConnection(){
        System.out.println("正在创建zookeeper连接");
        //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(30000, 10);

        // 第一种创建方法
        // CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);

        // 第二种创建方法
        client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(60*1000)
                .connectionTimeoutMs(15*1000)
                .namespace("wangxk")
                .retryPolicy(retryPolicy)
                .build();

        //开启连接
        client.start();

        System.out.println("创建连接" + client);
    }

    /**
     * 释放资源
     */
    @AfterEach
    public void close(){
        System.out.println("程序结束,释放资源");
        client.close();
    }

    /**
     * 创建节点
     */
    @Test
    public void createNode() throws Exception {
        // 默认创建持久化节点
        String path = client.create().forPath("/zmj","北京芝麻蕉".getBytes(StandardCharsets.UTF_8));
        //指定创建节点的类型
        // client.create().withMode(CreateMode.EPHEMERAL).forPath("/yyy", "北京云易扬".getBytes(StandardCharsets.UTF_8));
        //若没有父级节点则创建父级节点
        // client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/911/512");
        System.out.println(path);
    }

    /**
     * 查询数据
     */
    @Test
    public void query() throws Exception {
        // 查询数据
        byte[] bytes = client.getData().forPath("/zmj");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));

        //查询子节点
        List<String> nodes = client.getChildren().forPath("/");
        for (String node : nodes) {
            System.out.println(node);
        }

        //查询节点状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/zmj");
        System.out.println(stat);
    }

    /**
     * 修改节点
     */
    @Test
    public void update() throws Exception {
        //修改数据
        client.setData().forPath("/zmj", "小姐姐".getBytes(StandardCharsets.UTF_8));
        System.out.println("修改数据成功!");

        //根据版本修改,版本可以从获取节点信息中获取
        int version = 3;
        //client.setData().withVersion(version).forPath("/zmj", "小姐姐".getBytes(StandardCharsets.UTF_8));

    }

    /**
     * 删除节点
     */
    public void delete() throws Exception {
        // 删除单个节点
        client.delete().forPath("/zmj");

        //删除带子节点的节点
        client.delete().deletingChildrenIfNeeded().forPath("/zmj");

        // 必须删除成功
        client.delete().guaranteed().forPath("/zmj");

        // 删除后回调
        client.delete().guaranteed().inBackground((x, y) -> {
            System.out.println("删除了");
            System.out.println(y);
        });
    }

    /**
     * zookeeper允许用户在指定节点上注册一些watcher,并且在一些特定的事件触发的时候,zookeeper服务端会将事件通知到感兴趣的客户端上去,该机制是
     * zookeeper实现分布式协调服务的重要特性。
     *  zookeeper提供三种watcher:
     *      NodeCache           监控一个特定的节点
     *      PathChildrenCache     监控一个节点的子节点
     *      TreeCache             监控整个树上的所有节点
     */
    @Test
    public void watcher() throws InterruptedException {
        //新版本三种cache合并了(默认以指定节点为根的整个子树)
        CuratorCache cache = CuratorCache.build(client, "/");

        CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(() ->{
            System.out.println("节点变化了!!!");
        }).build();

        cache.listenable().addListener(listener);

        cache.start();

        Thread.sleep(1000000000000l);
    }

    /**
     * zookeeper的分布式锁
     *   zookeeper分布式锁的原理:
     *      核心思想:当客户端获取锁的时候创建节点,使用完锁,删除节点,客户端获取锁的时候在lock节点下创建临时顺序节点,
     *      然后获取lock下的所有子节点,客户端获取到所有子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端
     *      获取到了锁,使用完锁后,将节点删除。如果发现自己创建的节点并非lock所有子节点中最小的,说明自己没有获取到锁,此时
     *      客户端找到比自己小的节点同时对其注册事件监听器,监听事件删除,如果发现比自己小的节点被删除,则watcher会获取到通知,
     *      此时再次判断自己创建的节点是否是最小节点,如果不是则重复以上步骤,获取比自己小的节点并注册监听事件。
     *
     *  Curator实现分布式锁的API
     *  InterProcessSemaphoreMutex:   分布式排他锁
     *  InterProcessMutex:           分布式可重入排它锁
     *  InterProcessReadWriteLock     分布式读写锁
     *  InterProcessMultLock          蒋多个锁作为单个实体管理的容器
     *  InterProcessSemaphoreV2       共享信号量
     *
     */
    public void lock() throws Exception {
        InterProcessMutex lock = new InterProcessMutex(client, "/lock");
        //加锁
        lock.acquire(3, TimeUnit.SECONDS);
        //释放锁
        lock.release();
    }

    /**
     * zookeeper中的角色:
     * leader:领导者,处理事务请求,集群内部各个服务器的调度者
     * follower:跟随者,处理客户端非事务的请求,转发事务请求给leader服务器,参数leader服务器的选举
     * Observer:观察者,处理客户端非事务请求,转发事务请求给leader服务器
     */
}


版权声明:本文为u013404812原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。