需求:某分布式系统中,主节点有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线
实现思路:
- 服务器上线后,就在zookeeper集群上注册(要在zookeeper集群上创建一个临时node,以便服务器下线时就可以在zookeeper集群上删掉该临时node)。
- 客户端上线后,使用getChildren() 获取当前在线服务器列表,并注册监听。
- 当服务器下线后,zookeeper集群上该临时节点就会删掉,服务器下线事件通知客户端。
- 客户端在process()中,再重新获取在线服务器列表,并重新注册监听。
服务器端注册代码:
package com.allen.zookeeper01;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) {
DistributeServer server = new DistributeServer();
// 连接zookeeper集群连接
try {
server.getConnect();
} catch (IOException e) {
e.printStackTrace();
}
// 注册节点
try {
server.register(args[0]);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 业务逻辑处理
try {
server.business();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void register(String hostname) throws KeeperException, InterruptedException {
zk.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online. ");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}
客户端代码:
package com.allen.zookeeper01;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributedClient {
private String connectString = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) {
DistributedClient client = new DistributedClient();
// 获取zookeeper集群连接
try {
client.getConnect();
} catch (IOException e) {
e.printStackTrace();
}
// 注册监听
try {
client.getChildren();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 业务逻辑处理
try {
client.business();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getChildren() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/servers", true);
ArrayList<String> hosts = new ArrayList<String>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
hosts.add(new String(data));
}
// 将所有在线的服务器打印
System.out.println(hosts);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
try {
getChildren();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
效果如图:
- 无服务器上线
- hadoop02上线
- hadoop03上线
- hadoop03下线
版权声明:本文为ALLENYYE原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。