/**
 * Sentinel-aware Jedis pool that can be pointed either at the monitored master or at one of its
 * slaves.
 *
 * <p>On construction the pool asks the configured Sentinels for the address to connect to and
 * starts one daemon {@link MasterListener} thread per Sentinel. The listeners subscribe to
 * Sentinel events and re-point the pool when the connected instance changes:
 * <ul>
 *   <li>target {@code "master"}: reacts to {@code +switch-master};</li>
 *   <li>target {@code "slave"}: reacts to {@code +slave-reconf-done} and to {@code +sdown} of the
 *       slave the pool is currently connected to.</li>
 * </ul>
 */
public class MyJedisSentinelPool extends Pool<Jedis> {
    private static final String TARGET_MASTER = "master";
    private static final String TARGET_SLAVE = "slave";

    private static final String CHANNEL_SWITCH_MASTER = "+switch-master";
    private static final String CHANNEL_SLAVE_RECONF_DONE = "+slave-reconf-done";
    private static final String CHANNEL_SDOWN = "+sdown";
    private static final String CHANNEL_ODOWN = "+odown";

    protected GenericObjectPoolConfig poolConfig;
    protected int connectionTimeout;
    protected int soTimeout;
    protected String password;
    protected int database;
    protected String clientName;
    protected Set<MyJedisSentinelPool.MasterListener> masterListeners = new HashSet<>();
    protected java.util.logging.Logger log = java.util.logging.Logger.getLogger(this.getClass().getName());

    private volatile MyJedisSentinelFactory factory;
    private volatile HostAndPort currentHostMaster;

    /** Guards pool re-pointing: every Sentinel listener thread may report the same event. */
    private final Object initPoolLock = new Object();

    /**
     * Connection target: {@code "master"} connects to the master, {@code "slave"} connects to a
     * slave.
     */
    private final String connectTarget;

    private Set<String> sentinels;

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig) {
        this(masterName, sentinels, poolConfig, 2000, (String) null, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), 2000, (String) null, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, String password) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), 2000, password);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password) {
        this(masterName, sentinels, poolConfig, timeout, password, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout) {
        this(masterName, sentinels, poolConfig, timeout, (String) null, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, String password) {
        this(masterName, sentinels, poolConfig, 2000, password);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password, int database) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password, int database, String clientName) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, int soTimeout, String password, int database) {
        this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, (String) null);
    }

    /**
     * Creates a pool connected to the master.
     *
     * @param masterName        name of the master as monitored by the Sentinels
     * @param sentinels         Sentinel addresses, each parsed with {@code HostAndPort.parseString}
     * @param poolConfig        pool configuration
     * @param connectionTimeout connection timeout handed to the connection factory
     * @param soTimeout         socket timeout handed to the connection factory
     * @param password          password handed to the connection factory, may be {@code null}
     * @param database          database index handed to the connection factory
     * @param clientName        client name handed to the connection factory, may be {@code null}
     */
    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
        this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, password, database, clientName, TARGET_MASTER);
    }

    /**
     * Creates a pool connected to the master or to a slave, depending on {@code connectTarget}.
     *
     * @param masterName        name of the master as monitored by the Sentinels
     * @param sentinels         Sentinel addresses, each parsed with {@code HostAndPort.parseString}
     * @param poolConfig        pool configuration
     * @param connectionTimeout connection timeout handed to the connection factory
     * @param soTimeout         socket timeout handed to the connection factory
     * @param password          password handed to the connection factory, may be {@code null}
     * @param database          database index handed to the connection factory
     * @param clientName        client name handed to the connection factory, may be {@code null}
     * @param connectTarget     {@code "master"} or {@code "slave"}; {@code null} is treated as
     *                          {@code "master"}. With {@code "slave"} the pool falls back to the
     *                          master when no healthy slave is reported.
     * @throws JedisException           if a Sentinel answered but does not know {@code masterName}
     * @throws JedisConnectionException if no Sentinel could be queried
     */
    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName, String connectTarget) {
        this.connectTarget = connectTarget == null ? TARGET_MASTER : connectTarget;
        this.poolConfig = poolConfig;
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        // All configuration must be in place first: initSentinels starts the listener threads.
        HostAndPort master = this.initSentinels(sentinels, masterName);
        this.initPool(master);
    }

    /** Stops every Sentinel listener and then destroys the underlying pool. */
    @Override
    public void destroy() {
        for (MyJedisSentinelPool.MasterListener listener : this.masterListeners) {
            listener.shutdown();
        }
        super.destroy();
    }

    /**
     * @return the address the pool is currently pointed at (a slave address when the connection
     *         target is {@code "slave"} and a healthy slave was found)
     */
    public HostAndPort getCurrentHostMaster() {
        return this.currentHostMaster;
    }

    /**
     * Points the pool at {@code master} unless it already is. The first call creates the
     * connection factory and the pool; later calls retarget the factory and clear pooled
     * connections. Serialized because several listener threads may report the same event.
     */
    private void initPool(HostAndPort master) {
        synchronized (this.initPoolLock) {
            if (master.equals(this.currentHostMaster)) {
                return;
            }
            this.log.fine("initPool: target=" + this.connectTarget + ", new=" + master + ", current=" + this.currentHostMaster);
            this.currentHostMaster = master;
            if (this.factory == null) {
                this.factory = new MyJedisSentinelFactory(master.getHost(), master.getPort(), this.connectionTimeout, this.soTimeout, this.password, this.database, this.clientName, false, (SSLSocketFactory) null, (SSLParameters) null, (HostnameVerifier) null);
                this.initPool(this.poolConfig, this.factory);
            } else {
                this.factory.setHostAndPort(this.currentHostMaster);
                this.internalPool.clear();
            }
            this.log.info("Created JedisPool to master at " + master);
        }
    }

    /**
     * Queries the Sentinels in iteration order for the address to connect to, then starts one
     * listener thread per Sentinel.
     *
     * <p>When the connection target is {@code "slave"}, a healthy slave is preferred; if the
     * queried Sentinel reports none, the master address is used instead.
     *
     * @return the resolved address, never {@code null}
     * @throws JedisException           if a Sentinel answered but returned no master address
     * @throws JedisConnectionException if no Sentinel could be queried
     */
    private HostAndPort initSentinels(Set<String> sentinels, String masterName) {
        this.sentinels = sentinels;
        HostAndPort masterOrSlave = null;
        boolean sentinelAvailable = false;
        this.log.info("Trying to find master from available Sentinels...");
        for (String sentinel : sentinels) {
            HostAndPort hap = HostAndPort.parseString(sentinel);
            this.log.fine("Connecting to Sentinel " + hap);
            try (Jedis jedis = new Jedis(hap.getHost(), hap.getPort())) {
                // Slave target: try to find a usable slave first.
                if (TARGET_SLAVE.equals(this.connectTarget)) {
                    masterOrSlave = this.pickHealthySlave(jedis.sentinelSlaves(masterName), null);
                    if (masterOrSlave != null) {
                        this.log.fine("Found Redis slave at " + masterOrSlave);
                        break;
                    }
                }
                // Master target, or no usable slave: resolve the master.
                List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
                sentinelAvailable = true;
                if (masterAddr != null && masterAddr.size() == 2) {
                    masterOrSlave = this.toHostAndPort(masterAddr);
                    this.log.fine("Found Redis master at " + masterOrSlave);
                    break;
                }
                this.log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + ".");
            } catch (JedisException e) {
                this.log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one.");
            }
        }
        if (masterOrSlave == null) {
            if (sentinelAvailable) {
                throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored...");
            }
            throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running...");
        }
        this.log.info("Redis master running at " + masterOrSlave + ", starting Sentinel listeners...");
        for (String sentinel : sentinels) {
            HostAndPort hap = HostAndPort.parseString(sentinel);
            MyJedisSentinelPool.MasterListener masterListener = new MyJedisSentinelPool.MasterListener(masterName, hap.getHost(), hap.getPort());
            masterListener.setDaemon(true);
            this.masterListeners.add(masterListener);
            masterListener.start();
        }
        return masterOrSlave;
    }

    /**
     * Returns the first slave from a {@code SENTINEL slaves} reply that is not flagged
     * {@code s_down}, whose {@code master-link-status} is {@code ok}, and that is not
     * {@code excluded}.
     *
     * @param slaves   reply entries, may be {@code null}
     * @param excluded address to skip, or {@code null} to skip nothing
     * @return the chosen slave address, or {@code null} if none qualifies
     */
    private HostAndPort pickHealthySlave(List<Map<String, String>> slaves, HostAndPort excluded) {
        if (slaves == null) {
            return null;
        }
        for (Map<String, String> slave : slaves) {
            String flags = slave.get("flags");
            boolean subjectivelyDown = flags != null && flags.contains("s_down");
            if (subjectivelyDown || !"ok".equals(slave.get("master-link-status"))) {
                continue;
            }
            HostAndPort candidate = this.toHostAndPort(Arrays.asList(slave.get("ip"), slave.get("port")));
            if (!candidate.equals(excluded)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Converts a two-element {@code [host, port]} list into a {@code HostAndPort}.
     *
     * @throws NumberFormatException if the port element is not a valid integer
     */
    private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
        String host = getMasterAddrByNameResult.get(0);
        int port = Integer.parseInt(getMasterAddrByNameResult.get(1));
        return new HostAndPort(host, port);
    }

    /**
     * Borrows a connection and returns it only if it is connected to the current target address;
     * connections to a previous address are returned as broken and another one is borrowed.
     */
    @Override
    public Jedis getResource() {
        while (true) {
            Jedis jedis = super.getResource();
            jedis.setDataSource(this);
            HostAndPort master = this.currentHostMaster;
            HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient().getPort());
            if (master.equals(connection)) {
                return jedis;
            }
            this.returnBrokenResource(jedis);
        }
    }

    /** @deprecated */
    @Deprecated
    public void returnBrokenResource(Jedis resource) {
        if (resource != null) {
            this.returnBrokenResourceObject(resource);
        }
    }

    /** @deprecated */
    @Deprecated
    public void returnResource(Jedis resource) {
        if (resource != null) {
            resource.resetState();
            this.returnResourceObject(resource);
        }
    }

    /**
     * Thread that keeps a subscription to one Sentinel open and re-points the enclosing pool when
     * a relevant event arrives. After a lost connection it waits
     * {@link #subscribeRetryWaitTimeMillis} and subscribes again until {@link #shutdown()} is
     * called.
     */
    protected class MasterListener extends Thread {
        protected String masterName;
        protected String host;
        protected int port;
        protected long subscribeRetryWaitTimeMillis = 5000L;
        protected volatile Jedis j;
        protected AtomicBoolean running = new AtomicBoolean(false);

        protected MasterListener() {
        }

        public MasterListener(String masterName, String host, int port) {
            super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
            this.masterName = masterName;
            this.host = host;
            this.port = port;
        }

        public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) {
            this(masterName, host, port);
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
        }

        @Override
        public void run() {
            this.running.set(true);
            while (this.running.get()) {
                this.j = new Jedis(this.host, this.port);
                try {
                    if (!this.running.get()) {
                        break;
                    }
                    this.j.subscribe(new JedisPubSub() {
                        @Override
                        public void onMessage(String channel, String message) {
                            MasterListener.this.handleSentinelEvent(channel, message);
                        }
                    }, new String[]{CHANNEL_SWITCH_MASTER, CHANNEL_SLAVE_RECONF_DONE, CHANNEL_SDOWN, CHANNEL_ODOWN});
                } catch (JedisConnectionException e) {
                    if (this.running.get()) {
                        MyJedisSentinelPool.this.log.log(Level.SEVERE, "Lost connection to Sentinel at " + this.host + ":" + this.port + ". Sleeping " + this.subscribeRetryWaitTimeMillis + "ms and retrying.", e);
                        try {
                            Thread.sleep(this.subscribeRetryWaitTimeMillis);
                        } catch (InterruptedException interrupted) {
                            MyJedisSentinelPool.this.log.log(Level.SEVERE, "Sleep interrupted: ", interrupted);
                        }
                    } else {
                        MyJedisSentinelPool.this.log.fine("Unsubscribing from Sentinel at " + this.host + ":" + this.port);
                    }
                } finally {
                    this.j.close();
                }
            }
        }

        /**
         * Dispatches one Sentinel event: resolves the address the pool should use, if any, and
         * re-points the pool. Events that do not concern this pool are logged at FINE and ignored.
         */
        private void handleSentinelEvent(String channel, String message) {
            MyJedisSentinelPool.this.log.fine("Sentinel " + this.host + ":" + this.port + " published on " + channel + ": " + message + ".");
            HostAndPort target;
            try {
                target = this.resolveTarget(channel, message);
            } catch (NumberFormatException e) {
                // A non-numeric port must not kill the listener thread.
                MyJedisSentinelPool.this.log.log(Level.SEVERE, "Invalid message received on Sentinel " + this.host + ":" + this.port + " on channel " + channel + " : " + message, e);
                return;
            }
            if (target != null) {
                MyJedisSentinelPool.this.initPool(target);
            } else {
                MyJedisSentinelPool.this.log.fine("No pool change for message from Sentinel " + this.host + ":" + this.port + " on channel " + channel + " : " + message);
            }
        }

        /** Picks the handler matching the channel and the pool's connection target. */
        private HostAndPort resolveTarget(String channel, String message) {
            String target = MyJedisSentinelPool.this.connectTarget;
            if (CHANNEL_SWITCH_MASTER.equals(channel) && TARGET_MASTER.equals(target)) {
                return this.resolveSwitchMaster(message);
            }
            if (CHANNEL_SLAVE_RECONF_DONE.equals(channel) && TARGET_SLAVE.equals(target)) {
                return this.resolveSlaveReconfDone(message);
            }
            if (CHANNEL_SDOWN.equals(channel) && TARGET_SLAVE.equals(target)) {
                return this.resolveSlaveDown(message);
            }
            return null;
        }

        /**
         * Master failover. Expected payload:
         * {@code <master-name> <old-ip> <old-port> <new-ip> <new-port>}.
         *
         * @return the new master address, or {@code null} if the message is malformed or is
         *         about another master
         */
        private HostAndPort resolveSwitchMaster(String message) {
            String[] parts = message.split(" ");
            if (parts.length < 5) {
                MyJedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + this.host + ":" + this.port + " on channel +switch-master: " + message);
                return null;
            }
            if (!this.masterName.equals(parts[0])) {
                MyJedisSentinelPool.this.log.fine("Ignoring message on +switch-master for master name " + parts[0] + ", our master name is " + this.masterName);
                return null;
            }
            return MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(parts[3], parts[4]));
        }

        /**
         * A slave finished reconfiguring against the new master. The slave the pool was using may
         * itself have been promoted, so the pool moves to the slave named in the event, which is
         * known to be linked to the new master. Expected payload:
         * {@code slave <name> <ip> <port> @ <master-name> <master-ip> <master-port>}.
         *
         * @return the slave address, or {@code null} if the message is malformed or names a
         *         different master
         */
        private HostAndPort resolveSlaveReconfDone(String message) {
            String[] parts = message.split(" ");
            if (parts.length < 4) {
                MyJedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + this.host + ":" + this.port + " on channel +slave-reconf-done: " + message);
                return null;
            }
            if (parts.length > 5 && !parts[5].equals(this.masterName)) {
                MyJedisSentinelPool.this.log.fine("Ignoring message on +slave-reconf-done for master name " + parts[5] + ", our master name is " + this.masterName);
                return null;
            }
            return MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(parts[2], parts[3]));
        }

        /**
         * A slave is subjectively down. {@code +sdown} is used rather than {@code +odown} because
         * the original author observed that {@code +odown} is not delivered to subscribers for
         * slave failures. Only acts when the downed slave is the one the pool is connected to:
         * picks another healthy slave, or falls back to the master named in the event. Expected
         * payload: {@code slave <name> <ip> <port> @ <master-name> <master-ip> <master-port>}.
         *
         * @return the replacement address, or {@code null} if the event does not affect the pool
         */
        private HostAndPort resolveSlaveDown(String message) {
            String[] parts = message.split(" ");
            if (!"slave".equals(parts[0])) {
                return null;
            }
            if (parts.length < 8) {
                MyJedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + this.host + ":" + this.port + " on channel +sdown: " + message);
                return null;
            }
            HostAndPort downed = MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(parts[2], parts[3]));
            // Argument order keeps this null-safe if an event arrives before the pool is initialised.
            if (!downed.equals(MyJedisSentinelPool.this.currentHostMaster)) {
                return null;
            }
            HostAndPort replacement = this.findReplacementSlave(downed);
            if (replacement != null) {
                return replacement;
            }
            return MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(parts[6], parts[7]));
        }

        /**
         * Asks this listener's Sentinel for a healthy slave other than {@code downed}, using a
         * short-lived connection that is always closed.
         *
         * @return a healthy slave address, or {@code null} if none is reported or the query fails
         */
        private HostAndPort findReplacementSlave(HostAndPort downed) {
            try (Jedis sentinel = new Jedis(this.host, this.port)) {
                return MyJedisSentinelPool.this.pickHealthySlave(sentinel.sentinelSlaves(this.masterName), downed);
            } catch (JedisException e) {
                MyJedisSentinelPool.this.log.log(Level.WARNING, "Cannot list slaves of " + this.masterName + " from Sentinel " + this.host + ":" + this.port + ", falling back to master.", e);
                return null;
            }
        }

        /** Stops the subscribe loop and disconnects the current Sentinel connection, if any. */
        public void shutdown() {
            try {
                MyJedisSentinelPool.this.log.fine("Shutting down listener on " + this.host + ":" + this.port);
                this.running.set(false);
                if (this.j != null) {
                    this.j.disconnect();
                }
            } catch (Exception e) {
                MyJedisSentinelPool.this.log.log(Level.SEVERE, "Caught exception while shutting down: ", e);
            }
        }
    }
}
|