什么是跳表

Redis有序集合是复合数据结构,它是由一个双hashmap构成的字典和跳表实现的,本文将为大家详细介绍Redis有序集合实现的原理以及使用场景和案例。


有序集合概述

Redis有序集合(sorted set)是复合数据结构,它是由一个双hashmap构成的字典和跳表实现的。

跟集合一样,有序集合也是string类型元素的集合,不同的是每个元素都会关联一个权。通过权值可以有序的获取集合中的元素。

在这里插入图片描述

Redis有序集合跟列表有些类似,例如排序,也都可以获取某一范围的元素,但是也有几点区别

  • 列表类型是通过链表实现的,获取靠近两端的数据速度极快,而当元素增多后,访问中间数据的速度会较慢,所以它更适合较少访问中间元素的应用。
  • 有序集合类型是使用散列表和跳表(Skip list)实现的,所以即使取位于中间部分的数据速度也很快(时间复杂度是O(log(N)))。
  • 列表中不能简单的调整某个元素的位置,但是有序集合可以。
  • 有序集合要比列表类型更耗费内存。

跳表数据结构小结

  • 跳表结合了链表和二分查找的思想
  • 由原始链表和一些通过“跳跃”生成的链表组成
  • 第0层是原始链表,越上层“跳跃”的越高,元素越少
  • 上层链表是下层链表的子序列
  • 查找时从顶层向下,不断缩小搜索范围

Redis有序集合实现

从Redis的git提交历史中可以查到,在1.050版本Redis开始支持可排序集。从Redis 1.0开始就提供了集合(Set)这种数据结构,集合就跟数学上的集合概念是一个道理:无序性,确定性,互异性,集合里的元素无法保证元素的顺序。而业务上的需求,可能不止是一个集合,而且还要求能够快速地对集合元素进行排序,于是,Redis中提供了可排序集这么一种数据结构,无非就是在集合的基础上增加了排序功能。

也许有人会问,Redis中本身是有Sort命令的,它的操作也是同样可以达到对无序集的排序功能,为什么还需要可排序集这样的数据结构呢?这是因为在速度上,Sort命令的时间复杂度为O(N+M*Log(M)),而可排序集获取一定范围内元素的时间复杂度为O(log(N) + M)。


跳表结构及原理

数据结构跳表(Skip List)与AVL、红黑树等相比,数据结构简单,算法易懂,但查询的时间复杂度与平衡二叉树/红黑树相当,跳表的基本结构如下图所示:

在这里插入图片描述

上图中整个跳表结构存放了4个元素:5->10->20->30,图中的红色线表示查找元素30时走的查找路线,从Head指针数组里最顶层的指针所指的20开始比较,与普通的链表查找相比,跳表的查询可以跳跃元素,上图中查询30,发现30比20大,则查找就从20开始,而普通链表的查询必须一个元素一个元素的比较,时间复杂度为O(n)。

如何向跳表中插入元素呢?向跳表中插入元素,由于元素所在层级的随机性,平均起来也是O(logn),也就是查找元素应该插入在什么位置,然后就是普通的移动指针问题。下图所示是往跳表中插入元素28的过程,图中红色线表示查找插入位置的过程,绿色线表示进行指针的移动,将该元素插入。

在这里插入图片描述
如何向跳表中删除元素?首先查找要删除的元素,找到后进行指针的移动,下图所示的是删除元素30的过程:

在这里插入图片描述


跳表实现

跳表基本数据结构及精简后的相关代码如下所示

#include<stdio.h>
#include<stdlib.h>
 
#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPLIST_P 0.25
#include <math.h>
 
//跳表节点
typedef struct zskiplistNode {
    int key;
    int value;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
    } level[1];
} zskiplistNode;
 
//跳表
typedef struct zskiplist {
    struct zskiplistNode *header;
    int level;
} zskiplist;

在代码中定义了跳表结构中保存的数据为Key->Value这种形式的键值对,注意的是skiplistNode里面内含了一个结构体,代表的是层级,并且定义了跳表的最大层级为32级,下面的代码是创建空跳表,以及层级的获取方式:

//创建跳表的节点
zskiplistNode *zslCreateNode(int level, int key, int value) {
    zskiplistNode *zn = (zskiplistNode *)malloc(sizeof(*zn)+level*sizeof(zn->level));
    zn->key = key;
    zn->value = value;
    return zn;
}
 
//初始化跳表
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    zsl = (zskiplist *) malloc(sizeof(*zsl));
    zsl->level = 1;//将层级设置为1
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,NULL,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
    }
    return zsl;
}
 
//向跳表中插入元素时,随机一个层级,表示插入在哪一层
int zslRandomLevel(void) {
    int level = 1;
    while ((rand()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

在这段代码中,使用了随机函数获取元素所在的层级,下面就是向跳表中插入元素,插入元素之前先查找插入的位置,代码如下所示:

//向跳表中插入元素
zskiplistNode *zslInsert(zskiplist *zsl, int key, int value) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, level;
    x = zsl->header;
    //在跳表中寻找合适的位置并插入元素
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->key < key ||
                (x->level[i].forward->key == key &&
                x->level[i].forward->value < value))) {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    //获取元素所在的随机层数
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            update[i] = zsl->header;
        }
        zsl->level = level;
    }
    //为新创建的元素创建数据节点
    x = zslCreateNode(level,key,value);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
    }
    return x;
}

下面展示的是代码中删除节点的操作,和插入节点类似:

//跳表中删除节点的操作
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].forward = x->level[i].forward;
        }
    }
    //如果层数变了,相应的将层数进行减1操作
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
}
 
//从跳表中删除元素
int zslDelete(zskiplist *zsl, int key, int value) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
    x = zsl->header;
    //寻找待删除元素
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->key < key ||
                (x->level[i].forward->key == key &&
                x->level[i].forward->value < value))) {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && key == x->key && x->value == value) {
        zslDeleteNode(zsl, x, update);
        //别忘了释放节点所占用的存储空间
        free(x);
        return 1;
    } else {
        //未找到相应的元素
        return 0;
    }
    return 0;
}

跳表的基本数据结构与原理

Redis中跳表的基本数据结构定义为:与基本跳表数据结构相比,在Redis中实现的跳表其特点是不仅有前向指针,也存在后向指针,而且在前向指针的结构中存在span跨度字段,这个跨度字段的出现有助于快速计算元素在整个集合中的排名。

//定义跳表的基本数据节点
typedef struct zskiplistNode {
    robj *obj; // zset value
    double score;// zset score
    struct zskiplistNode *backward;//后向指针
    struct zskiplistLevel {//前向指针
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

//有序集数据结构
typedef struct zset {
    dict *dict;//字典存放value,以value为key
    zskiplist *zsl;
} zset;

将如上数据结构转化成更形式化的图形表示,如下图所示:

在这里插入图片描述

在上图中,可以看到header指针指向的是一个具有固定层级(32层)的表头节点,定义成32层理论上对于232-1个元素的查询最优,而232=4294967296个元素,对于绝大多数的应用来说,已经足够。

Redis中有序集另一个值得注意的地方就是当Score相同的时候是如何存储的?当集合中两个值的Score相同,这时在跳表中存储会比较这两个值,对这两个值按字典排序存储在跳表结构中。

Redis对zskiplist/zskiplistNode的相关操作,源码如下所示

//创建跳表结构的源码

//#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    //分配内存
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;//默认层级为1
    zsl->length = 0;//跳表长度设置为0
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        //因为没有任何元素,将表头节点的前向指针均设置为0
        zsl->header->level[j].forward = NULL;
        //将表头节点前向指针结构中的跨度字段均设为0
        zsl->header->level[j].span = 0;
    }
    //表头后向指针设置成0
    zsl->header->backward = NULL;
    //表尾节点设置成NULL
    zsl->tail = NULL;
    return zsl;
}

上述代码调用了zslCreateNode这个函数,函数的源码如下:

zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}

执行完上述代码之后会创建如下图所示的跳表结构:

在这里插入图片描述
创建了跳表的基本结构,接下来就是插入操作,Redis中源码如下所示:

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; //update[32]
    unsigned int rank[ZSKIPLIST_MAXLEVEL];//rank[32]
    int i, level;
    redisAssert(!isnan(score));
    x = zsl->header;
    //寻找元素插入的位置 
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score || //以下是得分相同的情况下,比较value的字典排序
                (x->level[i].forward->score == score &&compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    //产生随机层数
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        //记录最大层数
        zsl->level = level;
    }
    //产生跳表节点
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        //更新跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    //此种情况只会出现在随机出来的层数小于最大层数时
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

上述源码中,有一个产生随机层数的函数,源代码如下所示:

int zslRandomLevel(void) {
    int level = 1;
    //#define ZSKIPLIST_P 0.25 
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    //#ZSKIPLIST_MAXLEVEL 32
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

图形化的形式描述如下图所示:

在这里插入图片描述
理解了插入操作,其他查询,删除,求范围操作基本上类似。


有序集合使用场景

有序集合的使用场景与集合类似,但是集合不是自动有序的,而有序集合可以利用分数进行成员间的排序,而且是插入时就排序好。所以当需要一个有序且不重复的集合列表时,就可以选择有序集合数据结构作为选择方案。例如,获得热门帖子、话题等排行数据信息。


操作命令介绍

  • zadd key score member:添加元素到集合,元素在集合中存在则更新对应score;
  • zrem key member:删除指定元素,1表示成功,如果元素不存在返回0;
  • zincrby key incr member :按照incr幅度增加对应member的score值,返回score值;
  • zrank key member:返回指定元素在集合中的排名(下标),集合中元素是按score从小到大排序的;
  • zrevrank key member:同上,但是集合中元素是按score从大到小排序;
  • zrange key start end:类似lrange操作是从集合中去指定区间的元素。返回的是有序的结果;
  • zrevrange key start end:同上,返回结果是按score逆序的;
  • zcard key:返回集合中元素个数;
  • zscore key element:返回给定元素对应的score;
  • zremrangebyrank key min max:删除集合中排名在给定区间的元素(权值从小到大排序)。

使用案例

我们可以利用有序集合实现获取最热门话题前10信息

首先,做一个有序集合排序集合,里面只保留10个元素信息,该10个元素是评论量最高的话题。每个话题被评论的时候,都有机会进入该集合里面,但是只有评论量高的前10个话题会存在于该集合,评论量低的就被删除。

创建一个有序集合排序集合hotTopic的key,内部有10个元素

127.0.0.1:6379> zrevrange hotTopic 0 100
(empty list or set)
127.0.0.1:6379> zadd hotTopic 125 topicMsg1
(integer) 1
127.0.0.1:6379> zadd hotTopic 128 topicMsg3
(integer) 1
127.0.0.1:6379> zadd hotTopic 155 topicMsg4
(integer) 1
127.0.0.1:6379> zadd hotTopic 15 topicMsg40
(integer) 1
127.0.0.1:6379> zadd hotTopic 58 topicMsg14
(integer) 1
127.0.0.1:6379> zadd hotTopic 45 topicMsg20
(integer) 1
127.0.0.1:6379> zadd hotTopic 195 topicMsg48
(integer) 1
127.0.0.1:6379> zadd hotTopic 1055 topicMsg69
(integer) 1
127.0.0.1:6379> zadd hotTopic 205 topicMsg39
(integer) 1
127.0.0.1:6379> zadd hotTopic 605 topicMsg99
(integer) 1
127.0.0.1:6379>

然后第11个话题的信息加入集合:

127.0.0.1:6379> zadd hotTopic 100 topicMsg100
(integer) 1
127.0.0.1:6379>

按照权值从大到小逆序排序显示一下数据

127.0.0.1:6379>  zrevrange hotTopic 0 100
 1) "topicMsg69"
 2) "topicMsg99"
 3) "topicMsg39"
 4) "topicMsg48"
 5) "topicMsg4"
 6) "topicMsg3"
 7) "topicMsg1"
 8) "topicMsg100"
 9) "topicMsg14"
10) "topicMsg20"
11) "topicMsg40"
127.0.0.1:6379>

要删除回复量最低的数据(从小到大的排序后,删除区间为0到0的元素,也就是0本身)

127.0.0.1:6379> zremrangebyrank hotTopic 0 0
(integer) 1
127.0.0.1:6379> zrevrange hotTopic 0 100
 1) "topicMsg69"
 2) "topicMsg99"
 3) "topicMsg39"
 4) "topicMsg48"
 5) "topicMsg4"
 6) "topicMsg3"
 7) "topicMsg1"
 8) "topicMsg100"
 9) "topicMsg14"
10) "topicMsg20"
127.0.0.1:6379>

可以看到权值最小的topicMsg40被删除了。

想查看某一个权值数据的排名(zrank从小到大,zrevrank从大到小),使用zrank指令

127.0.0.1:6379> zrevrank hotTopic topicMsg99
(integer) 1
127.0.0.1:6379> zrank hotTopic topicMsg99
(integer) 8
127.0.0.1:6379>

zcard指令可以返回SortSet集合中的元素个数

127.0.0.1:6379> zcard hotTopic
(integer) 10
127.0.0.1:6379>

zincrby指令增加某个权值下的数值,例如,给权值topicMsg100的数据加200

127.0.0.1:6379> zincrby hotTopic 200 topicMsg100
"300"
127.0.0.1:6379> zrevrange hotTopic 0 100
 1) "topicMsg69"
 2) "topicMsg99"
 3) "topicMsg100"
 4) "topicMsg39"
 5) "topicMsg48"
 6) "topicMsg4"
 7) "topicMsg3"
 8) "topicMsg1"
 9) "topicMsg14"
10) "topicMsg20"
127.0.0.1:6379>

如果查看某个权值对应的数据值,使用zscore

127.0.0.1:6379> zrevrange hotTopic 0 100
 1) "topicMsg69"
 2) "topicMsg99"
 3) "topicMsg100"
 4) "topicMsg39"
 5) "topicMsg48"
 6) "topicMsg4"
 7) "topicMsg3"
 8) "topicMsg1"
 9) "topicMsg14"
10) "topicMsg20"
127.0.0.1:6379> zscore hotTopic topicMsg100
"300"
127.0.0.1:6379>

本文小结

本文为大家介绍了Redis有序集合的原理和特征,详细阐述了Redis中跳表的实现过程,最后通过一个案例展示了Redis有序集合的部分应用场景和效果,希望大家能有新的认识和收获。


版权声明:本文为qq_31960623原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。