Cache-deletion compensation: subscribing to the binlog with Canal and publishing to RabbitMQ

Canal [kə'næl], meaning waterway/pipeline/channel, is mainly used for incremental log parsing of MySQL databases, providing incremental data subscription and consumption.

How it works:

Canal's working principle is fairly simple: it disguises itself as a MySQL slave and speaks the MySQL slave protocol, sending a dump request to the MySQL master. When the master receives Canal's dump request, it starts pushing the binary log to Canal; Canal then parses the binary log and forwards the changes to a storage destination such as MySQL, Kafka, or Elasticsearch.

Design approach:

To guarantee eventual consistency between the cache and the database under double-write, we can take the following approach: when a user modifies the database, Canal ships the incremental binlog data to RabbitMQ; a consumer then parses the message from the queue, derives the Redis key of the affected cache entry, and deletes it.
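With `canal.mq.flatMessage = true`, the messages Canal publishes to the queue are JSON in Canal's FlatMessage shape, roughly as below; the database, table, and column names here are illustrative:

```json
{
  "database": "test",
  "table": "user_info",
  "type": "UPDATE",
  "ts": 1658732400000,
  "data": [{"id": "1", "name": "newName"}],
  "old": [{"name": "oldName"}]
}
```

The consumer only needs `type`, `table`, and the primary-key columns in `data` to rebuild the Redis key to delete.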

Cache design:

Using Spring AOP, we define a custom annotation as the pointcut; any method carrying this annotation gets cache handling.

The custom annotation

/**
 * Copyright (C), 2017-2022, Hunan Xingsheng Youxuan E-Commerce Co., Ltd.
 *
 * @author: zhangjie (Zhang Jie)
 * @create: 2022-07-25 15:02
 * @Description: custom cache annotation
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Cache {
 
    //Expiration time in milliseconds; the default is 60s
    long expire() default 1 * 60 * 1000;
    //Cache name, used as the Redis key prefix
    String name() default "";
}

The cache aspect

@Component
@Slf4j
@Aspect
public class CacheAspect {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
 
    @Pointcut("@annotation(cn.xsyxzhangjie.redis.cache.Cache)")
    public void pt() {
    }
 
    @Around("pt()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        //Class name
        String className = pjp.getTarget().getClass().getSimpleName();
        //Name of the invoked method
        String methodName = signature.getName();

        //Serialize the arguments to form the key suffix
        StringBuilder params = new StringBuilder();
        for (Object arg : pjp.getArgs()) {
            if (arg != null) {
                params.append(JSON.toJSONString(arg));
            }
        }
        //MethodSignature yields the invoked Method directly, without a reflective lookup
        Method method = signature.getMethod();
        //Read the @Cache annotation
        Cache annotation = method.getAnnotation(Cache.class);
        //Cache expiration time
        long expire = annotation.expire();
        //Cache name
        String name = annotation.name();

        //Assemble the Redis key
        String redisKey = name + "::" + params;

        //Try the cache first
        String redisValue = redisTemplate.opsForValue().get(redisKey);
        if (StringUtils.isNotEmpty(redisValue)) {
            log.info("cache hit: {},{}", className, methodName);
            //Deserialize with the method's return type rather than a hard-coded class
            return JSON.parseObject(redisValue, method.getReturnType());
        }
        //Cache miss: invoke the target method and cache its result
        Object proceed = pjp.proceed();
        redisTemplate.opsForValue().set(redisKey, JSON.toJSONString(proceed), Duration.ofMillis(expire));
        log.info("cache stored: {},{}", className, methodName);
        return proceed;
    }
}

The query and update endpoints:

@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
 
    @Autowired
    private RedisTemplate redisTemplate;
 
    @Autowired
    private UserInfoService userInfoService;
 
    //Cache pointcut
    @Cache(expire = 5 * 60 * 1000, name = "findUser")
    @GetMapping("/findUser")
    public UserInfo findUser(@RequestParam String id) {
        return userInfoService.findUserById(id);
    }
 
    @GetMapping("/updateInfo")
    public int updateInfo(@RequestParam String id,@RequestParam String name) {
        return userInfoService.updateNameById(id,name);
    }
}

The database table used:

The queue configuration class:

@Configuration
public class CacheConfig {
 
    @Autowired
    ConnectionFactory connectionFactory;
     
    public static final String CACHE_EXCHANGE_NAME = "compensate.exchange";
    public static final String CACHE_QUEUE_NAME = "compensate.queue";
    public static final String CACHE_ROUTING_KEY = "compensate";
 
    @Bean("cacheExchange")
    public DirectExchange cacheExchange() {
        return ExchangeBuilder
                .directExchange(CACHE_EXCHANGE_NAME)
                .durable(true)
                .build();
    }
 
    @Bean("cacheQueue")
    public Queue cacheQueue() {
        return QueueBuilder.durable(CACHE_QUEUE_NAME).build();
    }
 
    //Declare the binding between the queue and the exchange
    @Bean
    public Binding cacheQueueBindingCacheExchange() {
        return BindingBuilder.bind(cacheQueue()).to(cacheExchange()).with(CACHE_ROUTING_KEY);
    }
     
    @Bean
    public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //Enable manual acknowledgement
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        return factory;
    }
}

The consumer:

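A sketch of the compensation consumer, assuming Canal's FlatMessage JSON format (`canal.mq.flatMessage = true`) and the `name + "::" + JSON-serialized args` key convention used by CacheAspect; the table-to-cache-name mapping (`user_info` → `findUser`) is a hypothetical example:

```java
@Component
@Slf4j
public class CacheCompensateConsumer {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @RabbitListener(queues = CacheConfig.CACHE_QUEUE_NAME,
            containerFactory = "consumerBatchContainerFactory")
    public void onMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //Canal FlatMessage, e.g. {"database":"test","table":"user_info","type":"UPDATE","data":[{"id":"1",...}]}
            JSONObject flatMessage = JSON.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
            String type = flatMessage.getString("type");
            //Only data-changing events invalidate the cache
            if ("UPDATE".equals(type) || "DELETE".equals(type)) {
                JSONArray rows = flatMessage.getJSONArray("data");
                for (int i = 0; i < rows.size(); i++) {
                    String id = rows.getJSONObject(i).getString("id");
                    //Rebuild the key exactly as CacheAspect wrote it: name + "::" + JSON-serialized args
                    String redisKey = "findUser::" + JSON.toJSONString(id);
                    redisTemplate.delete(redisKey);
                    log.info("deleted cache key {}", redisKey);
                }
            }
            //Manual ack, matching the MANUAL acknowledge mode configured above
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("failed to process binlog message", e);
            //requeue=false so the failed message can be routed to a dead-letter queue
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
```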

Testing:

Issue a query request, and a Redis key is created:

 

When we issue an update request, this Redis key is deleted asynchronously.
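With the service running locally, the flow above can be exercised from the command line (the host and port are hypothetical):

```
# Creates cache key findUser::"1" via the @Cache aspect
curl 'http://localhost:8080/redisTest/findUser?id=1'

# Triggers binlog -> Canal -> RabbitMQ -> asynchronous deletion of that key
curl 'http://localhost:8080/redisTest/updateInfo?id=1&name=newName'
```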

  

Because the consumer may fail to delete the cached entry, we should:

  1. Persist MQ messages by default and configure a mirrored queue for the queue.
  2. Use manual acknowledgement on the consumer (autoAck=false) and configure a dead-letter queue.
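Point 2 can be wired up in the Spring AMQP configuration roughly as follows (a sketch; the dead-letter exchange, queue, and routing-key names are hypothetical):

```java
@Bean("cacheQueue")
public Queue cacheQueue() {
    //Durable queue that dead-letters rejected messages instead of losing them
    return QueueBuilder.durable(CacheConfig.CACHE_QUEUE_NAME)
            .deadLetterExchange("compensate.dlx")
            .deadLetterRoutingKey("compensate.dead")
            .build();
}

@Bean
public DirectExchange deadLetterExchange() {
    return ExchangeBuilder.directExchange("compensate.dlx").durable(true).build();
}

@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("compensate.dead.queue").build();
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("compensate.dead");
}
```

Messages the consumer rejects with `basicNack(tag, false, false)` then land in `compensate.dead.queue`, where they can be inspected and replayed.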

For the configuration that lets the Canal server deliver received binlog data directly to an MQ, see the reference documentation: Canal Kafka RocketMQ QuickStart · alibaba/canal Wiki · GitHub
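A sketch of the Canal server configuration for RabbitMQ delivery, assuming Canal 1.1.5+ (which added the rabbitMQ server mode); the host, credentials, and the `test` database name are placeholders:

```
# conf/canal.properties
canal.serverMode = rabbitMQ
canal.mq.flatMessage = true
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = compensate.exchange
rabbitmq.username = guest
rabbitmq.password = guest

# conf/example/instance.properties
# canal.mq.topic is used as the RabbitMQ routing key
canal.mq.topic = compensate
# subscribe only to the table backing the cache
canal.instance.filter.regex = test\\.user_info
```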


Copyright notice: this is an original article by zhangjie602, licensed under CC 4.0 BY-SA; please include the original source link and this notice when reposting.