Zookeeper

Zookeeper

Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务

Zookeeper 集群的角色分为 Leader (领导者)和 follower(随从者)(Observer(观察者)) ,集群中只要有半数以上的节点存活,集群就能正常运行,所以 Zookeeper 适合在奇数台机器 的集群

1. CAP原则

Consistency,Availability,Partition Tolerance;即一致性,可用性,分区容忍性(分片)集群

CAP理论认为,一个提供数据服务的存储系统无法同时满足数据一致性,数据可用性,数据容忍性
在分布式下优先考虑可用性和分区容忍性的组合,因为只要能够保证数据最终是一致或半数以上存储系统数据一致,我们就认为数据可用

2. 选举机制

三个核心选举原则:
(1)Zookeeper集群中只有超过半数以上的服务器启动,集群才能正常工作;
(2)在集群正常工作之前,myid小的服务器给myid大的服务器投票,直到集群正常工作,选出Leader;
(3)选出Leader之后,之前的服务器状态由Looking改变为Following,以后的服务器都是Follower。

假设zookeeper集群有有5台服务器并且是一一启动,id=1的服务器启动,它会先给自己投一票;id=2的服务器启动后,id比第一台服务器的大,所以id=1的票让给了id=2的服务器,所以id=2有两票;id=3的服务器启动后,它就有3张票并且已有半数服务器启动了,所以把id=3的服务器选举为leader,其他4台服务器则为follower

如果服务器是同时启动,那么id最大的为leader

在这里插入图片描述

只要将数据提交到集群中的任何一个节点,都会经过这个 节点传递给 leader , leader 会让所有的 follwer 去更新 这个数据,大部分数据传递时间是属于毫秒级别,所以延时 可以忽略不计

3. 数据结构

类似于标准文件系统,是一个树形结构
每个节点被称为Znode,路径是绝对的即斜杠开头并且唯一
它既是文件也是目录,既可以存储数据也可以作为路径
它是用来管理调度数据,比如分布式应用中的配置文件信息、状态信息等等元数据,至多1M

在这里插入图片描述

节点类型

  • 临时节点(Ephemeral)

依赖会话,当客户端和服务端断开连接就会自动删除
不允许有子节点

  • 永久节点

该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。

- 细分可以分为4个节点

  • PERSISTENT-持久化目录节点
    客户端与ZooKeeper断开连接后,该节点依旧存在
  • PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
    客户端与ZooKeeper断开连接后,该节点依旧存在,只是ZooKeeper给该节点名称进行顺序编号
  • EPHEMERAL-临时目录节点
    客户端与ZooKeeper断开连接后,该节点被删除
  • EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
    客户端与ZooKeeper断开连接后,该节点被删除,只是ZooKeeper给该节点名称进行顺序编号

4. Zookeeper的搭建

以zookeeper-3.4.13.tar.gz为例

  • 准备
    准备三台Linux并安装好jdk jdk安装
  • 下载
    下载
  • 上传
    cd /usr/local
    mkdir zookeeper
    把压缩包上传到刚刚创建的zookeeper文件夹下
  • 解压
    tar -zxvf zookeeper-3.4.13.tar.gz
  • 编辑
    cd zookeeper-3.4.13/conf
    cp zoo_sample.cfg zoo.cfg
    vim zoo.cfg
    在这里插入图片描述

需要设置数据和日志存放的位置
并且指定linux,后面的两个端口号随便写.第一个端口号是通信端口,第二个端口号是投票端口

  • 创建文件夹
    mkdir -p /zookeeper/data
    mkdir -p /zookeeper/logs
    cd /zookeeper/data

  • 创建文件
    touch myid
    vim myid 内容设置为1

  • 其他两台Linux安装与上面的步骤一致,只需要把myid文件内容设置为2,3

  • 启动
    (1)集群同时启动小技巧
    使用xshell连接三台虚拟机,都切换到zookeeper得bin目录,点击xshell的工具–>发送键输入到所有会话,
    在其中一台得命令窗口输入./zkServer.sh start就会启动所有得zookeeper服务端
    (2)也可以使用脚本实现集群启动,请自信查找

5. Java客户端

  • 依赖
    <!--zookeeper-->
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.13</version>
    </dependency>
    <!--单元测试-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13</version>
      <scope>test</scope>
    </dependency>
  • 常用的API类
    (1)Zookeeper:主要是用来构建zkCli客户端对象
    (2)Watch:监听当前结点的变化情况 重写process()方法–接收异步通知得信息(由zookeeper发起得通知)

  • 单元测试

package com.zy.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

/**
 * @author yxk
 * @version 1.0
 * Date 2021/4/8 10:28
 * 测试zookeeper的API操作
 */

public class TestZookeeper {

    private ZooKeeper zkCli = null;

    /**
     * 启动一个test方法
     *    开启了主线程process()
     *    开启了守护线程  和主线程相生相伴,主线程结束,它也结束
     *    也可以手动创建守护线程 new Thread().setDaemon()--把该线程标记为守护线程
     * @author yxk
     * Date 2021/4/8 11:56
     */

    @Before
    public void before() {
        /*
         * 构建客户端对象
         * @author yxk
         * Date 2021/4/8 10:33
         * 参数:
         *     String connectString -->ip:端口号,ip:端口号  ---可以连接多台
         *     int sessionTimeout-->会话超时时间 单位毫秒
         *     Watcher watcher-->监听器,监听节点上的变化情况
         */
        try {
            zkCli = new ZooKeeper("192.168.88.100:2181,192.168.88.101:2181,192.168.88.102:2181", 20000, 
            new Watcher() { //可以使用lanbdaa表达式简化,但是不利于观察
                //方法内部接收zookeeper的通知,第一次调用时,不管客户端是否对节点开启监听都会调用一次process
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getState());
                    System.out.println(watchedEvent.getType());
                    //定义对节点持续性监听 -- /  根节点
                    try {
                        zkCli.getChildren("/", true);
                        zkCli.getChildren("/app1", true);
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

        } catch (IOException  e) {
            e.printStackTrace();
        }

    }


    /**
     * @author yxk
     * Date 2021/4/8 10:28
     * zookeeper快速入门
     */
    @Test
    public void test01() {

        try {
             //获取根节点的子节点信息,true表示监听,现在设置的监听只是一次性监听
            List<String> zNode = zkCli.getChildren("/", true);
            for (String s : zNode) {
                System.out.println(s);
            }
            //休眠,不让程序结束,然后在Linux启动客户端,创建节点,然后在这里就可以接收到通知--NodeChildrenChanged
            Thread.sleep(Long.MAX_VALUE);
        } catch ( InterruptedException |KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 新增节点
     * @author yxk
     * Date 2021/4/8 11:13
     * create方法的参数
     *    String path 结点的路径
     *    byte[] data 节点的数据
     *    List<ACL> acl 权限控制
     *    CreateMode createMode 节点类型
     */
    @Test
    public void test02() {
        try {
            String zNode = zkCli.create("/app1/app1_p1", "666".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(zNode);
            Thread.sleep(Long.MAX_VALUE);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 判断节点是否存在
     * @author yxk
     * Date 2021/4/8 11:29
     */
    @Test
    public void test03() {
        try {
            Stat stat = zkCli.exists("/app1/app1_p1", false);
            //判断节点是否为null
            if (stat == null) {
                System.out.println("节点不存在");
            }else {
                System.out.println("节点存在");
                System.out.println(stat.getCversion());
                System.out.println(stat.getCtime());
                System.out.println(stat.getCzxid());
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取节点数据
     * @author yxk
     * Date 2021/4/8 11:39
     */
    @Test
    public void test04() {
        try {
            //如果集群数据很大,数据可能不一致,第三个参数表示获取数据版本,null表示最小版本
            byte[] data = zkCli.getData("/app1", true, zkCli.exists("/app1", false));
            System.out.println(new String(data));
            Thread.sleep(Long.MAX_VALUE);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 删除节点
     * @author yxk
     * Date 2021/4/8 11:46
     */
    @Test
    public void test05() {
        try {
            //-1表示删除所有版本
            zkCli.delete("/app1/app1_p1",-1);
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 修改节点数据
     * @author yxk
     * Date 2021/4/8 11:56
     */
    @Test
    public void test06() {
        try {
            Stat stat = zkCli.setData("/app1/app1_p2", "9999".getBytes(), -1);
            System.out.println(new String(zkCli.getData("/app1/app1_p2", false, stat)));
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

启动一个test方法
开启了一个主线程process()
开启了守护线程: 和主线程相生相伴,主线程结束,它也结束
new Thread().setDaemon()–>把该线程标记为守护线程

在这里插入图片描述


版权声明:本文为qq_44991374原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。