Zookeeper
Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务
Zookeeper 集群的角色分为 Leader (领导者)和 follower(随从者)(Observer(观察者)) ,集群中只要有半数以上的节点存活,集群就能正常运行,所以 Zookeeper 适合在奇数台机器 的集群
1. CAP原则
Consistency,Availability,Partition Tolerance;即一致性,可用性,分区容忍性(分片)集群
CAP理论认为,一个提供数据服务的存储系统无法同时满足数据一致性,数据可用性,数据容忍性
在分布式下优先考虑可用性和分区容忍性的组合,因为只要能够保证数据最终是一致或半数以上存储系统数据一致,我们就认为数据可用
2. 选举机制
三个核心选举原则:
(1)Zookeeper集群中只有超过半数以上的服务器启动,集群才能正常工作;
(2)在集群正常工作之前,myid小的服务器给myid大的服务器投票,直到集群正常工作,选出Leader;
(3)选出Leader之后,之前的服务器状态由Looking改变为Following,以后的服务器都是Follower。
假设zookeeper集群有有5台服务器并且是一一启动,id=1的服务器启动,它会先给自己投一票;id=2的服务器启动后,id比第一台服务器的大,所以id=1的票让给了id=2的服务器,所以id=2有两票;id=3的服务器启动后,它就有3张票并且已有半数服务器启动了,所以把id=3的服务器选举为leader,其他4台服务器则为follower
如果服务器是同时启动,那么id最大的为leader
只要将数据提交到集群中的任何一个节点,都会经过这个 节点传递给 leader , leader 会让所有的 follwer 去更新 这个数据,大部分数据传递时间是属于毫秒级别,所以延时 可以忽略不计
3. 数据结构
类似于标准文件系统,是一个树形结构
每个节点被称为Znode,路径是绝对的即斜杠开头并且唯一
它既是文件也是目录,既可以存储数据也可以作为路径
它是用来管理调度数据,比如分布式应用中的配置文件信息、状态信息等等元数据,至多1M
节点类型
- 临时节点(Ephemeral)
依赖会话,当客户端和服务端断开连接就会自动删除
不允许有子节点
- 永久节点
该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。
- 细分可以分为4个节点
- PERSISTENT-持久化目录节点
客户端与ZooKeeper断开连接后,该节点依旧存在 - PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与ZooKeeper断开连接后,该节点依旧存在,只是ZooKeeper给该节点名称进行顺序编号 - EPHEMERAL-临时目录节点
客户端与ZooKeeper断开连接后,该节点被删除 - EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与ZooKeeper断开连接后,该节点被删除,只是ZooKeeper给该节点名称进行顺序编号
4. Zookeeper的搭建
以zookeeper-3.4.13.tar.gz为例
- 准备
准备三台Linux并安装好jdk jdk安装 - 下载
下载 - 上传
cd /usr/local
mkdir zookeeper
把压缩包上传到刚刚创建的zookeeper文件夹下 - 解压
tar -zxvf zookeeper-3.4.13.tar.gz
- 编辑
cd zookeeper-3.4.13/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
需要设置数据和日志存放的位置
并且指定linux,后面的两个端口号随便写.第一个端口号是通信端口,第二个端口号是投票端口
创建文件夹
mkdir -p /zookeeper/data
mkdir -p /zookeeper/logs
cd /zookeeper/data
创建文件
touch myid
vim myid
内容设置为1其他两台Linux安装与上面的步骤一致,只需要把myid文件内容设置为2,3
启动
(1)集群同时启动小技巧
使用xshell连接三台虚拟机,都切换到zookeeper得bin目录,点击xshell的工具–>发送键输入到所有会话,
在其中一台得命令窗口输入./zkServer.sh start
就会启动所有得zookeeper服务端
(2)也可以使用脚本实现集群启动,请自信查找
5. Java客户端
- 依赖
<!--zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>
<!--单元测试-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
常用的API类
(1)Zookeeper:主要是用来构建zkCli客户端对象
(2)Watch:监听当前结点的变化情况 重写process()方法–接收异步通知得信息(由zookeeper发起得通知)单元测试
package com.zy.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
/**
* @author yxk
* @version 1.0
* Date 2021/4/8 10:28
* 测试zookeeper的API操作
*/
public class TestZookeeper {
private ZooKeeper zkCli = null;
/**
* 启动一个test方法
* 开启了主线程process()
* 开启了守护线程 和主线程相生相伴,主线程结束,它也结束
* 也可以手动创建守护线程 new Thread().setDaemon()--把该线程标记为守护线程
* @author yxk
* Date 2021/4/8 11:56
*/
@Before
public void before() {
/*
* 构建客户端对象
* @author yxk
* Date 2021/4/8 10:33
* 参数:
* String connectString -->ip:端口号,ip:端口号 ---可以连接多台
* int sessionTimeout-->会话超时时间 单位毫秒
* Watcher watcher-->监听器,监听节点上的变化情况
*/
try {
zkCli = new ZooKeeper("192.168.88.100:2181,192.168.88.101:2181,192.168.88.102:2181", 20000,
new Watcher() { //可以使用lanbdaa表达式简化,但是不利于观察
//方法内部接收zookeeper的通知,第一次调用时,不管客户端是否对节点开启监听都会调用一次process
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getState());
System.out.println(watchedEvent.getType());
//定义对节点持续性监听 -- / 根节点
try {
zkCli.getChildren("/", true);
zkCli.getChildren("/app1", true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @author yxk
* Date 2021/4/8 10:28
* zookeeper快速入门
*/
@Test
public void test01() {
try {
//获取根节点的子节点信息,true表示监听,现在设置的监听只是一次性监听
List<String> zNode = zkCli.getChildren("/", true);
for (String s : zNode) {
System.out.println(s);
}
//休眠,不让程序结束,然后在Linux启动客户端,创建节点,然后在这里就可以接收到通知--NodeChildrenChanged
Thread.sleep(Long.MAX_VALUE);
} catch ( InterruptedException |KeeperException e) {
e.printStackTrace();
}
}
/**
* 新增节点
* @author yxk
* Date 2021/4/8 11:13
* create方法的参数
* String path 结点的路径
* byte[] data 节点的数据
* List<ACL> acl 权限控制
* CreateMode createMode 节点类型
*/
@Test
public void test02() {
try {
String zNode = zkCli.create("/app1/app1_p1", "666".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(zNode);
Thread.sleep(Long.MAX_VALUE);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 判断节点是否存在
* @author yxk
* Date 2021/4/8 11:29
*/
@Test
public void test03() {
try {
Stat stat = zkCli.exists("/app1/app1_p1", false);
//判断节点是否为null
if (stat == null) {
System.out.println("节点不存在");
}else {
System.out.println("节点存在");
System.out.println(stat.getCversion());
System.out.println(stat.getCtime());
System.out.println(stat.getCzxid());
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取节点数据
* @author yxk
* Date 2021/4/8 11:39
*/
@Test
public void test04() {
try {
//如果集群数据很大,数据可能不一致,第三个参数表示获取数据版本,null表示最小版本
byte[] data = zkCli.getData("/app1", true, zkCli.exists("/app1", false));
System.out.println(new String(data));
Thread.sleep(Long.MAX_VALUE);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 删除节点
* @author yxk
* Date 2021/4/8 11:46
*/
@Test
public void test05() {
try {
//-1表示删除所有版本
zkCli.delete("/app1/app1_p1",-1);
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
/**
* 修改节点数据
* @author yxk
* Date 2021/4/8 11:56
*/
@Test
public void test06() {
try {
Stat stat = zkCli.setData("/app1/app1_p2", "9999".getBytes(), -1);
System.out.println(new String(zkCli.getData("/app1/app1_p2", false, stat)));
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
启动一个test方法
开启了一个主线程process()
开启了守护线程: 和主线程相生相伴,主线程结束,它也结束
new Thread().setDaemon()–>把该线程标记为守护线程