• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

redis源代码分析20–发布/订阅

mysql 搞代码 4年前 (2022-01-09) 19次浏览 已收录 0个评论

redis的发布/订阅(publish/subscribe)功能类似于传统的消息路由功能,发布者发布消息,订阅者接收消息,沟通发布者和订阅者之间的桥梁是订阅的channel或者pattern。发布者向指定的publish或者pattern发布消息,订阅者阻塞在订阅的channel或者pattern。可以

redis的发布/订阅(publish/subscribe)功能类似于传统的消息路由功能,发布者发布消息,订阅者接收消息,沟通发布者和订阅者之间的桥梁是订阅的channel或者pattern。发布者向指定的publish或者pattern发布消息,订阅者阻塞在订阅的channel或者pattern。可以看到,发布者不会指定哪个订阅者才能接收消息,订阅者也无法只接收特定发布者的消息。这种订阅者和发布者之间的关系是松耦合的,订阅者不知道是谁发布的消息,发布者也不知道谁会接收消息。

redis的发布/订阅功能主要通过SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE 、PUBLISH五个命令来表现。其中SUBSCRIBE、UNSUBSCRIBE用于订阅或者取消订阅channel,而PSUBSCRIBE、PUNSUBSCRIBE用于订阅或者取消订阅pattern,发布消息则通过publish命令。

对于发布/订阅功能的实现,我们先来看看几个与此相关的结构。

struct redisServer {    ---   /* Pubsub */   dict *pubsub_channels;/* Map channels to list of subscribed clients */   list *pubsub_patterns;/* A list of pubsub_patterns */   ---}typedef struct redisClient {   ---   dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */   list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */} redisClient;

在redis的全局server变量(redisServer类型)中,channel和订阅者之间的关系用字典pubsub_channels来保存,特定channel和所有订阅者组成的链表构成pubsub_channels字典中的一项,即字典中的每一项可表示为(channel,订阅者链表);pattern和订阅者之间的关系用链表pubsub_patterns来保存,链表中的每一项可表示成(pattern,redisClient)组成的字典。

在特定订阅者redisClient的结构中,pubsub_channels保存着它所订阅的channel的字典,而订阅的模式则保存在链表pubsub_patterns中。

从上面的解释,我们再来看看订阅/发布命令的最坏时间复杂度(注意字典增删查改一项的复杂度为O(1),而链表的查删复杂度为O(N),从链表尾部增加一项的复杂度为O(1))。

SUBSCRIBE:

订阅者用SUBSCRIBE订阅特定channel,这需要在订阅者的redisClient结构中的pubsub_channels增加一项(复杂度为 O(1)),然后在redisServer 的pubsub_channels找到该channel(复杂度为本文来源gaodaima#com搞(代@码$网6O(1)),并在该channel的订阅者链表的尾部增加一项(复杂度为O(1),注意,如果pubsub_channels中没找到该channel,则插入的复杂度也同为O(1)),因此订阅者用SUBSCRIBE订阅特定 channel的最坏时间复杂度为O(1)。

UNSUBSCRIBE:

订阅者取消订阅时,需要先从订阅者的redisClient结构中的pubsub_channels删除一项(复杂度为O(1)),然后在 redisServer 的pubsub_channels找到该channel(复杂度为O(1)),然后在channel的订阅者链表中删除该订阅者(复杂度为O(1)),因此总的复杂度为O(N),N为特定channel的订阅者数。

PSUBSCRIBE:

订阅者用PSUBSCRIBE订阅pattern时,需要先在redisClient结构中的pubsub_patterns先查找是否已存在该 pattern(复杂度为O(N)),并在不存在的情况下往redisClient结构中的pubsub_patterns和redisServer结构中的pubsub_patterns链表尾部各增加一项(复杂度都为O(1)),因此,总的复杂度为O(N),其中N为订阅者已订阅的模式。

PUNSUBSCRIBE:

订阅者用PUNSUBSCRIBE取消对pattern的订阅时,需要先在redisClient结构中的pubsub_patterns链表中删除该 pattern(复杂度为O(N)),并在redisServer结构中的pubsub_patterns链表中删除订阅者和pattern组成的映射(复杂度为O(M),因此,总的复杂度为O(N+M),其中N为订阅者已订阅的模式,而M为系统中所有订阅者和所有pattern组成的映射数。

PUBLISH:

发布消息时,只会向特定channel发布,但该channel可能会匹配某个pattern。因此,需要先在redisServer结构中的 pubsub_channels找到该channel的订阅者链表(O(1)),然后发送给所有订阅者(复杂度为O(N)),然后查看 redisServer结构中的pubsub_patterns链表中的所有项,看channel是否和该项中的pattern匹配(复杂度为O(M))(注意,这并不包括模式匹配的复杂度),因此,总的复杂度为O(N+M),。其中N为该channel的订阅者数,而M为系统中所有订阅者和所有 pattern组成的映射数。另外,从这也可以看出,一个订阅者是可能多次收到同一个消息的。

解释了发布/订阅的算法后,其代码就好理解了,这里仅给出PUBLISH命令的处理函数publishCommand的代码,更多相关命令的代码请参看redis的源代码。

static void publishCommand(redisClient *c) {    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);    addReplyLongLong(c,receivers);}/* Publish a message */static int pubsubPublishMessage(robj *channel, robj *message) {    int receivers = 0;    struct dictEntry *de;    listNode *ln;    listIter li;    /* Send to clients listening for that channel */    de = dictFind(server.pubsub_channels,channel);    if (de) {        list *list = dictGetEntryVal(de);        listNode *ln;        listIter li;        listRewind(list,&li);        while ((ln = listNext(&li)) != NULL) {            redisClient *c = ln->value;            addReply(c,shared.mbulk3);            addReply(c,shared.messagebulk);            addReplyBulk(c,channel);            addReplyBulk(c,message);            receivers++;        }    }    /* Send to clients listening to matching channels */    if (listLength(server.pubsub_patterns)) {        listRewind(server.pubsub_patterns,&li);        channel = getDecodedObject(channel);        while ((ln = listNext(&li)) != NULL) {            pubsubPattern *pat = ln->value;            if (stringmatchlen((char*)pat->pattern->ptr,                                sdslen(pat->pattern->ptr),                                (char*)channel->ptr,                                sdslen(channel->ptr),0)) {                addReply(pat->client,shared.mbulk4);                addReply(pat->client,shared.pmessagebulk);                addReplyBulk(pat->client,pat->pattern);                addReplyBulk(pat->client,channel);                addReplyBulk(pat->client,message);                receivers++;            }        }        decrRefCount(channel);    }    return receivers;}

最后提醒一下,处于发布/订阅模式的client,是无法发布上述五种命令之外的命令(quit除外),这是在processCommand函数中检查的,可以参看前面命令处理章节对该函数的解释。


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:redis源代码分析20–发布/订阅

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址