在block操作底层的文件中,第一句话就是
/* blocked.c - generic support for blocking operations like BLPOP & WAIT. */
这边是提供了一些api,供外界调用
1. getTimeoutFromObjectOrReply(),在object当中获取timeout参数,具体如下
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) { long long tval; if (getLongLongFromObjectOrReply(c,object,&tval, "timeout is not an integer or out of range") != C_OK) return C_ERR; if (tval < 0) { addReplyError(c,"timeout is negative"); return C_ERR; } if (tval > 0) { if (unit == UNIT_SECONDS) tval *= 1000; tval += mstime(); } *timeout = tval; return C_OK; }
2. blockClient(), 将一个client的CLIENT_BLOCKED位置位,并设置具体阻塞的类型
void blockClient(client *c, int btype) { c->flags |= CLIENT_BLOCKED; c->btype = btype; server.bpop_blocked_clients++; }
3. processUnblockedClients(),当一个客户端被解出阻塞之后,在event loop当中的beforeSleep()函数中会被调用,用于处理这些client的input buffer
void processUnblockedClients(void) { listNode *ln; client *c; while (listLength(server.unblocked_clients)) { ln = listFirst(server.unblocked_clients); serverAssert(ln != NULL); c = ln->value; listDelNode(server.unblocked_clients,ln); c->flags &= ~CLIENT_UNBLOCKED; /* Process remaining data in the input buffer, unless the client * is blocked again. Actually processInputBuffer() checks that the * client is not blocked before to proceed, but things may change and * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { if (c->querybuf && sdslen(c->querybuf) > 0) { processInputBuffer(c); } } } }
4. unblockClient(), 根据当前client被阻塞的类型进行unblock,
void unblockClient(client *c) { if (c->btype == BLOCKED_LIST) { unblockClientWaitingData(c); } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); } else if (c->btype == BLOCKED_MODULE) { unblockClientFromModule(c); } else { serverPanic("Unknown btype in unblockClient()."); } /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; server.bpop_blocked_clients--; /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. */ if (!(c->flags & CLIENT_UNBLOCKED)) { c->flags |= CLIENT_UNBLOCKED; listAddNodeTail(server.unblocked_clients,c); } }
5. replyToBlockedClientTimedOut(),当被阻塞的client到达time out时间后,给其发送响应
void replyToBlockedClientTimedOut(client *c) { if (c->btype == BLOCKED_LIST) { addReply(c,shared.nullmultibulk); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); } else if (c->btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } }
6. disconnectAllBlockedClients(), 这个函数只有在一个节点由主节点变为从节点,的时候,会给客户端发送一个unblocked error,并且断开与客户端的连接
void disconnectAllBlockedClients(void) { listNode *ln; listIter li; listRewind(server.clients,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (c->flags & CLIENT_BLOCKED) { addReplySds(c,sdsnew( "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> slave?)\r\n")); unblockClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } }