LeaderLatch
一旦节点成为主节点,那么只有调用close方法,其它节点才会继续争夺
List<LeaderLatch> latches = new ArrayList<>();
List<CuratorFramework> clients = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
clients.add(client);
LeaderLatch leaderLatch = new LeaderLatch(client, "/master", "node-" + i, LeaderLatch.CloseMode.NOTIFY_LEADER);
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println(leaderLatch.getId() + " is a leader ");
}
@Override
public void notLeader() {
System.out.println(leaderLatch.getId() + " not a leader ");
}
});
latches.add(leaderLatch);
}
for (LeaderLatch latch : latches) {
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.start();
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(latch.getId()+" elect end");
}
}).start();
}
Thread.sleep(10000);
for (LeaderLatch latch : latches) {
System.out.println(latch.getId() + " leader:" + latch.getLeader() + " isLeader:" + latch.hasLeadership());
if(latch.hasLeadership()){
latch.close();
}
}
for (CuratorFramework client : clients) {
System.out.println(client.getData() + " is close");
client.close();
}
Thread.sleep(100000);
在上面的例子中创建了10个clien,当某一个clien成为主节点后,就会触发isLeader方法,调用close方法后会触发notLeader,然后再在其它节点中选举一个新的leader,并且只有主节点才能继续执行latch.await()后面的逻辑。
LeaderSelector
当实例被选为leader之后,调用takeLeadership方法进行业务逻辑处理,处理完成即释放领导权。
autoRequeue()方法的调用确保此实例在释放领导权后还可能获得领导权。
这样保证了每个节点都可以获得领导权。
List<LeaderSelector> leaderSelectors = new ArrayList<>();
List<CuratorFramework> clients = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
clients.add(client);
LeaderSelector leaderSelector = new LeaderSelector(client, "/master", new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println(Thread.currentThread().getName() + " is a leader");
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
super.stateChanged(client, newState);
}
});
leaderSelectors.add(leaderSelector);
}
leaderSelectors.forEach(leaderSelector -> {
leaderSelector.autoRequeue();
leaderSelector.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("==================");
clients.forEach(client -> {
client.close();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread.sleep(100 * 1000);
在上面的例子中同样新建10个客户端然后进行选举,当调用close方法后其它节点会重新选举新的主节点。
当我们把takeLeadership方法中的Thread.sleep(Integer.MAX_VALUE);改为Thread.sleep(3*1000);当takeLeadership方法执行结束,就自动放弃领导权,其它节点重新选举。
当注释掉leaderSelector.autoRequeue()后,释放了领导权的节点不能再次获取领导权。
区别
- leaderlatch需要调用close方法才能释放主导权,并且不能重新获得。leaderselector当执行完takeleadership方法后自动释放主导权,并且可以设置autorequeue重新再获取领导权
- 实现方式不同leaderselector使用分布式锁InterProcessMutex实现
版权声明:本文为qq447995687原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。