在链表模型中,编码基本都是基于quicklist的了,不信?来一段api代码 掌掌眼
/* Push 'value' onto one end of the list object 'subject'.
 *
 * 'where' selects the end: LIST_HEAD pushes at the head, any other value
 * pushes at the tail. Only quicklist-encoded lists are handled; any other
 * encoding triggers serverPanic().
 *
 * The object returned by getDecodedObject() is released with decrRefCount()
 * once its bytes have been handed to quicklistPush(). */
void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

        /* Convert to the non-encoded form so that value->ptr is an sds
         * string. (A block comment is used on purpose: a line comment here
         * would swallow the following statements if the code is ever
         * collapsed onto one line.) */
        value = getDecodedObject(value);
        size_t len = sdslen(value->ptr);
        quicklistPush(subject->ptr, value->ptr, len, pos);
        decrRefCount(value);
    } else {
        serverPanic("Unknown list encoding");
    }
}
再比如pop和length函数,也是使用的OBJ_ENCODING_QUICKLIST编码
/* Pop one element from the head (LIST_HEAD) or the tail (any other 'where')
 * of the list object 'subject'.
 *
 * Returns the popped element, or NULL when quicklistPopCustom() reports that
 * nothing was popped. If the pop succeeded but no object was stored through
 * the data pointer, the element is built from the integer output instead.
 * Any encoding other than quicklist triggers serverPanic(). */
robj *listTypePop(robj *subject, int where) {
    robj *popped = NULL;
    long long num;
    int end;

    if (where == LIST_HEAD) {
        end = QUICKLIST_HEAD;
    } else {
        end = QUICKLIST_TAIL;
    }

    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else if (quicklistPopCustom(subject->ptr, end,
                                  (unsigned char **)&popped, NULL, &num,
                                  listPopSaver) &&
               !popped) {
        /* Pop succeeded without producing an object: wrap the integer. */
        popped = createStringObjectFromLongLong(num);
    }
    return popped;
}

/* Return the number of elements in the list object 'subject', as reported by
 * quicklistCount(). Any encoding other than quicklist triggers
 * serverPanic(). */
unsigned long listTypeLength(const robj *subject) {
    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        return quicklistCount(subject->ptr);
    }
}
其余对外的api函数均是这样的,底层实现以quicklist为主
list对于数据库的操作代码,逻辑上跟string类型差不多:
1)先在client结构中找到对应的参数;
2)判断参数的合法性;
3)操作list对象,发送事件
这里先给出push的通用实现(pushGenericCommand)
/* Shared implementation of the push commands.
 *
 * c->argv[1] is the key and c->argv[2..argc-1] are the values to push.
 * 'where' is LIST_HEAD for a head push; any other value pushes at the tail
 * and is reported as "rpush".
 *
 * Steps:
 *   1. Look the key up for writing; reply with shared.wrongtypeerr and stop
 *      if it holds something that is not a list.
 *   2. Push every value, creating the quicklist object (and adding it to the
 *      db) right before the first push if the key did not exist.
 *   3. Reply with the resulting list length (0 if there is no list).
 *   4. If anything was pushed, signal the key as modified and emit the
 *      keyspace event; server.dirty grows by the number of pushed values. */
void pushGenericCommand(client *c, int where) {
    int pushed = 0;
    robj *lobj = lookupKeyWrite(c->db, c->argv[1]);

    if (lobj != NULL && lobj->type != OBJ_LIST) {
        addReply(c, shared.wrongtypeerr);
        return;
    }

    for (int j = 2; j < c->argc; j++, pushed++) {
        if (lobj == NULL) {
            /* Key does not exist yet: create the list lazily. */
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db, c->argv[1], lobj);
        }
        listTypePush(lobj, c->argv[j], where);
    }

    addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));

    if (pushed != 0) {
        char *event;

        if (where == LIST_HEAD) {
            event = "lpush";
        } else {
            event = "rpush";
        }
        signalModifiedKey(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id);
    }
    server.dirty += pushed;
}
对于LPUSHX、RPUSHX这类带X后缀的命令,当key不存在或者对象类型不是OBJ_LIST时直接返回,并不往下再走了,代码就不贴出来了~
之后有点意思的代码就是rpoplpush了,它不是简单地先pop再push:pop之后会先保存源key(touchedkey)并增加它的引用计数,等push、事件通知等操作都完成后再减引用计数;pop出来的value则在push之后释放一次引用。这属于历史遗留问题了,因为rpoplpushHandlePush可能会修改客户端的命令参数向量(目前的版本实际上不会),所以需要保护这个key~
/* RPOPLPUSH: pop the tail element of the source list (c->argv[1]) and hand
 * it to rpoplpushHandlePush() together with the destination key
 * (c->argv[2]) and the destination object, if any.
 *
 * The source is looked up with lookupKeyWriteOrReply() using
 * shared.nullbulk as the reply argument; the command stops early when that
 * lookup yields NULL or when either key fails checkType() for OBJ_LIST.
 * An empty source list is answered with shared.nullbulk. After the move the
 * source key is deleted if the list became empty, the matching keyspace
 * events are emitted and server.dirty is incremented. */
void rpoplpushCommand(client *c) {
    robj *src, *popped;

    src = lookupKeyWriteOrReply(c, c->argv[1], shared.nullbulk);
    if (src == NULL || checkType(c, src, OBJ_LIST)) return;

    if (listTypeLength(src) == 0) {
        /* Only possible after loading very old RDB files: recent versions
         * of Redis delete the key as soon as its list becomes empty. */
        addReply(c, shared.nullbulk);
        return;
    }

    robj *dst = lookupKeyWrite(c->db, c->argv[2]);
    robj *srckey = c->argv[1];

    if (dst && checkType(c, dst, OBJ_LIST)) return;

    popped = listTypePop(src, LIST_TAIL);

    /* Hold an extra reference on the source key while it is in use below:
     * rpoplpushHandlePush may change the client command argument vector
     * (it does not currently). */
    incrRefCount(srckey);
    rpoplpushHandlePush(c, c->argv[2], dst, popped);

    /* listTypePop returns an object with its refcount incremented, so
     * release that reference here. */
    decrRefCount(popped);

    notifyKeyspaceEvent(NOTIFY_LIST, "rpop", srckey, c->db->id);

    /* Remove the source key once its list has been emptied. */
    if (listTypeLength(src) == 0) {
        dbDelete(c->db, srckey);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", srckey, c->db->id);
    }
    signalModifiedKey(c->db, srckey);
    decrRefCount(srckey);
    server.dirty++;
}
再之后就是BLPOP等阻塞操作也需要说一下:
1)如果链表存在且不为空,则正常pop
2)如果链表不存在或者为空,需要先将请求的客户端挂起,暂不为其服务,并把阻塞在同一个key上的所有客户端放进一个链表里;直到有新的数据进来,再依次为这些客户端提供服务,代码如下:
/* Register client 'c' as blocked on each of the 'numkeys' keys in 'keys',
 * then block it with blockClient(c, BLOCKED_LIST).
 *
 * 'timeout' and 'target' are stored in c->bpop; a non-NULL 'target' gets an
 * extra reference. For every key not already present in c->bpop.keys:
 *   - the key is added to c->bpop.keys (with one extra reference);
 *   - the client is appended to the list stored under that key in
 *     c->db->blocking_keys, creating the list (and taking another reference
 *     on the key) when the key has no entry there yet. */
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
    c->bpop.timeout = timeout;
    c->bpop.target = target;
    if (target != NULL) incrRefCount(target);

    for (int j = 0; j < numkeys; j++) {
        dictEntry *entry;
        list *waiters;

        /* Already waiting on this key: nothing more to record. */
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
        incrRefCount(keys[j]);

        /* The reverse mapping: key -> clients blocked on it. */
        entry = dictFind(c->db->blocking_keys,keys[j]);
        if (entry != NULL) {
            waiters = dictGetVal(entry);
        } else {
            int retval;

            /* First client blocking on this key: start a new list. */
            waiters = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],waiters);
            incrRefCount(keys[j]);
            serverAssertWithInfo(c,keys[j],retval == DICT_OK);
        }
        listAddNodeTail(waiters,c);
    }
    blockClient(c,BLOCKED_LIST);
}
解除阻塞操作如下:
/* Undo the bookkeeping that records client 'c' as waiting on keys.
 *
 * For every key in c->bpop.keys the client is removed from the list stored
 * under that key in c->db->blocking_keys; a list that becomes empty is
 * deleted from that dictionary. Afterwards c->bpop.keys is emptied and the
 * reference on c->bpop.target, if any, is released.
 *
 * Asserts that the client is waiting on at least one key and that every
 * such key has a list in c->db->blocking_keys. */
void unblockClientWaitingData(client *c) {
    dictIterator *di;
    dictEntry *de;

    serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);

    /* A client can wait on several keys at once: detach it from each. */
    di = dictGetIterator(c->bpop.keys);
    for (de = dictNext(di); de != NULL; de = dictNext(di)) {
        robj *key = dictGetKey(de);
        list *l = dictFetchValue(c->db->blocking_keys,key);
        listNode *self;

        serverAssertWithInfo(c,key,l != NULL);

        /* Drop this client from the key's waiters. */
        self = listSearchKey(l,c);
        listDelNode(l,self);

        /* No waiters left: free the dictionary slot to save memory. */
        if (listLength(l) == 0) {
            dictDelete(c->db->blocking_keys,key);
        }
    }
    dictReleaseIterator(di);

    /* Reset the client's blocking state. */
    dictEmpty(c->bpop.keys,NULL);
    if (c->bpop.target != NULL) {
        decrRefCount(c->bpop.target);
        c->bpop.target = NULL;
    }
}
发送ready信息代码如下:
/* Queue 'key' of database 'db' on server.ready_keys.
 *
 * Nothing happens when the key has no entry in db->blocking_keys or when it
 * is already present in db->ready_keys. Otherwise a readyList record is
 * appended to server.ready_keys and the key is also added to
 * db->ready_keys; each of the two containers gets its own reference on the
 * key. */
void signalListAsReady(redisDb *db, robj *key) {
    readyList *ready;

    /* Skip keys nobody is blocked on, and keys that were already
     * signaled. */
    if (dictFind(db->blocking_keys,key) == NULL ||
        dictFind(db->ready_keys,key) != NULL) {
        return;
    }

    /* Append a (db, key) record to the server-wide ready list. */
    ready = zmalloc(sizeof(readyList));
    ready->db = db;
    ready->key = key;
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,ready);

    /* Mirror the key in db->ready_keys so that the "already signaled"
     * check above is a simple O(1) lookup rather than a list scan. */
    incrRefCount(key);
    serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}