前言
上篇文章介绍了缓存和redis的安装,以及常见redis客户端在java中使用;这篇文章会紧接着上篇继续写一些redis框架的实用功能,在某些业务场景下应用可以考虑的;包括强大的基本数据结构,发布订阅功能,以及stream流这些,redis的设计者,为我们做了更加强大的框架
整个 spring-data-redis 的官方文档 使用api包括了许多使用方式等

Redis的数据结构
Redis命令十分丰富,包括的命令组有Cluster、Connection、Geo、Hashes、HyperLogLog、Keys、Lists、Pub/Sub、Scripting、Server、Sets、Sorted Sets、Strings、Transactions一共14个redis命令组两百多个redis命令,Redis中文命令大全。其中把常见的命令数据结构列出来。
数据结构-Hash

key对应着表中字段,而k-v则对应着变化的值,和关系型数据库中数据就很相似了
- Hget 获取存储在哈希表中指定字段的值
- Hset 将哈希表 key 中的字段 field 的值设为 value
- Hgetall 获取在哈希表中指定 key 的所有字段和值

所有的操作都是 hash开头的 hset map key1 value1 key2 value2 也包括hget hgetall

对于具体的设置,热门商品,就可以采用set的存储方式进行处理
对于java中的hashmap和redis中hash结构进行对比
@Autowired
StringRedisTemplate redisTemplate;
public void hash() {
HashMap<String, Object> user = new HashMap<>();
user.put("name", "hash");
user.put("age", 18);
user.put("userId", 10001);
System.out.println("map user 数据:" + user);
// redisTemplate.opsForHash().putAll("user", user);
redisTemplate.opsForHash().put("user", "name", "hash");
redisTemplate.opsForHash().put("user", "age", "18");
redisTemplate.opsForHash().put("user", "userId", "10001");
System.out.println("redis操作~~~~~");
Map<?, ?> map = redisTemplate.opsForHash().entries("user");
System.out.println("redis结果:" + map);
}可以存一个列表,这两种方式是一样的。 通过putall 或者 put达到的效果是一样的。
数据结构-List

它是类似一种简单队列,用类似链表的形式进行操作。

redisTemplate.opsForList().rightPush("queue_1", "1");
redisTemplate.opsForList().rightPush("queue_1", "2", "3");更加类似mq的方式

但是这个顺序是倒序的 , 也就是说你要左边插入,要从右边拿出 ,


而range 和 pop的区别就是拿出元素删除或者不删除的情况,从描述中就能看到,是阻塞的api
数据结构 - Set

在官方的文档常用的命令

用set实现(交集 并集) 交集示例: 共同关注的好友 并集示例:给同一个帖子,点赞的人+转发的人
public void set() {
redisTemplate.opsForSet().add("user_s", "userC", "userD", "userE");
redisTemplate.opsForSet().add("user_d", "userC", "userE", "userF");
Set<String> sinter = redisTemplate.opsForSet().intersect("user_s", "user_d");
System.out.println("共同关注的场景:" + sinter);
}可以找到公共的部分 userc就是set用的场景


移动集合等等,其他场景就具体使用
数据结构 - Sorted set


而客户端的代码实现
/**
* 排行榜
*/
public void zset() {
redisTemplate.opsForZSet().add("core", "redis", 100);
redisTemplate.opsForZSet().add("core", "java", 99);
redisTemplate.opsForZSet().add("core", "map", 20);
redisTemplate.opsForZSet().add("core", "c", 18);
redisTemplate.opsForZSet().add("core", "c++", 60);
redisTemplate.opsForZSet().add("core", "c#", 50);
// 返回前三名
Set<String> stringSet = redisTemplate.opsForZSet().reverseRange("core", 0, 2);
//返回85到100的数量
Long zcount = redisTemplate.opsForZSet().count("core", 85, 100);
}主要就是一个排行榜的业务场景
其实我感觉这个还有个提高,最好是能把自定义的排序顺序写进去。就更加好了,并且感觉数据排序 以分数的形式都太窄了。

数据结构 - GEO

在使用场景下

这些命令都是geo开头的 命令,用来计算两地位置距离
将指定的地理空间位置(纬度、经度、名称)添加到指定的key中。这些数据将会存储到sorted set这样的目的是为了方便使用GEORADIUS或者GEORADIUSBYMEMBER命令对数据进行半径查询等操作。
GEOADD该命令以采用标准格式的参数x,y,所以经度必须在纬度之前。这些坐标的限制是可以被编入索引的,区域面积可以很接近极点但是不能索引。具体的限制,由EPSG:900913 / EPSG:3785 / OSGEO:41001 规定如下:
- 有效的经度从-180度到180度。
- 有效的纬度从-85.05112878度到85.05112878度。
当坐标位置超出上述指定范围时,该命令将会返回一个错误。
通过命令行进行直接进行距离计算

并且 拿到数据时,会做一个位数补充

代码示例
/**
* 上传位置
*/
public void add(Point point, String userId) {
redisTemplate.opsForGeo().add("user_geo", new RedisGeoCommands.GeoLocation<>(userId, point));
}对于这个point是做了封装经纬度的。
以及求到附近位置的人这些功能操作
/**
* 附近的人
*
* @param point 用户自己的位置
* @param radius 范围,半径
*/
public GeoResults<RedisGeoCommands.GeoLocation> near(Point point, int radius) {
// 半径 100米
Distance distance = new Distance(radius, RedisGeoCommands.DistanceUnit.METERS);
Circle circle = new Circle(point, distance);
// 附近5个人
RedisGeoCommands.GeoRadiusCommandArgs geoRadiusCommandArgs = RedisGeoCommands.GeoRadiusCommandArgs
.newGeoRadiusArgs().includeDistance().limit(5);
GeoResults<RedisGeoCommands.GeoLocation> user_geo = redisTemplate.opsForGeo().radius("user_geo", circle,
geoRadiusCommandArgs);
return user_geo;
}发布订阅
这是redis提供命令实现的 发布订阅功能
- 生产者发布消息到频道
- 消费者订阅频道,从频道接收消息
- 生产者、消费者彼此相互不了解


订阅频道消息,订阅模式。
redis的发布订阅,这个和mq对比起来,还是很多不一样的差别,例如消息超时,消息补偿等等。都没实现,对于一些数据安全性的系统,还是需要使用mq中间件
下面 使用subscribe进行监听通道 数据,会对应着数据监听,有数据显示没

PSUBSCRIBE
订阅给定的模式(patterns)。
支持的模式(patterns)有:
h?llosubscribes tohello,halloandhxlloh*llosubscribes tohlloandheeeelloh[ae]llosubscribes tohelloandhallo,but nothillo
如果想输入普通的字符,可以在前面添加\
怎么样在客户端中使用,主要执行 execute 方法 并做监听 RedisConnection 进行publish 和 subscribe即可
@Autowired
RedisTemplate redisTemplate;
@PostConstruct
@SuppressWarnings({ "unchecked", "rawtypes" })
public void setup() {
redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.subscribe((message, pattern) -> {
System.out.println("使用redisTemplate收到消息:" + message);
}, PubsubRedisConfig.PUBSUB_CHANNEL_NAME.getBytes());
return null;
}
});
redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.publish(PubsubRedisConfig.PUBSUB_CHANNEL_NAME.getBytes(), "测试".getBytes());
return null;
}
});
}生产者和消费者,是可以隔离开的。
结合spring的监听
获取通过显示注入bean的方式,注入RedisMessageListenerContainer , 添加 MessageListener 消息方式
这种更符合 在spring的框架下做开发,将监听隔绝开
@Bean
public RedisMessageListenerContainer messageListener(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer rml = new RedisMessageListenerContainer();
rml.setConnectionFactory(redisConnectionFactory);
SendListener smsSendListener = new SendListener();
rml.addMessageListener(smsSendListener, Arrays.asList(new ChannelTopic(PubsubRedisConfig.PUBSUB_CHANNEL_NAME)));
return rml;
}
// 定义触发的方法
class SendListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println("借助spring容器收到消息:" + message);
}
}Redis键空间通知
监听删除事件的 发布订阅
不只是监听通道 还有对事件 __keyspace@0__:hkkkk del 的监听
键空间通知功能自2.8.0版本开始可用。
键空间通知允许客户端订阅发布/订阅频道,以便以某种方式接收影响Redis数据集的事件。
可能接收的事件示例如下:
- 所有影响给定键的命令。
- 所有接收LPUSH操作的键。
- 所有在数据库0中到期的键。
事件使用Redis的普通发布/订阅层传递,因此实现了发布/订阅的客户端无需修改即可使用此功能。
由于Redis的发布/订阅是fire and forget,因此如果你的应用要求可靠的事件通知,目前还不能使用这个功能,也就是说,如果你的发布/订阅客户端断开连接,并在稍后重连,那么所有在客户端断开期间发送的事件将会丢失。
将来有计划允许更可靠的事件传递,但可能会在更一般的层面上解决,要么为发布/订阅本身带来可靠性,要么允许Lua脚本拦截发布/订阅的消息以执行推送等操作,就像往队列里推送事件一样。


这都是来自官方文档中,对于对于事件的监听。 相对于监听上面的内容,更加完整。
public void testKeyDelEventChannel() throws InterruptedException {
redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.subscribe((message, pattern) -> {
System.out.println("通过Key删除事件通道,收到消息:" + message);
}, "__keyspace@0__:h del".getBytes());
return null;
}
});
redisTemplate.opsForValue().set("h", "hash");
Thread.sleep(1000L);
redisTemplate.delete("hkkkk");
}默认情况下,键空间事件通知是不启用的,因为虽然不太明智,但该功能会消耗一些CPU。可以使用redis.conf中的notify-keyspace-events或者使用CONFIG SET命令来开启通知
Redis新特性Stream

官方文档中描述stream这一块的内存

Stream是Redis 5.0版本引入的一个新的数据类型,它以更抽象的方式模拟日志数据结构,但日志仍然是完整的:就像一个日志文件,通常实现为以只附加模式打开的文件,Redis流主要是一个仅附加数据结构。至少从概念上来讲,因为Redis流是一种在内存表示的抽象数据类型,他们实现了更加强大的操作,以此来克服日志文件本身的限制。
Stream是Redis的数据类型中最复杂的,尽管数据类型本身非常简单,它实现了额外的非强制性的特性:提供了一组允许消费者以阻塞的方式等待生产者向Stream中发送的新消息,此外还有一个名为消费者组的概念。
消费者组最早是由名为Kafka(TM)的流行消息系统引入的。Redis用完全不同的术语重新实现了一个相似的概念,但目标是相同的:允许一组客户端相互配合来消费同一个Stream的不同部分的消息。
在官方文档中

整个消息添加的过程
- XADD添加数据 对应的 key stringname
- value中存在 message:text id由时间+序号组成 自动生成id
遍历消息大小:xrevrganer 由大到小 XRANG 由大到小
消费者 XDEL删除消息 value
消费者 XREAD从指定位置开始读取 XREAD 可阻塞读取 可同时读取多个stream
由XGROUP创建分组group
消费者 除了指定key名称分组名读取XREADGROUP XACK 不同消费者读取同组 消息不会重复
没有ACK进入pending状态 重新认领XCLAIM命令 删除消息xtrim 命令 查看监控 streamXinfo命令
通过命令的形式去操作

遍历 和查看数据 增加数据
包括查看信息就用xinfo就可以了

以及 分组等。
// stream 流,5.0新特性,redisTemplate、jedis还没有支持,Redisson和Lettuce支持了
// 我们使用springboot中默认的redis客户端Lettuce
// 添加: XADD mystream * sensor-id 1234 temperature 19.8
// 遍历: XRANGE mystream - + COUNT 2
// 消费:XREAD COUNT 2 STREAMS mystream 0
// 阻塞式消费: XREAD BLOCK 0 STREAMS mystream $
// 创建消费者组: XGROUP CREATE mystream mygroup $
// 分组消费: XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
// 消费确认: XACK mystream mygroup 1526569495631-0
// 查看未确认的消息: XPENDING mystream mygroup - + 10
// 重新认领消费:XCLAIM mystream mygroup Alice 3600000 1526569498055-0
// XINFO 查看stream信息,监控
总结
整篇文章主要对整个redis的使用,有了一个大的学习,包括各种数据结构,以及各种应用场景。也许在开发中用不了这么多数据结构,根据应用场景来选择不同的数据结构吧。