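I had planned to write about something else today, but since the previous post already touched on persistence and promised to come back to it, I might as well cover it now.
Persistence means syncing what is in memory to disk. Redis provides two persistence mechanisms: AOF and RDB. Today's subject is AOF.
AOF persistence appends each executed write command to the end of the AOF file, which records how the data changes over time. Its sync policy, appendfsync, takes one of three values, set in the redis.conf configuration file as follows: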
#
# If unsure, use "everysec".

# appendfsync always
appendfsync everysec
# appendfsync no
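The meanings are fairly self-explanatory, so I won't dwell on them: always calls fsync after every write, everysec calls it about once per second, and no leaves flushing to the operating system.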
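For readers who prefer code to prose, here is a minimal sketch of how the three values could map to a decision about when to fsync. The names fsync_policy, parse_appendfsync and fsync_due are hypothetical and are not Redis identifiers; the "everysec" rule (fsync when the clock has moved past the last fsync) follows the check at the end of flushAppendOnlyFile shown later in this post, and the case-sensitive comparison is a simplification.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <time.h>

/* Hypothetical policy labels for this sketch (not the Redis constants). */
enum fsync_policy {
    POLICY_INVALID = -1,
    POLICY_NO,
    POLICY_ALWAYS,
    POLICY_EVERYSEC
};

/* Map the text after "appendfsync" in redis.conf to a policy.
 * Assumption: exact, case-sensitive match, to keep the sketch short. */
enum fsync_policy parse_appendfsync(const char *value) {
    assert(value != NULL);
    if (strcmp(value, "no") == 0) return POLICY_NO;
    if (strcmp(value, "always") == 0) return POLICY_ALWAYS;
    if (strcmp(value, "everysec") == 0) return POLICY_EVERYSEC;
    return POLICY_INVALID;
}

/* Return 1 when the policy asks for an fsync at time `now`, else 0.
 * `last_fsync` is the time (in seconds) of the previous fsync. */
int fsync_due(enum fsync_policy policy, time_t now, time_t last_fsync) {
    switch (policy) {
    case POLICY_ALWAYS:
        return 1;                  /* sync after every write */
    case POLICY_EVERYSEC:
        return now > last_fsync;   /* at most once per second */
    default:
        return 0;                  /* "no": the OS decides when to flush */
    }
}
```

With this sketch, fsync_due(parse_appendfsync("everysec"), 11, 10) is true, while the same call within the same second as the last fsync is false.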
The code that switches AOF on in Redis is as follows:
/* Called when the user switches from "appendonly no" to "appendonly yes"
 * at runtime using the CONFIG command. */
int startAppendOnly(void) {
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    int newfd;

    newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
    serverAssert(server.aof_state == AOF_OFF);
    if (newfd == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);

        serverLog(LL_WARNING,
            "Redis needs to enable the AOF but can't open the "
            "append only file %s (in server root dir %s): %s",
            server.aof_filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }
    if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
    } else {
        /* If there is a pending AOF rewrite, we need to switch it off and
         * start a new one: the old one cannot be reused because it is not
         * accumulating the AOF buffer. */
        if (server.aof_child_pid != -1) {
            serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
            killAppendOnlyChild();
        }
        if (rewriteAppendOnlyFileBackground() == C_ERR) {
            close(newfd);
            serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
            return C_ERR;
        }
    }
    /* We correctly switched on AOF, now wait for the rewrite to be complete
     * in order to append data on disk. */
    server.aof_state = AOF_WAIT_REWRITE;
    server.aof_last_fsync = server.unixtime;
    server.aof_fd = newfd;
    return C_OK;
}
The AOF write code is as follows:
ssize_t aofWrite(int fd, const char *buf, size_t len) {
    ssize_t nwritten = 0, totwritten = 0;

    while(len) {
        nwritten = write(fd, buf, len);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return totwritten ? totwritten : -1;
        }

        len -= nwritten;
        buf += nwritten;
        totwritten += nwritten;
    }

    return totwritten;
}
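The return convention of aofWrite is easy to miss: it returns the full length on success, -1 if an error occurred before anything was written, and a smaller positive count if an error interrupted it partway. As a rough sketch of how a caller could tell these apart, here is a small helper. The names write_outcome and classify_write are made up for illustration and do not exist in Redis.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>   /* ssize_t */

/* Hypothetical labels for the three outcomes (not Redis identifiers). */
enum write_outcome { WRITE_COMPLETE, WRITE_SHORT, WRITE_FAILED };

/* Interpret the value returned by an aofWrite()-style loop that was asked
 * to write `expected` bytes. */
enum write_outcome classify_write(ssize_t nwritten, size_t expected) {
    assert(nwritten >= -1);                   /* the loop never returns less */
    if (nwritten == -1) return WRITE_FAILED;  /* nothing reached the file */
    if ((size_t)nwritten == expected) return WRITE_COMPLETE;
    return WRITE_SHORT;                       /* partial data is in the file */
}
```

The short-write case is the awkward one, because the file now ends with an incomplete command; the flush code that follows handles exactly that situation with ftruncate.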
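The code below writes the AOF buffer to disk (with the everysec policy, data sits in a buffer in between). It has to check whether a background fsync is still running, because a running fsync would block the write call. If one is, the flush is postponed, for up to two seconds. If the force parameter is set, though, none of that matters and the write goes ahead right away.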
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; /* bio was covered in an earlier post */

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    latencyStartMonitor(latency);
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file. Redis may refuse "
                             "to load the AOF the next time it starts. "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
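The postponing logic at the top of flushAppendOnlyFile is the part worth internalizing, so here it is pulled out into a standalone sketch. This is my own distillation rather than Redis code: decide_flush and flush_decision are hypothetical names, the server fields are passed in as plain parameters, and the postponed-start marker is reset as soon as the decision is "write now" (Redis resets it just after the write itself).

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical result type for this sketch. */
enum flush_decision { FLUSH_NOW, FLUSH_POSTPONE };

/* Decide whether to write the AOF buffer now or wait.
 *   everysec         - non-zero when the policy is "everysec"
 *   force            - non-zero when the caller insists on writing
 *   sync_in_progress - non-zero when a background fsync is still running
 *   now              - current time in seconds
 *   postponed_start  - 0 if no flush is being postponed, otherwise the
 *                      time postponing began; updated in place */
enum flush_decision decide_flush(int everysec, int force, int sync_in_progress,
                                 time_t now, time_t *postponed_start) {
    assert(postponed_start != NULL);
    if (everysec && !force && sync_in_progress) {
        if (*postponed_start == 0) {
            *postponed_start = now;      /* first postponement: remember when */
            return FLUSH_POSTPONE;
        }
        if (now - *postponed_start < 2) {
            return FLUSH_POSTPONE;       /* still inside the two-second window */
        }
        /* Waited two seconds or more: fall through and write anyway. */
    }
    *postponed_start = 0;                /* the write goes ahead, clear the marker */
    return FLUSH_NOW;
}
```

Calling it at seconds 100, 101 and 102 with a background fsync running the whole time gives postpone, postpone, write. With force set, or with any policy other than everysec, it answers "write" on the first call.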
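Another point worth noting is that the BGREWRITEAOF command rewrites the AOF file to make it as small as possible. Most of what it does comes down to using the most compact representation it can, so I won't go into detail here. Commands that arrive while the rewrite is running are buffered through the following mechanism.
1. Multiple buffer blocks are used instead of one big buffer, 10 MB per block, as shown below: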
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

typedef struct aofrwblock {
    unsigned long used, free;
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;
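2. To write data into the buffer, first find the last block in the current list. If it has enough free space, write into it directly; if not, fill whatever space is left, allocate a new block, and keep writing.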
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block-