前言
scan是redis扫描数据库的命令,相比于keys命令来说有更好的性能,本来scan应该放到dict那一篇取讲的,但是之前忘了就新开一篇文章来分析一下scan命令的源码实现
正文
scan命令是用于迭代数据库中的key,是基于游标的迭代命令,第一次迭代的时候游标指定为0,在每次一迭代后会返回新的游标,下一次使用scan命令的时候需要使用新游标。
SCAN cursor [MATCH pattern] [COUNT count] [Type typename]
复制代码
- cursor 游标
- pattern 匹配模式
- count 返回数量
- typename 指定扫描的value的类型
类似于scan命令的还有:
- SSCAN 命令用于迭代集合键中的元素。
- HSCAN 命令用于迭代哈希键中的键值对。
- ZSCAN 命令用于迭代有序集合中的元素(包括元素成员和元素score)
这些命令区别就在于scan命令是作用于整个db,而这些命令是需要通过key查询出对应的数据结构然后进行迭代。
void scanCommand(client *c) {
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
scanGenericCommand(c,NULL,cursor);
}
void zscanCommand(client *c) {
robj *o;
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
checkType(c,o,OBJ_ZSET)) return;
scanGenericCommand(c,o,cursor);
}
复制代码
可以通过源码看出来,zscan需要通过key获取对象,然后判断是否为ZSET类型,最终都是调用scanGenericCommand方法
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
int i, j;
list *keys = listCreate();
listNode *node, *nextnode;
long count = 10;
sds pat = NULL;
sds typename = NULL;
int patlen = 0, use_pattern = 0;
dict *ht;
serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
o->type == OBJ_ZSET);
//如果o是null,表示整个db 所以不用指定key 参数索引有所变化
i = (o == NULL) ? 2 : 3;
/* Step 1: Parse options. */
while (i < c->argc) {
j = c->argc - i;
if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
!= C_OK)
{
goto cleanup;
}
if (count < 1) {
addReply(c,shared.syntaxerr);
goto cleanup;
}
i += 2;
} else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
pat = c->argv[i+1]->ptr;
patlen = sdslen(pat);
use_pattern = !(pat[0] == '*' && patlen == 1);
i += 2;
} else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
/* SCAN for a particular type only applies to the db dict */
typename = c->argv[i+1]->ptr;
i+= 2;
} else {
addReply(c,shared.syntaxerr);
goto cleanup;
}
}
复制代码
首先构造了list来存放迭代的key,然后是解析参数
ht = NULL;
//没指定对象使用默认db
if (o == NULL) {
ht = c->db->dict;
//使用hash结果的hashset
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
//迭代hash表和skiplist的count需要乘2 要返回key-val
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
count *= 2;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
ht = zs->dict;
count *= 2;
}
复制代码
这里是根据传入o的类型来初始化ht,是为了适应zscan、hscan等命令,默认null表示扫描整个db
if (ht) {
void *privdata[2];
//迭代次数为hash的10倍 为了防止hashtable比较稀疏导致了遍历次数太多
long maxiterations = count*10;
privdata[0] = keys;
privdata[1] = o;
do {
cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
} while (cursor &&
maxiterations-- &&
listLength(keys) < (unsigned long)count);
复制代码
通过dictScan方法扫描hashtable
unsigned long dictScan(dict *d,
unsigned long v,
dictScanFunction *fn,
dictScanBucketFunction* bucketfn,
void *privdata)
{
dictht *t0, *t1;
const dictEntry *de, *next;
unsigned long m0, m1;
if (dictSize(d) == 0) return 0;
d->iterators++;
if (!dictIsRehashing(d)) {
t0 = &(d->ht[0]);
m0 = t0->sizemask;
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/**
* m0 = 2^n - 1 ~m0 = 0
* 这里v应该是不变的
*/
v |= ~m0;
/**
* 将v二进制翻转 然后增加 再次翻转回来
* v = 1000
* 翻转 0001
* 增加 0010
* 翻转 0100
*
* 高位+1
*/
v = rev(v);
v++;
v = rev(v);
复制代码
hashtable的扫描分为两种情况,第一种是没有rehash的情况。通过游标获取需要扫描的槽,然后将一个槽下所有的数据添加进list。
注意游标的跟新方式,游标的跟新是每一次高位+1,顺序是:
二进制:000 100 010 110 001 101 111 十进制:0 4 2 6 1 5 7
采用高位进一而不是普通顺序是因为:hashtable的扩容总是以2n2^n2n扩容的,如果原始的hashtable有4个槽,如下分布:
00 10 01 11
扩容到8个槽以后,就会变成这样:
000 100 010 110 001 101 011 111
原来的数据会分割一部分到高位上,如果我们从高位增加1,那么扩容的时候不会发生重复遍历,但是缩容的时候有可能会发生
else {
t0 = &d->ht[0];
t1 = &d->ht[1];
//确保t0是小表
if (t0->size > t1->size) {
t0 = &d->ht[1];
t1 = &d->ht[0];
}
m0 = t0->sizemask;
m1 = t1->sizemask;
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
do {
if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
de = t1->table[v & m1];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/* Increment the reverse cursor not covered by the smaller mask.*/
v |= ~m1;
v = rev(v);
v++;
v = rev(v);
/**
* 由于m1是大表 所以在进行 v |= ~m1的时候 v最高位会多出0
* 在进行rev(v) 取反再增加就会出现不一致
* 而 (m0 ^ m1) & v 是判断v的高位是否为1
* 00100 传进来的游标,第一次遍历的桶
* 10100 第二次遍历的桶
* 01100 第三次遍历的桶
* 11100 第四次遍历的桶
* 00010 返回的游标
*/
} while (v & (m0 ^ m1));
}
d->iterators--;
return v;
复制代码
发生rehash的时候,会先将原表数据收集,然后收集新表,具体可见注释
else if (o->type == OBJ_SET) {
int pos = 0;
int64_t ll;
while(intsetGet(o->ptr,pos++,&ll))
listAddNodeTail(keys,createStringObjectFromLongLong(ll));
cursor = 0;
} else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
unsigned char *p = ziplistIndex(o->ptr,0);
unsigned char *vstr;
unsigned int vlen;
long long vll;
//循环遍历ziplist 放入list
while(p) {
ziplistGet(p,&vstr,&vlen,&vll);
listAddNodeTail(keys,
(vstr != NULL) ? createStringObject((char*)vstr,vlen) :
createStringObjectFromLongLong(vll));
p = ziplistNext(o->ptr,p);
}
cursor = 0;
} else {
serverPanic("Not handled encoding in SCAN.");
}
复制代码
继续接着scanGenericCommand代码,set和ziplist都很容易,单向遍历所有节点,cursor最终还是为0
node = listFirst(keys);
while (node) {
robj *kobj = listNodeValue(node);
nextnode = listNextNode(node);
int filter = 0;
//filter为1的时候过滤该元素 从keys中删除
if (!filter && use_pattern) {
if (sdsEncodedObject(kobj)) {
//正则匹配失败
if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
filter = 1;
} else {
char buf[LONG_STR_SIZE];
int len;
serverAssert(kobj->encoding == OBJ_ENCODING_INT);
len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
}
}
//过滤不匹配的type
if (!filter && o == NULL && typename){
robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);
char* type = getObjectTypeName(typecheck);
if (strcasecmp((char*) typename, type)) filter = 1;
}
//过滤过期key
if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
//删除节点
if (filter) {
decrRefCount(kobj);
listDelNode(keys, node);
}
//因为hash或者zset包含了key-val 所以要跳过其value节点 如果被过滤了 那么删除它
if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
node = nextnode;
nextnode = listNextNode(node);
if (filter) {
kobj = listNodeValue(node);
decrRefCount(kobj);
listDelNode(keys, node);
}
}
node = nextnode;
}
//返回客户端
addReplyArrayLen(c, 2);
addReplyBulkLongLong(c,cursor);
addReplyArrayLen(c, listLength(keys));
while ((node = listFirst(keys)) != NULL) {
robj *kobj = listNodeValue(node);
addReplyBulk(c, kobj);
decrRefCount(kobj);
listDelNode(keys, node);
}
复制代码
最后一步就是根据match,type对数据进行筛选,将不符合条件的数据filter标记为true,删除节点后最终剩余的数据返回客户端。
总结
cursor反应的是当前hashtable的下标,使用高位增加的方式避免了扩容后rehash数据位置重新分配,导致下一次scan命令会出现重复key,但是缩容可能会出现重复key,所以不要太依赖scan命令。