概念: zookeeper是apache hadoop项目下的一个子项目,是一个树形目录服务,是一个开源的分布式的应用程序协调服务
主要功能:
一、配置管理
二、分布式锁
三、集群管理
数据模型: zookeeper是一个树形的目录结构,其数据模型和linux的文件系统目录树很类似,拥有一个层次结构,每个节点被称为znode,每个节点上都会保存自己的数据和节点信息,节点可以拥有自己的子节点,同时也允许少量的数据(1m)存储在该节点下。
节点可以分为四大类:
PERSISTENT 持久化节点
EPHEMERAL 临时节点 -e
PERSISTENT SEQUENTIAL 持久化顺序节点 -s
EPHEMERAL SEQUENTIAL 临时顺序节点 -es
zookeeper服务端常用命令:
./zkServer.sh start 启动
./zkServer.sh status 查看服务状态
./zkServer.sh stop 停止
./zkServer.sh restart 重启
zookeeper客户端命令:
连接到服务 ./zkCli.sh -server ip:port
退出 quit
查看根节点 ls / 【ls /dubbo/node】
创建节点 create /app1 数据
获取数据 get /app1
设置或者修改数据 set /app1 数据
删除节点 delete /app1 【有子节点不能删除】
删除有子节点的节点 deleteall /app1 【有子节点可以删除】
创建顺序节点 create -s /app1 数据
创建临时节点 create -e /app1 数据
创建临时顺序节点 create -es /app1 数据
JAVA客户端Curator
package com.wangxk;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* zookeeper的Java客户端Curator
*/
public class Demo {
public CuratorFramework client = null;
/**
* 创建连接
*/
@BeforeEach
public void getConnection(){
System.out.println("正在创建zookeeper连接");
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(30000, 10);
// 第一种创建方法
// CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
// 第二种创建方法
client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(60*1000)
.connectionTimeoutMs(15*1000)
.namespace("wangxk")
.retryPolicy(retryPolicy)
.build();
//开启连接
client.start();
System.out.println("创建连接" + client);
}
/**
* 释放资源
*/
@AfterEach
public void close(){
System.out.println("程序结束,释放资源");
client.close();
}
/**
* 创建节点
*/
@Test
public void createNode() throws Exception {
// 默认创建持久化节点
String path = client.create().forPath("/zmj","北京芝麻蕉".getBytes(StandardCharsets.UTF_8));
//指定创建节点的类型
// client.create().withMode(CreateMode.EPHEMERAL).forPath("/yyy", "北京云易扬".getBytes(StandardCharsets.UTF_8));
//若没有父级节点则创建父级节点
// client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/911/512");
System.out.println(path);
}
/**
* 查询数据
*/
@Test
public void query() throws Exception {
// 查询数据
byte[] bytes = client.getData().forPath("/zmj");
System.out.println(new String(bytes, StandardCharsets.UTF_8));
//查询子节点
List<String> nodes = client.getChildren().forPath("/");
for (String node : nodes) {
System.out.println(node);
}
//查询节点状态信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/zmj");
System.out.println(stat);
}
/**
* 修改节点
*/
@Test
public void update() throws Exception {
//修改数据
client.setData().forPath("/zmj", "小姐姐".getBytes(StandardCharsets.UTF_8));
System.out.println("修改数据成功!");
//根据版本修改,版本可以从获取节点信息中获取
int version = 3;
//client.setData().withVersion(version).forPath("/zmj", "小姐姐".getBytes(StandardCharsets.UTF_8));
}
/**
* 删除节点
*/
public void delete() throws Exception {
// 删除单个节点
client.delete().forPath("/zmj");
//删除带子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/zmj");
// 必须删除成功
client.delete().guaranteed().forPath("/zmj");
// 删除后回调
client.delete().guaranteed().inBackground((x, y) -> {
System.out.println("删除了");
System.out.println(y);
});
}
/**
* zookeeper允许用户在指定节点上注册一些watcher,并且在一些特定的事件触发的时候,zookeeper服务端会将事件通知到感兴趣的客户端上去,该机制是
* zookeeper实现分布式协调服务的重要特性。
* zookeeper提供三种watcher:
* NodeCache 监控一个特定的节点
* PathChildrenCache 监控一个节点的子节点
* TreeCache 监控整个树上的所有节点
*/
@Test
public void watcher() throws InterruptedException {
//新版本三种cache合并了(默认以指定节点为根的整个子树)
CuratorCache cache = CuratorCache.build(client, "/");
CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(() ->{
System.out.println("节点变化了!!!");
}).build();
cache.listenable().addListener(listener);
cache.start();
Thread.sleep(1000000000000l);
}
/**
* zookeeper的分布式锁
* zookeeper分布式锁的原理:
* 核心思想:当客户端获取锁的时候创建节点,使用完锁,删除节点,客户端获取锁的时候在lock节点下创建临时顺序节点,
* 然后获取lock下的所有子节点,客户端获取到所有子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端
* 获取到了锁,使用完锁后,将节点删除。如果发现自己创建的节点并非lock所有子节点中最小的,说明自己没有获取到锁,此时
* 客户端找到比自己小的节点同时对其注册事件监听器,监听事件删除,如果发现比自己小的节点被删除,则watcher会获取到通知,
* 此时再次判断自己创建的节点是否是最小节点,如果不是则重复以上步骤,获取比自己小的节点并注册监听事件。
*
* Curator实现分布式锁的API
* InterProcessSemaphoreMutex: 分布式排他锁
* InterProcessMutex: 分布式可重入排它锁
* InterProcessReadWriteLock 分布式读写锁
* InterProcessMultLock 蒋多个锁作为单个实体管理的容器
* InterProcessSemaphoreV2 共享信号量
*
*/
public void lock() throws Exception {
InterProcessMutex lock = new InterProcessMutex(client, "/lock");
//加锁
lock.acquire(3, TimeUnit.SECONDS);
//释放锁
lock.release();
}
/**
* zookeeper中的角色:
* leader:领导者,处理事务请求,集群内部各个服务器的调度者
* follower:跟随者,处理客户端非事务的请求,转发事务请求给leader服务器,参数leader服务器的选举
* Observer:观察者,处理客户端非事务请求,转发事务请求给leader服务器
*/
}