使用队列实现redis数据一致性的思考

简介

  • 在使用redis的时候,前面介绍了,由于操作数据库和操作redis缓存不是一个原子操作,且还会存在多个CPU之间并行执行的情况,所以就会有一个线程在操作数据库和缓存的时间节点之间,另外一个线程也在执行操作数据库和缓存,这样就会导致数据可以与缓存之间会存在数据不一致的情况。
  • 并且,无论使用何种更新策略,都无法保证数据的一致性,那么,如果某些数据在业务场景下不能出现数据库与缓存的长时间的不同步情况,就需要考虑,如何才能保证数据的一致性。
  • 具体要了解在不同更新策略下发生缓存与数据库数据不一致的各种情况,可以看我的上篇博客介绍
  • 博客链接: Redis中的几种更新策略.

分析及制定解决方案

这个最初实现思路是从龙果学院的亿级电商大型分布式缓存架构实战的课程里面来得,但是也不一样。

  • 上面我们也说了,之所以会数据不一致,就是因为出现了多个线程的数据库和缓存操作同时执行的情况,那么,要解决这个问题,就要从这点入手,就不能有多个线程的数据库和缓存操作同时执行的时候,而请求过来的时候,都是tomcat自己的线程池里分发的线程来执行这个请求的,请求可以多线程同时运行,但是,更新数据库和缓存的操作就不行。
  • 因此,可以考虑以下方案:
  • 加锁,对执行数据库和缓存操作的那部分代码进行加锁,每当多个线程代码运行到这里的时候,如果此时已经有线程在执行了,下一个线程就会被阻塞,乍一看,确实已经实现了功能,并且能够确保解决问题,但是一想想每秒有上万或者更高的请求打过来的时候,就算只有几百的写请求,但是上万的读查询在缓存失效后也会有大量的请求进入到锁的代码块中,你就能马上意识到,这样的解决方案是行不通的,性能太差。
  • 使用队列串行化执行数据库与缓存更新的操作,首先,最简单的,把这些操作放到一个队列里面,然后有个线程取出来执行,这样的机制是不行的,这样的话它和reentrantLock锁利用AQS和LockSupport.park的机制是一毛一样的,相当于变相的自己实现了个锁。所以我们这里是要用到多个队列的。
  • 我们这里之所以还要用到队列,是因为可以在使用队列的基础上进行灵活的修改。

使用队列的详细方案

多线程多队列

  • 首先,从队列中取出请求去执行的线程不能只有一个线程,要保证每个线程都有自己对应的队列,否则就失去了多队列的意义了。
  • 通过路由请求到不同的队列来实现,同一个键的请求,不会出现并发执行数据库和缓存操作的行为,而这里的路由请求的策略,我们就采用最老土的hash算法,因为我们在代码中的队列数量和线程数量是不会去实时改变的,通过该请求的键的hashcode对队列数量进行取余操作,将其路由分配到不同的队列里面。

查询去重

  • 再者,仅仅只是增加队列数量是不够的,这也只能是提高了队列数量倍的性能而已,应该要有更多的优化操作,,,比如,所有进入队列的相同的查询操作,其实都可以优化为一个操作,这样,只要该队列中存在查询该键的请求,就不会把这个键放入到队列里面去了,尤其是对于redis来说,查询的操作数量要远大于写入的操作数量,这个是一个很值得去优化的地方。

等待策略

  • 好了,这样就能够大概的把解决方案给描述出来了,那么,还有一点就是,在一个线程把这部分请求发送到队列后,需要执行什么样的等待策略呢?
  • 比如:通过while去轮询,在redis里面查找,找到数据了就break,没找到就继续找,然后再设定一个超时时间,超时了就执行超时的处理程序返回。首先,这样是不太好的,比如你在这个while里面加上了thread.sleep(),你sleep时间小了吧,老是上下文切换,引起不必要的性能损耗,再者就是,你查的时候,说不定你发送到队列里的查请求刚执行完,又有更新写入请求给把缓存删除了,你就不一定能找到刚好redis里有缓存的时间点,尤其是并发量比较大的时候,应该是会经常查询超时把,再者,就是你这样又给redis增加了压力,又凭空多出来这么多查询的请求。所以说,我看网上很多的人在说这样的做法,我也不知道他们有没有思考过,到底有没有真正的在生产环境中使用过,还是说复制过来,贴个原创标签就完事了。
  • 在网上有看到有人设置个变量用while刷,while完以后才去读redis,他干脆就不在while里面加thread.sleep()了,直接让CPU裸奔,这样就更加不用说了,这样是一定会有内存可见性的问题的,而且那么多的查询请求,很多个线程在那里while,非得让每一个线程使用完自己的CPU时间片了才将CPU让出来,这样就不用想了,不仅自己这个服务有问题,电脑上的其他程序依然分配不到CPU资源,肯定就卡死了。
  • 所以说,这里是一定要使用可以阻塞和释放的操作的,比如object.wait()或者LockSupport.park()的,本来想着是使用LockSupport.park()的,不用像object.wait()一样还要去用个synchronized同步块,你进入队列里面的请求记录一个入队时的线程就好了,但是后来做查询去重的时候发现,还得有一个list来存放多个线程,还得去维护这个list的并发安全性,还得遍历它去unpark,比较麻烦,我这里又不需要单独操作某个特定的线程,干脆就用object.wait()完事了。

总结一下实现步骤:
(1)请求封装类
(2)线程和队列初始化管理
(3)请求路由分发的详细设计
(4)查询去重和入队列的详细设计
(5)给实际业务调用的服务如查询、更新的封装,通过它来生成请求,发送队列和等待执行完成
(6)使用泛型和接口方法来对其做一个通用的架构,使得具体业务和这个方案代码分离

方案代码

这是方案的文件的图。
在这里插入图片描述
首先,forRedisQueue里面是对请求的封装,RedisServiceImpl是对redis基本操作的封装,ResdisQueueService是对使用redis队列进行操作的封装,代码如下:

  • 请求基类:
public abstract class Request<T> {
    String key;
    T result;
    Exception requestException;
    /**
     * 判断请求是否完成
     */
    boolean isDone;

    /**
     * 执行请求
     */
    abstract void execute();

    public boolean isDone() {
        return isDone;
    }

    public String getKey() {
        return key;
    }

    public T getResult() {
        return result;
    }

    public Exception getRequestException() {
        return requestException;
    }
}
  • 查询请求类:
public class QueryRequest<T> extends Request<T> {

    private QueryFunction<T> queryFunction;

    public QueryRequest(String key, QueryFunction<T> queryFunction) {
        this.key = key;
        this.queryFunction = queryFunction;
    }

    @Override
    public void execute() {
        try {
            if (queryFunction != null) {
                result = queryFunction.queryExecution(key);
            }
        } catch (Exception e) {
            this.requestException = e;
        } finally {
            synchronized (this) {
                isDone = true;
                this.notifyAll();
            }
        }
    }
}
  • 更新请求类(注意这个更新和写入什么的都是用的这个类):
public class UpdateRequest<T> extends Request<T> {
    private T value;
    private UpdateFunction<T> updateFunction;

    public UpdateRequest(String key, T value, UpdateFunction<T> updateFunction) {
        this.key = key;
        this.value = value;
        this.updateFunction = updateFunction;
    }

    @Override
    public void execute() {
        try {
            if (updateFunction!=null){
                updateFunction.updateExecution(key, value);
            }
        } catch (Exception e) {
            this.requestException =e;
        } finally {
            synchronized (this) {
                isDone=true;
                this.notify();
            }
        }
    }
}
  • 查找请求方法的接口
public interface QueryFunction<T> {
    /**需要执行查询的动作,一般是查数据库和写入redis缓存的操作
     *
     * @param key 要查询的key
     * @return 查询得到的值
     */
    public T queryExecution(String key);
}
  • 更新请求方法的接口
public interface UpdateFunction<T> {
    /**
     * 需要执行更新的动作,一般是更新数据库和删除redis缓存的操作
     *
     * @param key   要更新的key
     * @param value 要更新的value
     */
    public void updateExecution(String key, T value);
}
  • 请求队列的封装
public class RequestQueue {
    private ArrayBlockingQueue<Request> queue;

    public RequestQueue() {
        queue = new ArrayBlockingQueue(100);
    }

    /**
     * 处理请求
     */
    public void process() {
        while (true) {
            Request request = null;
            try {
                request = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            request.execute();
        }
    }

    /**
     * 如果队列中有该读请求,那么就不插入,直接返回队列中的请求
     *
     * @param request 请求
     * @return
     * @throws InterruptedException
     */
    public <T> Request<T> enqueue(Request<T> request) throws InterruptedException {
        Request<T> request1 = getIfPresent(request);
        if (request1 == null) {
            queue.put(request);
            return request;
        }
        return request1;
    }

    /**
     * 查询队列中是否存在该请求
     *
     * @param request 请求
     * @param <T>
     * @return
     */
    private <T> Request<T> getIfPresent(Request<T> request) {
        if (!(request instanceof QueryRequest)) {
            return null;
        }
        for (Request request2 : queue) {
            if (request2.getKey().equals(request.getKey()) && (request2 instanceof QueryRequest)) {
                return request2;
            }
        }
        return null;
    }
}
  • 请求队列管理类,包括线程和队列的初始化,以及请求的路由分发,springboot结束时是不是要先判断队列里面是否为空,是不是要把请求执行完才结束?暂时还没去深入了解,先标记一下
public class QueueManager {
    private static final int QUEUE_NUM = 10;
    private ExecutorService threadPool;
    private List<RequestQueue> queues;
    private static QueueManager manager = new QueueManager();

    public static QueueManager getInstance() {
        return manager;
    }

    private QueueManager() {
        initial();
    }

    private void initial() {
        threadPool = new ThreadPoolExecutor(QUEUE_NUM, 10, 200, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        queues = new ArrayList<>(QUEUE_NUM);
        for (int i = 0; i < QUEUE_NUM; i++) {
            RequestQueue queue = new RequestQueue();
            queues.add(queue);
            threadPool.execute(queue::process);
        }
    }

    /**
     * 请求路由,发送队列
     *
     * @param request 请求
     * @return
     */
    public <T> Request<T> dispatch(Request<T> request) {
        int h = request.getKey().hashCode();
        int hash = request.getKey() == null ? 0 : h ^ (h >>> 16);
        int index = hash & (QUEUE_NUM - 1);
        try {
            request = queues.get(index).enqueue(request);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return request;
    }

    public void shunDown() {

    }
}
  • 真正的供项目调用的服务接口
/**
 * 用于给队列发送消息,使更新和查询操作串行化
 *
 * @author liug132055
 */
@Service
public class RedisQueueService {
    @Resource
    PlatformTransactionManager platformTransactionManager;
    @Resource
    TransactionDefinition definition;

    /**
     * 先从redis缓存获取,获取不到,再入队列查数据库
     *
     * @param key                要查询的key
     * @param timeout            超时时间,以毫秒为单位
     * @param queryCacheFunction 定义查询操作
     * @return
     */
    public <T> T query(String key, long timeout, QueryFunction<T> queryCacheFunction, QueryFunction<T> queryDataBaseFunction) throws Exception {
        T result = queryCacheFunction.queryExecution(key);
        if (result != null) {
            return result;
        }
        Request<T> request = new QueryRequest<>(key, queryDataBaseFunction);
        request = QueueManager.getInstance().dispatch(request);
        try {
            long deadline = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + timeout;
            synchronized (request) {
                while (!request.isDone()) {
                    if (timeout <= 0)
                        break;
                    request.wait(timeout);
                    timeout = deadline - TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (request.getRequestException() != null) {
            throw request.getRequestException();
        }
        return request.getResult();
    }

    /**
     * 执行更新操作
     *
     * @param key            要更新的key
     * @param value          要更新的值
     * @param timeout        超时时间,以毫秒为单位
     * @param updateFunction 定义更新操作
     */
    public <T> void update(String key, T value, long timeout, UpdateFunction<T> updateFunction) throws Throwable {
        //将传进来的更新操作再包装一层事务,生成请求
        Request<T> request = new UpdateRequest<>(key, value, (key1, value1) -> {
            TransactionStatus status = platformTransactionManager.getTransaction(definition);
            try {
                if (updateFunction != null) {
                    updateFunction.updateExecution(key1, value1);
                }
                System.out.println("提交");
                platformTransactionManager.commit(status);
            } catch (Throwable e) {
                System.out.println("回滚");
                platformTransactionManager.rollback(status);
                throw e;
            }
        });
        QueueManager.getInstance().dispatch(request);
        try {
            long deadline = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + timeout;
            synchronized (request) {
                while (!request.isDone()) {
                    if (timeout <= 0)
                        break;
                    request.wait(timeout);
                    timeout = deadline - TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //在发生异常以及更新超时的时候需要给controller抛出异常
        if (request.getRequestException() != null || !request.isDone()) {
            throw request.isDone() ? request.getRequestException() : new TimeOutException("更新超时");
        }
    }
}
  • 上面都是固定的代码,大部分的业务场景上面的代码都不要更改
  • 下面就是业务中使用这个代码了,所有的业务代码,包括你想放到队列里面去执行的逻辑,都是直接在这里去写
@Service("attrDictService")
public class AttrDictServiceImpl implements AttrDictService {
    @Resource
    private AttrDictDao attrDictDao;
    @Resource
    private RedisQueueService redisQueueService;
    @Resource
    private RedisService redisService;


    /**
     * 查询多条数据
     *
     * @param table 要查询的表名
     * @return 对象列表
     */
    @Override
    public Map<String, DictEntity> queryAllByTable(String table) throws Throwable {
        Map<String, DictEntity> map = redisQueueService.query(table, key->
            redisService.hGetAll("core:dict:" + table) , key1 -> {
            /* 这里可以自定义查询的要放入队列的业务逻辑 **/
            //查询缓存
            Map<String, DictEntity> value1 = redisService.hGetAll("core:dict:" + table);
            if (!ObjectUtils.isEmpty(value1)) {//第二次检查
                return value1;
            }
            //查询数据库
            List<DictEntity> dictEntities = attrDictDao.queryAllByTable(key1);
            Map<String, DictEntity> map1 = new HashMap<>(dictEntities.size());
            dictEntities.forEach(x -> {
                map1.put(x.getId().toString(), x);
            });
            //设置缓存
            if (map1.size() != 0) {
                redisService.hSetAll("core:dict:" + table, map1);
            }
            return map1;
            /* 这里可以自定义查询的要放入队列的业务逻辑 **/
        },200);
        return map;
    }

    /**
     * 新增数据
     *
     * @param dict 实例对象
     */
    @Override
    public void insert(DictEntity dict) throws Throwable {
        Timestamp created = new Timestamp(System.currentTimeMillis());
        dict.setCreatedAt(created);
        redisQueueService.update(dict.getTable(), dict, (key1, value1) -> {
            /* 这里可以自定义插入数据的要放入队列的业务逻辑,采取更新数据库再删除更新缓存的策略 **/
            attrDictDao.insert(value1);
            redisService.del("core:dict:" + key1);
            /* 这里可以自定义插入数据的要放入队列的业务逻辑,采取更新数据库再删除缓存的策略 **/
        },200);
    }

    /**
     * 修改数据
     *
     * @param dict 实例对象
     */
    @Override
    public void update(DictEntity dict) throws Throwable {
        redisQueueService.update(dict.getTable(), dict, (key1, value1) -> {
            /* 这里可以自定义更新数据的要放入队列的业务逻辑,采取更新数据库再删除缓存的策略 **/
            attrDictDao.update(value1);
            redisService.del("core:dict:" + key1);
            /* 这里可以自定义更新数据的要放入队列的业务逻辑,采取更新数据库再删除缓存的策略 **/
        },200);
    }

    /**
     * 通过主键删除数据
     *
     * @param id 主键
     */
    @Override
    public void deleteById(Long id, String table) throws Throwable {
        redisQueueService.update(table, null, (key1, value1) -> {
            /* 这里可以自定义删除数据的要放入队列的业务逻辑 **/
            attrDictDao.deleteById(id);
            redisService.del("core:dict:" + key1);
            /* 这里可以自定义删除数据的要放入队列的业务逻辑 **/
        }200);
    }
}
  • 那么,在这里,就可以自定义要放入队列中执行的业务代码了。

一些技术要点

一、使用request.wait(200)超时机制

  • 在RedisQueueService 中,使用request.wait(200),需要使用有带超时的功能,根据自己的实际情况定制超时时间,考虑的因素有这几点:
    (1)第一个点,防止请求的实在太多,一直阻塞很长时间才得到返回值,可以提前返回,不然耗尽了tomcat的线程池资源,使得其他服务也被卡住了,或者说使用hystrix的资源隔离,利用线程池隔离技术来保证即使这个处理时间太长,也不至于会将线程资源耗尽,
    (2)第二个点就是,分发完请求进队列之后,说不定也会有极端情况队列里面的这个请求已经执行完了,并且先notifyAll了,然后才执行request.wait(),那么这个request.wait()就会一直在等,永远都不会返回了。所以肯定也是需要一个超时时间来应对这种特殊的极端的情况的,虽然执行redis和数据库操作需要花费一定的时间,出现这种情况的概率很小,在这段时间里面即使这个线程分发请求到队列里然后被中断了,也该重新切换回来执行了,但是没有人能保证在redis和数据库操作完成前它就一定会被切换回来执行。
    。。。。我其实可以在request里设置一个isDone变量,执行完请求把isDone设置为true,然后wait前先判断isDone为false才执行,这样的话也能够实现就是先notify了的话,wait就不会执行了。主要就是发现了一点,这种做法包括java并发包里面有大量的实现的object.wait()或者condition.await()都是这样做的。(已改,更新了一下,这里也采用这种做法,加了个isDone判断,确实感觉先notify了后还能wait这样不太好,并且在我的并发编程系列也有深入分析了condition.await()他们的原理,也解释了这一点)原文地址:- Java并发编程系列 | AQS之条件队列的原理

二、内部异常捕获

  • 然后就是,这个请求被分发到了队列里,然后最终是通过别的线程去执行的,那么我们就需要在执行这些请求的命令的线程里进行异常捕获,然后在wait结束之后抛出,供controller的全局异常捕获,统一处理,不然要是不进行捕获的话,那这个线程池的线程便会死掉,接下来所有分发到这个线程的请求都没有办法得到执行,最终队列越来越长,且得不到执行。

三、封装代码,底层逻辑与具体业务逻辑分离

  • 还有就是呢,一般来说,业务的代码就不要写到底层去了,这里的请求的放入队列的要执行的逻辑,都可以直接在调用业务接口时指定,这样的话就不用去底层改了,比较好。

四、对更新操作采用编程式事务进行封装,应对redis写入失败问题

  • 在RedisQueueService 的update方法中,对业务传进来的更新操作再次加上封装,然后生成请求放入队列中进行消费,这里主要是看到网上大部分的方案都没有去解决redis写入失败的问题但是又都提到了,所以在这里进行事务操作的封装的话,防止数据库删除操作成功了,而缓存则因为可能是redis短时间内宕机或者网络异常等各种原因导致的删除操作失败,没有事务的话那么数据库删除了而redis中还存在数据,就会造成数据不一致,而加入事务的话,只要传入的操作发生异常,数据库直接回滚,并将异常继续抛出给该线程,再抛给controller处理,绝对的缓存数据库双写一致性。
  • 在这里使用的是编程式的事务模式,对更新操作进行了一次封装,因为在service这里数据库的操作只是在这里声明,并不是在这里执行。在spring中,如果被标注了事务注解的方法中,出现了在其他的线程中执行数据库操作,那么这个数据库操作是没有办法被事务所管理的,所以这个事务其实对他就根本不生效。因为spring总是根据threadlocal来获取该线程所对应的数据库连接,从而执行事务,所以采用编程式事务的时候,getTransaction方法总会在线程池里面执行,和执行sql语句的是同一个线程,因此获取到的数据库连接也是同一个数据库连接,这样才能够使得该sql语句被事务所管理。

五、对传入的更新、插入、删除操作的要求,要求采用更新数据库再删除缓存的策略

  • 这里主要是考虑到网络的不稳定性因素,因为毕竟MySQL和Redis之间,加入事务并不能保证两者的完全一致性,他只能够保证在redis操作失败后,数据库能够回滚。
  • 但是如果采用更新redis的策略的话,就会存在redis写入成功,返回响应时由于网络不稳定而导致响应失败,数据库回滚,这样redis成功数据库回滚就会造成数据的不一致。

一些局限性

  • 在上面其实已经考虑的够多了,但是没有办法,在写线程执行删除数据库数据操作的时候,依然没有办法避免写和读数据库接连发生的情况。因为删除数据库是要执行删除缓存的,在读请求二次检查时也并不能检查出来,导致数据被删了仍然读出了数据,能否考虑无论是增删改都是往redis中写数据?想问一问,这种情况下,删除数据库数据,就往redis的这个key存个null值可行不?反正也都是设置了过期时间的,有没有人一起讨论一下?
  • 这个方案只能是基于单个key的查询,适用于那些大量查询都是基于这个单个key来查询的场景,如果有大量请求需要一次性查询或者更改多条key的数据的话,这个方案是不能提供并发安全性的保障的,少量这样的请求的话则没有关系,直接操作数据库就好了,反正也是少量,并不会给数据库造成多大的压力。并且大部分需要通过key查询数据的场景一般也会有大量批量查询多条数据的情况。

总结

  • 使用队列来解决redis缓存数据不一致的方案差不多就是这样了,由于写这个花了差不多两天多,后面又更新了好几次,可能会存在一些不足的地方和漏洞,希望大家能指出来。然后这个我自己也在自己的笔记本上用Jmeter进行过测试,在极端情况,只对同一个键进行查询和更新操作的话,那就就是只用到了一个队列,更新操作吞吐量在一百每秒的时候,查询操作可以有几千每秒,并且平均响应时间都能在50ms以内,如果是不同的键,用到多个队列,阻塞的时间就会变短,速度还会快不少。
  • 当然了,只要是使用队列和wait() 这种操作的,基本上都会对性能有一定的影响,和写操作的数量成正比,如果写操作突然变多,那么其响应时间就会更长,那么由于tomcat最大线程数的限制,总共就这么多线程,响应时间变长了,其吞吐量肯定也会有一定的下降。所以说,这种方案是不适合写操作过多的情况的,但是一般情况下写操作也是很少数的,为了数据的一致性,这样的一点牺牲也是必要的。
  • 再就是,在使用redis 的时候,如果真的读写操作比较多的情况,这个服务是可以去部署多个实例到多台设备上,通过上层再进行一次key的分发,对同一个key路由到同一个服务器,来缓解压力,再或者分库分表,这样的话性能会更高一点。
  • 新发表了一片文章:链接: 对使用队列解决缓存一致性的优化(二).
    此方案的项目源码放到了我的github地址中:github地址

版权声明:本文为weixin_44228698原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。