Spring Data Redis基于jedis实现读写分离

spring-data-redis :是对jedis或Lettuce这些客户端的封装,提供一套与客户端无关的api供应用使用。

jedis:是redis的java客户端,通过它可以对redis进行操作,相似的还有Lettuce。

本文中主要是通过jedis来实现读写分离。分为两步,1、window环境搭建redis主从服务。2、实现读写分离

一、搭建redis主从服务

1、下载window版的redis,可以去https://github.com/microsoftarchive/redis/releases下载

2、采用的是一主二从三哨兵的结构进行搭建

先搭建主从结构

解压好并复制三份放到一个文件中

首先配置Redis的主从服务器,修改redis.conf文件如下

#链接IP
bind 192.168.3.115
#端口号
port 6379
#密码 注释后可以为空
requirepass "123456"
#密码 注释后可以为空
masterauth "123456"

#保护模式 建立关闭方便测试

protected-mode no

配置从服务1,修改redis.conf文件如下

#链接IP
bind 192.168.3.115

#端口号
port 6380
slaveof 192.168.3.115 6379

#连接主服务器(指定IP,和端口)

#密码 注释后可以为空
requirepass "123456"
#密码 注释后可以为空
masterauth "123456"

protected-mode no

配置从服务2,修改redis.conf文件如下

#链接IP
bind 192.168.3.115

#端口号
port 6381

slaveof 192.168.3.115 6379
#连接主服务器(指定IP,和端口)

#密码 注释后可以为空
requirepass "123456"
#密码 注释后可以为空
masterauth "123456"
protected-mode no

注意:从服务比主服务多了一个slaveof 192.168.3.115 6379

slaveof :命令用于在 Redis 运行时动态地修改复制(replication)功能的行为,链接主服务

可以在三个redis安装目录中分别建一个.bat启动文件

写入命令:

title redis-6379/6380/6381
redis-server.exe redis.conf

3、进行测试

依次启动建立的bat启动文件,必须要启动主服务、

(也可以使用cmd启动,分别cd到redis的安装目录中输入redis-server.exe redis.conf命令)

 

 

 

4、添加哨兵

新建一个redis-sentinel.conf

#当前Sentinel服务运行的端口
port 26379
protected-mode no
#master
#Sentinel去监视一个名为mymaster的主redis实例,这个主实例的IP地址为本机地址192.168.3.115,端口号为6379,
#而将这个主实例判断为失效至少需要2个 Sentinel进程的同意,只要同意Sentinel的数量不达标,自动failover就不会执行
sentinel monitor mymaster 192.168.3.115 6379 2
#如果没有密码就可以不要这一行
sentinel auth-pass mymaster 123456

三个哨兵的redis-sentinel.conf文件除了port不同外,其余的都相同。在启动哨兵时reids服务会自动补齐其余的配置。

复制两份记得修改port,分别粘贴到主服务目录和两个从服务目录中。

5、启动哨兵

可以在三个redis安装目录中分别建一个.bat启动文件可以用startRedis-sentinel-(端口号),方便启动。

写入命令:

title    redis-sentinel-26379/26380/26381
redis-server.exe redis-sentinel.conf --sentinel

注意:启动前先将redis.conf和redis-sentinel.conf文件进行备份,因为启动哨兵,redis-sentinel.conf文件会被修改,如果redis宕机redis.conf会被修改

依次启动建立的bat启动文件

或使用cmd启动,cd到redis安装目录中输入redis-server.exe redis-sentinel.conf --sentinel

 

6、测试哨兵

把主服务器关闭后剩下的两个会有一个被推选出来作为主服务器。如果把把关闭的主服务再次启动,它会作为一个从服务链接刚被推选出的主服务。

如果没有出现以上所述,那么配置是有问题的,哨兵机制没有起到作用,请耐心的查看自己的配置文件。

 

二、jedis实现读写分离

可以通过RedisSentinelConfiguration成一个基于Sentinel的连接池,而不用通过主服务地址和从服务地址进行链接。这样的好处就是我们可以通过哨兵来获取当前的主服务的链接地址和从服务的链接地址。

然而当我们使用这种方式的时候,得到的链接池都是链接到主服务的,并没有我们想要的从服务链接的地址

在redis.clients.jedis.JedisSentinelPool源码中可以看到

在源码中你得到的链接永远都是主服务的链接地址。和我们想象中并不一样,怎么办呢,jedis并没有提供相应的api,所以只能进行改造了。

1、首先要创建重新三个类

第一个

第二个

第三个

 

2、具体代码

MyJedisSentinelPool
public class MyJedisSentinelPool extends Pool<Jedis> {
    protected GenericObjectPoolConfig poolConfig;
    protected int connectionTimeout;
    protected int soTimeout;
    protected String password;
    protected int database;
    protected String clientName;
    protected Set<MyJedisSentinelPool.MasterListener> masterListeners;
    protected java.util.logging.Logger log;
    private volatile MyJedisSentinelFactory factory;
    private volatile HostAndPort currentHostMaster;

    /**
     * 链接目标 master链接主服务,slave链接从服务
     */
    private String connectTarget="master";

    private Set<String> sentinels;

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig) {
        this(masterName, sentinels, poolConfig, 2000, (String)null, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), 2000, (String)null, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, String password) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), 2000, password);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password) {
        this(masterName, sentinels, poolConfig, timeout, password, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout) {
        this(masterName, sentinels, poolConfig, timeout, (String)null, 0);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, String password) {
        this(masterName, sentinels, poolConfig, 2000, password);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password, int database) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password, int database, String clientName) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName);
    }

    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, int soTimeout, String password, int database) {
        this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, (String)null);
    }

    /**
     * 创建主服务链接池
     * @param masterName
     * @param sentinels
     * @param poolConfig
     * @param connectionTimeout
     * @param soTimeout
     * @param password
     * @param database
     * @param clientName
     */
    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
        this.connectionTimeout = 2000;
        this.soTimeout = 2000;
        this.database = 0;
        this.masterListeners = new HashSet();
        this.log = java.util.logging.Logger.getLogger(this.getClass().getName());
        this.poolConfig = poolConfig;
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        HostAndPort master = this.initSentinels(sentinels, masterName);
        this.initPool(master);
    }

    /**
     * 创建主服务链接池
     * @param masterName
     * @param sentinels
     * @param poolConfig
     * @param connectionTimeout
     * @param soTimeout
     * @param password
     * @param database
     * @param clientName
     */
    public MyJedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName,String connectTarget) {
        this.connectTarget=connectTarget;
        this.connectionTimeout = 2000;
        this.soTimeout = 2000;
        this.database = 0;
        this.masterListeners = new HashSet();
        this.log = java.util.logging.Logger.getLogger(this.getClass().getName());
        this.poolConfig = poolConfig;
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        HostAndPort master = this.initSentinels(sentinels, masterName);
        this.initPool(master);
    }



    @Override
    public void destroy() {
        Iterator var1 = this.masterListeners.iterator();
        while(var1.hasNext()) {
            MyJedisSentinelPool.MasterListener m = (MyJedisSentinelPool.MasterListener)var1.next();
            m.shutdown();
        }
        super.destroy();
    }

    public HostAndPort getCurrentHostMaster() {
        return this.currentHostMaster;
    }

    private void initPool(HostAndPort master) {
        System.out.println(connectTarget+"====master="+master+"===currentHostMaster==="+this.currentHostMaster);
        if (!master.equals(this.currentHostMaster)) {
            System.out.println("==initPool==="+connectTarget+"====master="+master+"===currentHostMaster==="+this.currentHostMaster);
            this.currentHostMaster = master;
            if (this.factory == null) {
                this.factory = new MyJedisSentinelFactory(master.getHost(), master.getPort(), this.connectionTimeout, this.soTimeout, this.password, this.database, this.clientName, false, (SSLSocketFactory)null, (SSLParameters)null, (HostnameVerifier)null);
                this.initPool(this.poolConfig, this.factory);
            } else {
                this.factory.setHostAndPort(this.currentHostMaster);
                this.internalPool.clear();
            }

            this.log.info("Created JedisPool to master at " + master);
        }

    }

    private HostAndPort initSentinels(Set<String> sentinels, String masterName) {
        this.sentinels=sentinels;
        HostAndPort masterOrSlave = null;
        boolean sentinelAvailable = false;
        this.log.info("Trying to find master from available Sentinels...");
        Iterator var5 = sentinels.iterator();

        String sentinel;
        HostAndPort hap;
        over:while(var5.hasNext()) {
            sentinel = (String)var5.next();
            hap = HostAndPort.parseString(sentinel);
            this.log.fine("Connecting to Sentinel " + hap);
            Jedis jedis = null;

            try {
                jedis = new Jedis(hap.getHost(), hap.getPort());
                //链接从服务
                if (connectTarget.equals("slave")){
                    List<Map<String, String>> slaves = jedis.sentinelSlaves(masterName);
              
                    if (slaves.size()>0){
                        for (int i=0;i<slaves.size();i++){
                            Map<String, String> stringMap = slaves.get(i);
                            if (!stringMap.get("flags").contains("s_down") && "ok".equals(stringMap.get("master-link-status"))){
                                String slaveIp = stringMap.get("ip");
                                String slavePort = stringMap.get("port");
                                List<String> masterAddr = new ArrayList<>();
                                masterAddr.add(slaveIp);
                                masterAddr.add(slavePort);
                                masterOrSlave = this.toHostAndPort(masterAddr);
                                this.log.fine("Found Redis slave at " + masterOrSlave);
                                break over;
                            }
                        }

                    }
                }
                //链接主服务
                List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
                sentinelAvailable = true;
                if (masterAddr != null && masterAddr.size() == 2) {
                    masterOrSlave = this.toHostAndPort(masterAddr);
                    this.log.fine("Found Redis master at " + masterOrSlave);
                    break;
                }
                this.log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + ".");
            } catch (JedisException var13) {
                this.log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + var13 + ". Trying next one.");
            } finally {
                if (jedis != null) {
                    jedis.close();
                }

            }
        }

        if (masterOrSlave == null) {
            if (sentinelAvailable) {
                throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored...");
            } else {
                throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running...");
            }
        } else {
            this.log.info("Redis master running at " + masterOrSlave + ", starting Sentinel listeners...");
            var5 = sentinels.iterator();
            while(var5.hasNext()) {
                sentinel = (String)var5.next();
                hap = HostAndPort.parseString(sentinel);
                MyJedisSentinelPool.MasterListener masterListener = new MyJedisSentinelPool.MasterListener(masterName, hap.getHost(), hap.getPort());
                masterListener.setDaemon(true);
                this.masterListeners.add(masterListener);
                masterListener.start();
            }
            return masterOrSlave;
        }
    }

    private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
        String host = (String)getMasterAddrByNameResult.get(0);
        int port = Integer.parseInt((String)getMasterAddrByNameResult.get(1));
        return new HostAndPort(host, port);
    }

    @Override
    public Jedis getResource() {
        while(true) {
            Jedis jedis = (Jedis)super.getResource();
            jedis.setDataSource(this);
            HostAndPort master = this.currentHostMaster;
            HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient().getPort());
            if (master.equals(connection)) {
                return jedis;
            }

            this.returnBrokenResource(jedis);
        }
    }

    /** @deprecated */
    @Deprecated
    public void returnBrokenResource(Jedis resource) {
        if (resource != null) {
            this.returnBrokenResourceObject(resource);
        }

    }

    /** @deprecated */
    @Deprecated
    public void returnResource(Jedis resource) {
        if (resource != null) {
            resource.resetState();
            this.returnResourceObject(resource);
        }

    }

    protected class MasterListener extends Thread {
        protected String masterName;
        protected String host;
        protected int port;
        protected long subscribeRetryWaitTimeMillis;
        protected volatile Jedis j;
        protected AtomicBoolean running;

        protected MasterListener() {
            this.subscribeRetryWaitTimeMillis = 5000L;
            this.running = new AtomicBoolean(false);
        }

        public MasterListener(String masterName, String host, int port) {
            super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
            this.subscribeRetryWaitTimeMillis = 5000L;
            this.running = new AtomicBoolean(false);
            this.masterName = masterName;
            this.host = host;
            this.port = port;
        }

        public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) {
            this(masterName, host, port);
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
        }

        @Override
        public void run() {
            this.running.set(true);

            while(this.running.get()) {
                this.j = new Jedis(this.host, this.port);

                try {
                    if (!this.running.get()) {
                        break;
                    }
                    this.j.subscribe(new JedisPubSub() {
                        @Override
                        public void onMessage(String channel, String message) {
                            HostAndPort hostAndPort = null;
                            System.out.println(channel+"========"+message);
                            //主服务宕机处理,包含链接主服务防护栏和链接从夫处理
                            //处理链接新的主服务
                            if (channel.equals("+switch-master") &&  MyJedisSentinelPool.this.connectTarget.equals("master")){
                                MyJedisSentinelPool.this.log.fine("Sentinel " + MyJedisSentinelPool.MasterListener.this.host + ":" + MyJedisSentinelPool.MasterListener.this.port + " published: " + message + ".");
                                String[] switchMasterMsg = message.split(" ");
                                if (switchMasterMsg.length > 3) {
                                    if (MyJedisSentinelPool.MasterListener.this.masterName.equals(switchMasterMsg[0])) {
                                        hostAndPort = MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]));
                                    } else {
                                        MyJedisSentinelPool.this.log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + MyJedisSentinelPool.MasterListener.this.masterName);
                                    }
                                } else {
                                    MyJedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + MyJedisSentinelPool.MasterListener.this.host + ":" + MyJedisSentinelPool.MasterListener.this.port + " on channel +switch-master: " + message);
                                }
                            }
                            //处理链接新的从服务,原来链接的从服务有可能被选举为主服务了,所以要重新获取去一个从服务进行链接,如果没有合适的从服务就链接主服务
                            //+slave-reconf-done频道是从服务器已经成功完成对新主服务器的同步。使用此频道是确保响应的事件的从服务已经和主服务建立了链接
                            else if (channel.equals("+slave-reconf-done") && MyJedisSentinelPool.this.connectTarget.equals("slave")){
                                String[] slaveReconfDoneMsg = message.split(" ");
                                if (slaveReconfDoneMsg.length>2){
                                    hostAndPort = MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(slaveReconfDoneMsg[2], slaveReconfDoneMsg[3]));
                                    if (hostAndPort ==null){
                                        Jedis jedis = new Jedis(MasterListener.this.host,MasterListener.this.port);
                                        List<String> masterlist = jedis.sentinelGetMasterAddrByName(MasterListener.this.masterName);
                                        if (masterlist != null && masterlist.size() == 2) {
                                            hostAndPort = MyJedisSentinelPool.this.toHostAndPort(masterlist);
                                        }
                                    }
                                }
                            }
                            //从服务宕机处理
                            //+sdown频道是给定的实例现在处于主观下线状态。这里使用的是+sdown而没有使用+odown频道是因为再从服务宕机后+odown频道并不能响应,虽然再哨兵服务中监听到了但是没有响应到订阅
                            //如果是主服务宕机+odown频道会响应到订阅中,具体原因为没有找到。或许redis的机制就是这样的吧。
                            else if (channel.equals("+sdown") && MyJedisSentinelPool.this.connectTarget.equals("slave")){
                                String[] sdownMsg = message.split(" ");
                                //从服务线下并且当前的连接池是链接的下线的从服务,那么就换一个从服务器,如果没有找到合适的从服务器就链接主服务器
                                if ("slave".equals(sdownMsg[0]) && sdownMsg.length>6){
                                    HostAndPort sdownhostAndPort = MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(sdownMsg[2], sdownMsg[3]));
                                    if (MyJedisSentinelPool.this.currentHostMaster.equals(sdownhostAndPort)){
                                        Jedis jedis = new Jedis(MasterListener.this.host,MasterListener.this.port);
                                        List<Map<String, String>> sentinelSlaves = jedis.sentinelSlaves(MasterListener.this.masterName);
                                        for (int i=0;i<sentinelSlaves.size();i++){
                                            Map<String, String> slaveMap = sentinelSlaves.get(i);
                                            String ip = slaveMap.get("ip");
                                            String port = slaveMap.get("port");
                                            String flags = slaveMap.get("flags");
                                            String status = slaveMap.get("master-link-status");
                                            if ("ok".equals(status)){
                                                hostAndPort = MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(ip, port));
                                                break;
                                            }
                                        }
                                        if (hostAndPort == null){
                                            hostAndPort=MyJedisSentinelPool.this.toHostAndPort(Arrays.asList(sdownMsg[6], sdownMsg[7]));
                                        }

                                    }
                                }
                            }
                            if (hostAndPort !=null){
                                MyJedisSentinelPool.this.initPool(hostAndPort);
                            }else {
                                MyJedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + MasterListener.this.host + ":" + MasterListener.this.port + " on channel "+channel+" : " + message+",initPool for hostAndPort is null");
                            }
                        }
                    }, new String[]{"+switch-master","+slave-reconf-done","+sdown","+odown"});
                } catch (JedisConnectionException var8) {
                    if (this.running.get()) {
                        MyJedisSentinelPool.this.log.log(Level.SEVERE, "Lost connection to Sentinel at " + this.host + ":" + this.port + ". Sleeping 5000ms and retrying.", var8);

                        try {
                            Thread.sleep(this.subscribeRetryWaitTimeMillis);
                        } catch (InterruptedException var7) {
                            MyJedisSentinelPool.this.log.log(Level.SEVERE, "Sleep interrupted: ", var7);
                        }
                    } else {
                        MyJedisSentinelPool.this.log.fine("Unsubscribing from Sentinel at " + this.host + ":" + this.port);
                    }
                } finally {
                    this.j.close();
                }
            }

        }

        public void shutdown() {
            try {
                MyJedisSentinelPool.this.log.fine("Shutting down listener on " + this.host + ":" + this.port);
                this.running.set(false);
                if (this.j != null) {
                    this.j.disconnect();
                }
            } catch (Exception var2) {
                MyJedisSentinelPool.this.log.log(Level.SEVERE, "Caught exception while shutting down: ", var2);
            }

        }
    }
}

 

MyJedisSentinelFactory
class MyJedisSentinelFactory implements PooledObjectFactory<Jedis> {
    private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference();
    private final int connectionTimeout;
    private final int soTimeout;
    private final String password;
    private final int database;
    private final String clientName;
    private final boolean ssl;
    private final SSLSocketFactory sslSocketFactory;
    private SSLParameters sslParameters;
    private HostnameVerifier hostnameVerifier;

    public MyJedisSentinelFactory(String host, int port, int connectionTimeout, int soTimeout, String password, int database, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
        this.hostAndPort.set(new HostAndPort(host, port));
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        this.ssl = ssl;
        this.sslSocketFactory = sslSocketFactory;
        this.sslParameters = sslParameters;
        this.hostnameVerifier = hostnameVerifier;
    }

    public MyJedisSentinelFactory(URI uri, int connectionTimeout, int soTimeout, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
        if (!JedisURIHelper.isValid(uri)) {
            throw new InvalidURIException(String.format("Cannot open Redis connection due invalid URI. %s", uri.toString()));
        } else {
            this.hostAndPort.set(new HostAndPort(uri.getHost(), uri.getPort()));
            this.connectionTimeout = connectionTimeout;
            this.soTimeout = soTimeout;
            this.password = JedisURIHelper.getPassword(uri);
            this.database = JedisURIHelper.getDBIndex(uri);
            this.clientName = clientName;
            this.ssl = ssl;
            this.sslSocketFactory = sslSocketFactory;
            this.sslParameters = sslParameters;
            this.hostnameVerifier = hostnameVerifier;
        }
    }

    public void setHostAndPort(HostAndPort hostAndPort) {
        this.hostAndPort.set(hostAndPort);
    }

    @Override
    public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
        if (jedis.getDB() != (long)this.database) {
            jedis.select(this.database);
        }

    }

    @Override
    public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
        BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
        if (jedis.isConnected()) {
            try {
                try {
                    jedis.quit();
                } catch (Exception var4) {
                    ;
                }

                jedis.disconnect();
            } catch (Exception var5) {
                ;
            }
        }

    }

    @Override
    public PooledObject<Jedis> makeObject() throws Exception {
        HostAndPort hostAndPort = (HostAndPort)this.hostAndPort.get();
        Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), this.connectionTimeout, this.soTimeout, this.ssl, this.sslSocketFactory, this.sslParameters, this.hostnameVerifier);

        try {
            jedis.connect();
            if (null != this.password) {
                jedis.auth(this.password);
            }

            if (this.database != 0) {
                jedis.select(this.database);
            }

            if (this.clientName != null) {
                jedis.clientSetname(this.clientName);
            }
        } catch (JedisException var4) {
            jedis.close();
            throw var4;
        }

        return new DefaultPooledObject(jedis);
    }

    @Override
    public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    }

    @Override
    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
        BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();

        try {
            HostAndPort hostAndPort = (HostAndPort)this.hostAndPort.get();
            String connectionHost = jedis.getClient().getHost();
            int connectionPort = jedis.getClient().getPort();
            return hostAndPort.getHost().equals(connectionHost) && hostAndPort.getPort() == connectionPort && jedis.isConnected() && jedis.ping().equals("PONG");
        } catch (Exception var6) {
            return false;
        }
    }
}
MyJedisSentinelConnectionFactory
public class MyJedisSentinelConnectionFactory extends JedisConnectionFactory{
    private String connectTrage;
    public MyJedisSentinelConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisPoolConfig poolConfig,String connectTrage){
        super(sentinelConfig,poolConfig);
        this.connectTrage=connectTrage;
    }
    @Override
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){
        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new MyJedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
                poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName(),connectTrage);
    }

    private int getConnectTimeout() {
        return Math.toIntExact(getClientConfiguration().getConnectTimeout().toMillis());
    }

    private Set<String> convertToJedisSentinelSet(Collection<RedisNode> nodes) {

        if (CollectionUtils.isEmpty(nodes)) {
            return Collections.emptySet();
        }

        Set<String> convertedNodes = new LinkedHashSet<>(nodes.size());
        for (RedisNode node : nodes) {
            if (node != null) {
                convertedNodes.add(node.asString());
            }
        }
        return convertedNodes;
    }

    private int getReadTimeout() {
        return Math.toIntExact(getClientConfiguration().getReadTimeout().toMillis());
    }

}

3、接下来就是创建RedisTemplate对象。我是创建了两个,一个是主要用来写,链接的是主服务。一个是用来读链接的是从服务。

/**
 * @program: boot-hero
 * @description: redis哨兵配置
 * @author: maque
 * @create: 2020-04-27 10:36
 */
@Configuration
public class MyRedisConfig{

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.timeout}")
    private String timeout;

    @Value("${spring.redis.database}")
    private int database;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.sentinel.nodes}")
    private String redisNodes;

    @Value("${spring.redis.sentinel.master}")
    private String master;

    @Value("${spring.redis.jedis.pool.max-idle}")
    private String maxIdle;

    @Value("${spring.redis.jedis.pool.min-idle}")
    private String minIdle;

    @Value("${spring.redis.jedis.pool.max-active}")
    private String maxActive;

//    @Value("{spring.redis.jedis.pool.max-wait}")
//    private String maxWait;

    //redis哨兵配置
    @Primary
    @Bean
    public RedisConnectionFactory redisConnectionFactoryMaster(){
        String[] host = redisNodes.split(",");
        Set<String> nodes = new HashSet<>();
        for(String redisHost : host){
            nodes.add(redisHost);
        }
        RedisSentinelConfiguration redisSentinelConfiguration = new RedisSentinelConfiguration(master, nodes);
        redisSentinelConfiguration.setPassword(RedisPassword.of(password.toCharArray()));
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(Integer.valueOf(maxIdle));
        genericObjectPoolConfig.setMinIdle(Integer.valueOf(minIdle));
        genericObjectPoolConfig.setMaxTotal(Integer.valueOf(maxActive));
//        genericObjectPoolConfig.setMaxWaitMillis(maxWait.toMillis());
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        Set<RedisNode> sentinels = redisSentinelConfiguration.getSentinels();
        for (RedisNode node:sentinels){
            boolean master = node.isMaster();
            System.out.println("===isMaster===="+master);
        }
//        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisSentinelConfiguration,jedisPoolConfig);
        JedisConnectionFactory jedisConnectionFactory = new MyJedisSentinelConnectionFactory(redisSentinelConfiguration,jedisPoolConfig,"master");
        return jedisConnectionFactory;
    }

    @Bean
    public RedisConnectionFactory redisConnectionFactorySlave(){
        String[] host = redisNodes.split(",");
        Set<String> nodes = new HashSet<>();
        for(String redisHost : host){
            nodes.add(redisHost);
        }
        RedisSentinelConfiguration redisSentinelConfiguration = new RedisSentinelConfiguration(master, nodes);
        redisSentinelConfiguration.setPassword(RedisPassword.of(password.toCharArray()));
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(Integer.valueOf(maxIdle));
        genericObjectPoolConfig.setMinIdle(Integer.valueOf(minIdle));
        genericObjectPoolConfig.setMaxTotal(Integer.valueOf(maxActive));
//        genericObjectPoolConfig.setMaxWaitMillis(maxWait.toMillis());
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        Set<RedisNode> sentinels = redisSentinelConfiguration.getSentinels();
        for (RedisNode node:sentinels){
            boolean master = node.isMaster();
            System.out.println("===isMaster===="+master);
        }
//        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisSentinelConfiguration,jedisPoolConfig);
        JedisConnectionFactory jedisConnectionFactory = new MyJedisSentinelConnectionFactory(redisSentinelConfiguration,jedisPoolConfig,"slave");
        return jedisConnectionFactory;
    }

    @Bean(name = "redisTemplateMaster")
    public RedisTemplate<String,Object> redisTemplateMaster(){

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jdkSerializationRedisSerializer.serialize(om);
        RedisConnectionFactory factory = redisConnectionFactoryMaster();
        redisTemplate.setConnectionFactory(factory);

        // key、value序列化方式;(不然会出现乱码;),但是如果方法上有Long等非String类型的话,会报类型转换错误;
        // 所以在没有自己定义key生成策略的时候,以下这个代码建议不要这么写,可以不配置或者自己实现ObjectRedisSerializer
        // 或者JdkSerializationRedisSerializer序列化方式;
        StringRedisSerializer redisSerializer = new StringRedisSerializer();// Long类型不可以会出现异常信息;
        redisTemplate.setKeySerializer(redisSerializer);
        redisTemplate.setHashKeySerializer(redisSerializer);
        redisTemplate.setValueSerializer(jdkSerializationRedisSerializer);
        redisTemplate.setHashValueSerializer(jdkSerializationRedisSerializer);
        System.out.println("===afterPropertiesSet=========");

        return redisTemplate;
    }

    @Bean(name = "redisTemplateSlave")
    public RedisTemplate<String,Object> redisTemplateSlave(){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jdkSerializationRedisSerializer.serialize(om);
        RedisConnectionFactory factory = redisConnectionFactorySlave();
        redisTemplate.setConnectionFactory(factory);
        // key、value序列化方式;(不然会出现乱码;),但是如果方法上有Long等非String类型的话,会报类型转换错误;
        // 所以在没有自己定义key生成策略的时候,以下这个代码建议不要这么写,可以不配置或者自己实现ObjectRedisSerializer
        // 或者JdkSerializationRedisSerializer序列化方式;
        StringRedisSerializer redisSerializer = new StringRedisSerializer();// Long类型不可以会出现异常信息;
        redisTemplate.setKeySerializer(redisSerializer);
        redisTemplate.setHashKeySerializer(redisSerializer);
        redisTemplate.setValueSerializer(jdkSerializationRedisSerializer);
        redisTemplate.setHashValueSerializer(jdkSerializationRedisSerializer);
        System.out.println("===afterPropertiesSet=========");

        return redisTemplate;
    }
}

4、RedisTemplate对象注入

 

5、讲述:代码中都有备注,可以参考一下源码,多做些测试,不懂的地方就会明了。重点是明白重写的原理,如果只是按部就班的搬代码,是很难实现的,因为引用的jar版本不同,内部的实现原理是不同的。

我主要还是使用了源码的代码只是在其上进行了修改,大部分的逻辑及思想并没有改变。

在MyJedisSentinelConnectionFactory类中添加了一个属性private String connectTrage;以方便

传递给MyJedisSentinelPool对象,用于告诉MyJedisSentinelPool对象我需要服务的链接池还是从服务的链接池。是通过构造函数来传递的

这个构造函数时新建的,没有修改原有的构造函数,原因是避免不必要的麻烦。所以要在创建

MyJedisSentinelConnectionFactory对象的时候要使用这个构造函数。

创建MyJedisSentinelFactory类是因为MyJedisSentinelPool中需要,这个类就是redis.clients.jedis.JedisFactory的源码,没有做任何的修改

主要的修改工作还是在MyJedisSentinelPool类中。

在类中声明了两个属性

在构造函数中有三个很重要的地方

 

注意:在创建redisTemplateMaster用到的RedisConnectionFactory的bean上主一定要加上@Primary注解,不然会出现多个类型错误的问题,在监听器中不要使用一创建好的Jedis对像

因为这个对象只能用于订阅的相关事宜,不然会抛出异常 ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context。

 

参考文章:https://blog.csdn.net/Jack_Yin/article/details/84921899

 


版权声明:本文为weixin_38326506原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。