本篇主要讲的是 AOF 持久化,了解 AOF 的数据组织方式和运作机制。redis 主要在 aof.c 中实现 AOF 的操作。 数据结构 rio redis AOF 持久化同样借助了 struct rio. 详细内容在《深入剖析 redis RDB 持久化策略》中有介绍。 AOF 数据组织方式 假设 redis 内存
本篇主要讲的是 AOF 持久化,了解 AOF 的数据组织方式和运作机制。redis 主要在 aof.c 中实现 AOF 的操作。
数据结构 rio
redis AOF 持久化同样借助了 struct rio. 详细内容在《深入剖析 redis RDB 持久化策略》中有介绍。
AOF 数据组织方式
假设 redis 内存有「name:Jhon」的键值对,那么进行 AOF 持久化后,AOF 文件有如下内容:
*2        # 2 个参数
$6        # 第一个参数长度为 6
SELECT    # 第一个参数
$1        # 第二个参数长度为 1
8         # 第二个参数
*3        # 3 个参数
$3        # 第一个参数长度为 3
SET       # 第一个参数
$4        # 第二个参数长度为 4
name      # 第二个参数
$4        # 第三个参数长度为 4
Jhon      # 第三个参数
所以对上面的内容进行恢复,能得到熟悉的两条 redis 命令:SELECT 8;SET name Jhon。
可以想象的是,redis 遍历内存数据集中的每个 key-value 对,依次写入磁盘中;redis 启动的时候,从 AOF 文件中读取数据,恢复数据。
AOF 持久化运作机制
和 redis RDB 持久化运作机制不同,redis AOF 有后台执行和边服务边备份两种方式。
1)AOF 后台执行的方式和 RDB 有类似的地方,fork 一个子进程,主进程仍进行服务,子进程执行 AOF 持久化,数据被 dump 到磁盘上。与 RDB 不同的是,后台子进程持久化过程中,主进程会记录期间的所有数据变更(主进程还在服务),并存储在 server.aof_rewrite_buf_blocks 中;后台子进程结束后,redis 会把这些更新缓存追加到 AOF 文件中,这一步是 RDB 持久化所不具备的。
来说说更新缓存这个东西。redis 服务器产生数据变更的时候,譬如 set name Jhon,不仅仅会修改内存数据集,也会记录此更新(修改)操作,记录的方式就是上面所说的数据组织方式。
更新缓存可以存储在 server.aof_buf 中,你可以把它理解为一个小型临时中转站,所有累积的更新缓存都会先放入这里,它会在特定时机写入文件或者插入到 server.aof_rewrite_buf_blocks 链表中(下面会详述);server.aof_buf 中的数据在 propagate() 添加,在涉及数据更新的地方都会调用 propagate() 以累积变更。更新缓存也可以存储在 server.aof_rewrite_buf_blocks,这是一个元素类型为 struct aofrwblock 的链表,你可以把它理解为一个仓库,当后台有 AOF 子进程的时候,会将累积的更新缓存(在 server.aof_buf 中)插入到链表中,而当 AOF 子进程结束,它会被整个写入到文件。两者是有关联的。
下面是后台执行的主要代码:
// 启动后台子进程,执行 AOF 持久化操作。bgrewriteaofCommand(),startAppendOnly(),serverCron() 中会调用此函数/* This is how rewriting of the append only file in background works:** 1) The user calls BGREWRITEAOF* 2) Redis calls this function, that forks():* 2a) the child rewrite the append only file in a temp file.* 2b) the parent accumulates differences in server.aof_rewrite_buf.* 3) When the child finished '2a' exists.* 4) The parent will trap the exit code, if it's OK, will append the* data accumulated into server.aof_rewrite_buf into the temp file, and* finally will rename(2) the temp file in the actual file name.* The the new file is reopened as the new append only file. Profit!*/int r<div style="color:transparent">本文来源gaodai.ma#com搞##代!^码@网*</div>ewriteAppendOnlyFileBackground(void) { pid_t childpid; long long start; // 已经有正在执行备份的子进程 if (server.aof_child_pid != -1) return REDIS_ERR; start = ustime(); if ((childpid = fork()) == 0) { char tmpfile[256]; // 子进程 /* Child */ // 关闭监听 closeListeningSockets(0); // 设置进程 title redisSetProcTitle("redis-aof-rewrite"); // 临时文件名 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); // 脏数据,其实就是子进程所消耗的内存大小 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { // 获取脏数据大小 size_t private_dirty = zmalloc_get_private_dirty(); // 记录脏数据 if (private_dirty) { redisLog(REDIS_NOTICE, "AOF rewrite: %zu MB of memory used by copy-on-write", private_dirty/(1024*1024)); } exitFromChild(0); } else { exitFromChild(1); } } else { /* Parent */ server.stat_fork_time = ustime()-start; if (childpid == -1) { redisLog(REDIS_WARNING, "Can't rewrite append only file in background: fork: %s", strerror(errno)); return REDIS_ERR; } redisLog(REDIS_NOTICE, "Background append only file rewriting started by pid %d",childpid); // AOF 已经开始执行,取消 AOF 计划 server.aof_rewrite_scheduled = 0; // AOF 最近一次执行的起始时间 server.aof_rewrite_time_start = time(NULL); // 子进程 ID server.aof_child_pid = childpid; updateDictResizePolicy(); // 因为更新缓存都将写入文件,要强制产生选择数据集的指令 SELECT ,以防出现数据合并错误。 /* We set 
appendseldb to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command, so the differences * accumulated by the parent into server.aof_rewrite_buf will start * with a SELECT statement and it will be safe to merge. */ server.aof_selected_db = -1; replicationScriptCacheFlush(); return REDIS_OK; } return REDIS_OK; /* unreached */}// AOF 持久化主函数。只在 rewriteAppendOnlyFileBackground() 中会调用此函数/* Write a sequence of commands able to fully rebuild the dataset into* "filename". Used both by REWRITEAOF and BGREWRITEAOF.** In order to minimize the number of commands needed in the rewritten* log Redis uses variadic commands when possible, such as RPUSH, SADD* and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time* are inserted using a single command. */int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; rio aof; FILE *fp; char tmpfile[256]; int j; long long now = mstime(); /* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() function. 
*/ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); // 打开文件 fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); return REDIS_ERR; } // 初始化 rio 结构体 rioInitWithFile(&aof,fp); // 如果设置了自动备份参数,将进行设置 if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES); // 备份每一个数据集 for (j = 0; j dict; if (dictSize(d) == 0) continue; // 获取数据集的迭代器 di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } // 写入 AOF 操作码 /* SELECT the new DB */ if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; // 写入数据集序号 if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; // 写入数据集中每一个数据项 /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; long long expiretime; keystr = dictGetKey(de); o = dictGetVal(de); // 将 keystr 封装在 robj 里 initStaticStringObject(key,keystr); // 获取过期时间 expiretime = getExpire(db,&key); // 如果已经过期,放弃存储 /* If this key is already expired skip it */ if (expiretime != -1 && expiretime type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { if (rewriteListObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_SET) { if (rewriteSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_ZSET) { if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_HASH) { if (rewriteHashObject(&aof,&key,o) == 0) goto werr; } else { redisPanic("Unknown object type"); } // 写入过期时间 /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) 
== 0) goto werr; } } // 释放迭代器 dictReleaseIterator(di); } // 写入磁盘 /* Make sure data will not remain on the OS's output buffers */ fflush(fp); aof_fsync(fileno(fp)); fclose(fp); // 重写文件名 /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed"); return REDIS_OK;werr: // 清理工作 fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR;}// 后台子进程结束后,redis 更新缓存 server.aof_rewrite_buf_blocks 追加到 AOF 文件中// 在 AOF 持久化结束后会执行这个函数, backgroundRewriteDoneHandler() 主要工作是将 server.aof_rewrite_buf_blocks,即 AOF 缓存写入文件/* A background append only file rewriting (BGREWRITEAOF) terminated its work. * Handle this. */void backgroundRewriteDoneHandler(int exitcode, int bysignal) { ...... // 将 AOF 缓存 server.aof_rewrite_buf_blocks 的 AOF 写入磁盘 if (aofRewriteBufferWrite(newfd) == -1) { redisLog(REDIS_WARNING, "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); close(newfd); goto cleanup; } ......}// 将累积的更新缓存 server.aof_rewrite_buf_blocks 同步到磁盘/* Write the buffer (possibly composed of multiple blocks) into the specified* fd. If no short write or any other error happens -1 is returned,* otherwise the number of bytes written is returned. */ssize_t aofRewriteBufferWrite(int fd) { listNode *ln; listIter li; ssize_t count = 0; listRewind(server.aof_rewrite_buf_blocks,&li); while((ln = listNext(&li))) { aofrwblock *block = listNodeValue(ln); ssize_t nwritten; if (block->used) { nwritten = write(fd,block->buf,block->used); if (nwritten != block->used) { if (nwritten == 0) errno = EIO; return -1; } count += nwritten; } } return count;}