[Redis Notes] --- A pitfall to watch for when batch-setting values with RedisTemplate and a pipeline

The code for this article is on GitHub: https://github.com/nieandsun/redis-study



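1 A brief introduction to pipelines, using Jedis as an example

Take batch-deleting data by key as an example. If you simply issue a del command for each key, the Java code might look like this: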

public static void delNoStus(String... keys) {
    Jedis jedis = new Jedis(RedisTools.ip, RedisTools.port);
    // delete the keys one at a time in a loop
    for (String key : keys) {
        jedis.del(key);
    }
    jedis.close();
}

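Every del call above costs a full network round trip: the client sends one command and waits for the reply before sending the next. This is very slow, especially when the client and the Redis server are deployed in different data centers.
A pipeline instead packs multiple commands together, hands them to the Redis server in one go, and then reads back the results.
With Jedis, the Java code can then be written like this: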

public static void delNoPipe(String... keys) {
    Jedis jedis = new Jedis(RedisTools.ip, RedisTools.port);
    Pipeline pipelined = jedis.pipelined();
    for (String key : keys) {
        pipelined.del(key); // queued, not yet submitted
    }
    pipelined.sync(); // submit everything at once
    jedis.close();
}

If you are interested, clone my code and try it yourself. In my test, deleting 10,000 keys took 416 ms without a pipeline and 34 ms with one, roughly a 12x speedup.
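To make the arithmetic behind that difference concrete, here is a rough cost model. It is only a sketch under simplifying assumptions: each non-pipelined command pays one full round trip, a pipelined batch pays a single round trip, and the server-side execution cost per command is the same in both cases. The class name, method names, and the 40 µs / 2 µs inputs are made up for illustration; they are not measurements from the test above.

```java
/** Simplified latency model for N Redis commands; all figures are hypothetical. */
final class PipelineCostModel {

    /** Without a pipeline, every command waits for its own round trip. */
    static long sequentialMicros(int commands, long rttMicros, long execMicros) {
        return (long) commands * (rttMicros + execMicros);
    }

    /** With a pipeline, the whole batch shares one round trip (idealized). */
    static long pipelinedMicros(int commands, long rttMicros, long execMicros) {
        return rttMicros + (long) commands * execMicros;
    }

    public static void main(String[] args) {
        int commands = 10_000;
        long rtt = 40;  // assumed round-trip time in microseconds
        long exec = 2;  // assumed server-side cost per command in microseconds
        System.out.println("sequential: " + sequentialMicros(commands, rtt, exec) + " us");
        System.out.println("pipelined:  " + pipelinedMicros(commands, rtt, exec) + " us");
    }
}
```

The larger the round-trip time is relative to the per-command execution time, the more a pipeline saves, which is why cross-data-center deployments benefit the most.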


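2 A pitfall when batch-setting with RedisTemplate and a pipeline

RedisTemplate can of course use a pipeline too, but be aware that there is a fairly big pitfall.

The batch-set methods I found online all look like this: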

  public void setPipe(Map<String, String> map) {

        List list = redisTemplate.executePipelined((RedisCallback<String>) connection -> {
            Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, String> next = iterator.next();

                connection.set(next.getKey().getBytes(), next.getValue().getBytes());
            }
            return null;
        });

        System.out.println("setPipe" + list);
    }

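In my testing, this approach does write the values to the Redis server, but reading them back fails with the following error, whether or not a pipeline is used for the read: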

org.springframework.data.redis.serializer.SerializationException: Could not read JSON: Unrecognized token 'value': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"value:0"; line: 1, column: 7]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'value': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"value:0"; line: 1, column: 7]

	at org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer.deserialize(Jackson2JsonRedisSerializer.java:75)
	at org.springframework.data.redis.core.RedisTemplate.deserializeMixedResults(RedisTemplate.java:617)
	at org.springframework.data.redis.core.RedisTemplate.lambda$executePipelined$1(RedisTemplate.java:335)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:175)
	at org.springframework.data.redis.core.RedisTemplate.executePipelined(RedisTemplate.java:324)
	at org.springframework.data.redis.core.RedisTemplate.executePipelined(RedisTemplate.java:314)
	at com.nrsc.redis.learning.pipeline.RedisTemplateTest1.getPipe(RedisTemplateTest1.java:86)
	at com.nrsc.redis.learning.pipeline.RedisTemplateTest1.test(RedisTemplateTest1.java:36)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
	at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
	at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
	at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
	at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
	at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'value': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"value:0"; line: 1, column: 7]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3585)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2680)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:865)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4620)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4469)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3538)
	at org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer.deserialize(Jackson2JsonRedisSerializer.java:73)
	... 40 more

2020-05-31 11:54:11.942  INFO 4912 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2020-05-31 11:54:11.984  WARN 4912 --- [extShutdownHook] d.r.c.l.LettucePoolingConnectionProvider : LettucePoolingConnectionProvider contains unreleased connections

Process finished with exit code -1

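The error message makes the cause fairly obvious: it is a serialization problem. The values were written with getBytes(), so Redis holds the raw bytes value:0, while the template's Jackson2JsonRedisSerializer expects a JSON string (with surrounding quotes) when it reads them back. After some searching I found the following article, and its fix worked in my tests.

Redis使用Pipeline时对象序列化失败org.springframework.data.redis.serializer.SerializationException (https://blog.csdn.net/myfwjy/article/details/100776426)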
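The stack trace shows Jackson2JsonRedisSerializer choking on the bytes value:0. The sketch below reproduces that mismatch using only the JDK. Note that JsonStringCodecDemo is a hypothetical stand-in I wrote for illustration, not Jackson and not Spring's serializer: it only wraps strings in quotes and escapes quotes and backslashes, which is just enough to show why bytes written with getBytes() cannot be read back by a JSON value serializer.

```java
import java.nio.charset.StandardCharsets;

/** Stand-in for a JSON value serializer; NOT Jackson, only enough to show the mismatch. */
final class JsonStringCodecDemo {

    /** Writes a string the way a JSON serializer would: wrapped in double quotes. */
    static byte[] serialize(String value) {
        String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
        return ("\"" + escaped + "\"").getBytes(StandardCharsets.UTF_8);
    }

    /** Reads a JSON string back; rejects bytes that are not a quoted string. */
    static String deserialize(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        if (text.length() < 2 || !text.startsWith("\"") || !text.endsWith("\"")) {
            throw new IllegalArgumentException("not a JSON string: " + text);
        }
        String body = text.substring(1, text.length() - 1);
        return body.replace("\\\"", "\"").replace("\\\\", "\\");
    }

    public static void main(String[] args) {
        // What the naive setPipe stores: raw bytes without quotes
        byte[] raw = "value:0".getBytes(StandardCharsets.UTF_8);
        // What a JSON value serializer would store: the quoted form
        byte[] json = serialize("value:0");

        System.out.println(deserialize(json)); // the quoted form round-trips
        try {
            deserialize(raw); // the raw form is rejected on read
        } catch (IllegalArgumentException e) {
            System.out.println("read failed: " + e.getMessage());
        }
    }
}
```

The takeaway is that the write side and the read side have to agree on one serializer, which is what the fix in the demo does by reusing the template's own key and value serializers.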

Here is a complete demo of using a pipeline with RedisTemplate; run it if you are interested:

@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class RedisTemplateTest2 {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    private int arrayLength = 10000;
    private String[] keys = new String[arrayLength];
    private List<String> keys2 = Lists.newArrayList();

    @Test
    public void test() {
        Map<String, String> map = initData();
        long t = System.currentTimeMillis();
        setPipe(map);
        getPipe(keys);
        System.out.println(System.currentTimeMillis() - t);
    }

    /**
     * Initializes the test data.
     * @return a map of the key/value pairs to write
     */
    public Map<String, String> initData() {
        Map<String, String> map = Maps.newHashMap();

        for (int i = 0; i < arrayLength; i++) {
            String key = "key:" + i;
            String value = "value:" + i;
            map.put(key, value);
            keys[i] = key;
            keys2.add(key);
        }
        return map;
    }


    /**
     * Batch-sets values through a pipeline.
     * The naive version raised an error; the fix comes from this article:
     * https://blog.csdn.net/myfwjy/article/details/100776426
     * "Redis使用Pipeline时对象序列化失败org.springframework.data.redis.serializer.SerializationException"
     * @param map the key/value pairs to write
     */
    public void setPipe(Map<String, String> map) {

        RedisSerializer keySerializer = redisTemplate.getKeySerializer();
        RedisSerializer valueSerializer = redisTemplate.getValueSerializer();

        List list = redisTemplate.executePipelined((RedisCallback<String>) connection -> {
            Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, String> next = iterator.next();

                connection.set(keySerializer.serialize(next.getKey()), valueSerializer.serialize(next.getValue()));
            }
            return null;
            // passing the serializer on the next line is probably optional; it should work either way
        }, redisTemplate.getValueSerializer());

        System.out.println("setPipe" + list);
    }


    public void getPipe(String... keys) {
        // serialize keys with the template's key serializer, the same way setPipe does
        RedisSerializer keySerializer = redisTemplate.getKeySerializer();

        List<Object> list = redisTemplate.executePipelined((RedisCallback<?>) connection -> {
            for (String s : keys) {
                connection.get(keySerializer.serialize(s));
            }
            return null;
        });

        System.out.println("pipeline result: " + list);

        List<Object> list1 = redisTemplate.opsForValue().multiGet(keys2);
        System.out.println("multiGet result: " + list1);
    }
}

A quick look at the run result:
[image: run result]


end!!!


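Copyright notice: This is an original article by nrsc272420199, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.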