redis——AOF持久化

标签: redis

redis除了提供RDB持久化之外,还提供AOF持久化(append only file)。从名字可以看出,这个文件的写入方式是append,即追加;追加的内容就是客户端每次执行的写命令。例如,在客户端执行 SET msg "hello world":

来看看在输入这一条命令之后生成的appendonly.aof文件:

*2
$6
SELECT
$1
0
*3
$3
SET
$3
msg
$11
hello world

这其中除了我们输入的写命令之外,服务器会自动添加一条选择数据库的命令 SELECT 0,将数据库切换到正在使用的数据库中(因为不同的数据库之间的键空间是私有的、不共享的)

文件中以 * 开头的行(例如 *2、*3)表示该条命令的参数个数;以 $ 开头的行(例如 $1、$3)表示紧随其后的那个参数的字节长度。


那么现在来看看aof持久化的实现。aof持久化的实现步骤是:命令追加 —> 文件写入 —> 文件同步

命令追加

在redisServer结构中有这样一个SDS对象作为aof的缓冲区:

/* Server-wide state; this excerpt shows only the AOF buffer field. */
struct redisServer {
    sds aof_buf; /* SDS string used as the AOF buffer: command text is appended here first */
    //...
};

我们在执行例子中的命令时,服务器会将命令内容先追加到这个缓冲区

/*
 * Convert one executed command into AOF protocol text and queue it.
 *
 * cmd    - command table entry of the command being logged.
 * dictid - id of the database the command applies to.
 * argv   - argument vector of the command.
 * argc   - number of entries in argv.
 *
 * The generated text is appended to server.aof_buf when AOF is on, and is
 * also passed to aofRewriteBufferAppend() while server.aof_child_pid is set.
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds out = sdsempty();

    /* Emit a SELECT first whenever the target database differs from the one
     * recorded in server.aof_selected_db, then remember the new selection. */
    if (server.aof_selected_db != dictid) {
        char dbid[64];

        snprintf(dbid,sizeof(dbid),"%d",dictid);
        out = sdscatprintf(out,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(dbid),dbid);
        server.aof_selected_db = dictid;
    }

    /* Classify the command; each class is serialized differently. */
    int is_expire = cmd->proc == expireCommand ||
                    cmd->proc == pexpireCommand ||
                    cmd->proc == expireatCommand;
    int is_setex = cmd->proc == setexCommand || cmd->proc == psetexCommand;
    int is_set_with_options = cmd->proc == setCommand && argc > 3;

    if (is_expire) {
        /* The expire family goes through the dedicated expire serializer. */
        out = catAppendOnlyExpireAtCommand(out,cmd,argv[1],argv[2]);
    } else if (is_setex) {
        /* Logged as "SET argv[1] argv[3]" followed by an expire entry built
         * from argv[2]. The temporary "SET" name object is released once the
         * generic command has been serialized. */
        robj *setname = createStringObject("SET",3);
        robj *setargv[3];

        setargv[0] = setname;
        setargv[1] = argv[1];
        setargv[2] = argv[3];
        out = catAppendOnlyGenericCommand(out,3,setargv);
        decrRefCount(setname);
        out = catAppendOnlyExpireAtCommand(out,cmd,argv[1],argv[2]);
    } else if (is_set_with_options) {
        /* SET with extra options: log the first three arguments as a plain
         * SET, then an expire entry for whichever of EX / PX was given. */
        robj *exarg = NULL, *pxarg = NULL;
        int i = 3;

        out = catAppendOnlyGenericCommand(out,3,argv);
        while (i < argc) {
            if (strcasecmp(argv[i]->ptr, "ex") == 0) exarg = argv[i+1];
            if (strcasecmp(argv[i]->ptr, "px") == 0) pxarg = argv[i+1];
            i++;
        }
        serverAssert(!(exarg && pxarg));
        if (exarg) {
            out = catAppendOnlyExpireAtCommand(out,server.expireCommand,argv[1],
                                               exarg);
        }
        if (pxarg) {
            out = catAppendOnlyExpireAtCommand(out,server.pexpireCommand,argv[1],
                                               pxarg);
        }
    } else {
        /* Everything else is logged verbatim. */
        out = catAppendOnlyGenericCommand(out,argc,argv);
    }

    /* Queue the text in the AOF buffer when AOF is enabled. */
    if (server.aof_state == AOF_ON) {
        server.aof_buf = sdscatlen(server.aof_buf,out,sdslen(out));
    }

    /* While a background rewrite child exists, also accumulate the text in
     * the rewrite buffer so that the difference between the child's snapshot
     * and the current dataset can later be appended to the new file. */
    if (server.aof_child_pid != -1) {
        aofRewriteBufferAppend((unsigned char*)out,sdslen(out));
    }

    /* The scratch string is no longer needed. */
    sdsfree(out);
}

文件写入和同步

redis服务器的进程是一个事件循环,每次循环的时候会调用flushAppendOnlyFile方法将aof_buf中的内容写到AOF文件中去。在这一步中,服务器会根据配置的appendfsync选项来做不同的同步操作,不同的appendfsync取值决定了aof持久化的效率和安全性:

| 配置 | 说明 | 效率 | 安全性 |
| --- | --- | --- | --- |
| always | 将aof_buf中的内容写到AOF文件中,并且每次写入后都立即同步AOF文件 | 最低 | 最高 |
| everysec(默认) | 将aof_buf中的内容写到AOF文件中;如果上一次同步AOF文件的时间距离当前时间超过1秒,那么再次同步AOF文件(另起线程) | 较高 | 较高 |
| no | 将aof_buf中的内容写到AOF文件中;但是并不同步AOF文件,啥时候同步由操作系统决定 | 最高 | 最低 |

与RDB相同,服务器也可以通过载入AOF文件来还原数据库。

载入

以下是载入的流程图:

来逐步看看具体实现:

首先会创建一个伪客户端,以下是创建伪客户端的函数:

/* Client attributes and the operations on them are not covered in detail here. */

/*
 * Allocate and initialise a client that is not backed by a socket (fd is -1).
 * Returns the newly allocated client.
 */
struct client *createFakeClient(void) {
    struct client *fake = zmalloc(sizeof(struct client));

    /* Start on database 0. */
    selectDb(fake,0);

    /* Identity: no socket, no name, no peer id. */
    fake->fd = -1;
    fake->name = NULL;
    fake->peerid = NULL;

    /* Input side: empty query buffer and no parsed arguments yet. */
    fake->querybuf = sdsempty();
    fake->querybuf_peak = 0;
    fake->argv = NULL;
    fake->argc = 0;

    /* State flags. */
    fake->flags = 0;
    fake->btype = BLOCKED_NONE;
    /* Mark the fake client as a slave waiting for synchronization, so that
     * Redis does not send replies to it. */
    fake->replstate = SLAVE_STATE_WAIT_BGSAVE_START;

    /* Output side: empty static buffer position and empty reply list. */
    fake->bufpos = 0;
    fake->reply = listCreate();
    fake->reply_bytes = 0;
    fake->obuf_soft_limit_reached_time = 0;

    fake->watched_keys = listCreate();

    /* Install the free/dup callbacks used for reply list nodes. */
    listSetFreeMethod(fake->reply,decrRefCountVoid);
    listSetDupMethod(fake->reply,dupClientReplyValue);

    initClientMultiState(fake);
    return fake;
}

接下来是使用loadAppendOnlyFile方法进行load操作:

/*
 * Replay the append only file `filename` by parsing each command it contains
 * and executing it through a fake client.
 *
 * Exits the process if the file cannot be opened or an unknown command is
 * found. Returns C_ERR when the file is empty. The remaining return paths and
 * the readerr / fmterr / uxeof labels are in the part of the function elided
 * from this excerpt.
 */
int loadAppendOnlyFile(char *filename) {
    struct client *fakeClient;
    FILE *fp = fopen(filename,"r"); /* Open the AOF file. */
    struct redis_stat sb;
    int old_aof_state = server.aof_state; /* Saved before AOF is switched off below. */
    long loops = 0;
    off_t valid_up_to = 0; /* File offset recorded after each executed command (see end of loop). */

    if (fp == NULL) {
        serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Special case: an empty AOF file (this is legitimate, a freshly created
     * AOF file is empty). Nothing is loaded and C_ERR is returned. */
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return C_ERR;
    }

    /* Temporarily switch AOF off while the file is being replayed. */
    server.aof_state = AOF_OFF;

    /* Create the fake client that will execute the commands. */
    fakeClient = createFakeClient();
    /* Begin loading from the file. */
    startLoading(fp);

    /* If the file starts with the "REDIS" signature it carries an RDB
     * preamble: rewind and load that part first, then continue with the AOF
     * tail. Otherwise just rewind to the start of the file. */
    char sig[5]; /* "REDIS" */
    if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
        if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
    } else {
        rio rdb;

        serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
        if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
        rioInitWithFile(&rdb,fp);
        if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
            serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
            goto readerr;
        } else {
            serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
        }
    }

    /* Read commands from the AOF file until end of file. */
    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Once every 1000 iterations report the current file offset and call
         * processEventsWhileBlocked(). */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            processEventsWhileBlocked();
        }

        /* Read the "*<argc>" header line; a clean EOF here ends the loop. */
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        if (buf[0] != '*') goto fmterr;
        /* A lone '*' with nothing after it is handled as a read error. */
        if (buf[1] == '\0') goto readerr;
        argc = atoi(buf+1);
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc);
        fakeClient->argc = argc;
        fakeClient->argv = argv;

        /* Read each argument: a "$<len>" line, <len> bytes of payload, then
         * two terminator bytes. */
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) {
                fakeClient->argc = j; /* Only argv[0..j-1] have been created so far. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) {
                sdsfree(argsds);
                fakeClient->argc = j; /* argv[j] was never created. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            argv[j] = createObject(OBJ_STRING,argsds);
            /* Consume the two bytes that follow the payload. */
            if (fread(buf,2,1,fp) == 0) {
                fakeClient->argc = j+1; /* argv[j] exists now, so include it. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
        }

        /* Look the command up by name; an unknown command is fatal. */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            serverLog(LL_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }

        /* Execute the command that was just read in the fake client. */
        fakeClient->cmd = cmd;
        cmd->proc(fakeClient);

        /* The fake client must not have accumulated any reply. */
        serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client must never be blocked. */
        serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);

        /* Release the arguments and, when server.aof_load_truncated is set,
         * remember the offset reached after this command. */
        freeFakeClientArgv(fakeClient);
        fakeClient->cmd = NULL;
        if (server.aof_load_truncated) valid_up_to = ftello(fp);
    }

    /* Reaching EOF while the fake client is still inside MULTI is handled at
     * the uxeof label (in the elided part). */
    if (fakeClient->flags & CLIENT_MULTI) goto uxeof;

    //...
}

重写

开启AOF功能之后,服务器会按照一定频率追加“写命令”到AOF文件中去,例如对同一个列表依次执行下面这几条写命令:RPUSH list "A"、RPUSH list "B"、RPUSH list "C"。

按照上面所说,AOF文件中会有三条写操作语句。其实这三条写操作可以通过一条写操作来完成,即 RPUSH list "A" "B" "C":

为了避免AOF文件中记录过多冗余的写操作语句而造成文件膨胀,redis提供了重写功能。

来看看重写功能的实现:

/*
 * Write a rewritten AOF into a temporary file.
 *
 * Returns C_OK on success. Returns C_ERR when the temporary file cannot be
 * opened, or when the rewrite reports an error; in the latter case the file
 * is closed and unlinked first. The declarations of fp, tmpfile and aof, and
 * the use of `filename`, are in the parts elided from this excerpt.
 */
int rewriteAppendOnlyFile(char *filename) {
    //...

    /* Create the new AOF file. */
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    //...

    /* Perform the rewrite through the rio stream. */
    if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;

    //...
    return C_OK;

werr:
    /* Error path: log, then close and remove the temporary file. */
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}

关注rewriteAppendOnlyFileRio方法: 

/*
 * Emit, through `aof`, the commands needed to recreate every non-empty
 * database: a SELECT per database, then one or more commands per key and,
 * for keys with an expire, a PEXPIREAT.
 *
 * Returns C_OK on success, or C_ERR as soon as any write reports failure
 * (the dictionary iterator is released before returning).
 */
int rewriteAppendOnlyFileRio(rio *aof) {
    dictIterator *di = NULL;
    size_t last_diff_read = 0;
    long long now = mstime();
    int dbid;

    for (dbid = 0; dbid < server.dbnum; dbid++) {
        char select_hdr[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = &server.db[dbid];
        dict *keyspace = db->dict;
        dictEntry *de;

        /* Nothing to write for an empty database. */
        if (dictSize(keyspace) == 0) continue;
        di = dictGetSafeIterator(keyspace);

        /* SELECT <dbid> */
        if (!rioWrite(aof,select_hdr,sizeof(select_hdr)-1) ||
            !rioWriteBulkLongLong(aof,dbid))
        {
            goto werr;
        }

        /* Walk every key of this database. */
        while ((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj *o = dictGetVal(de);
            robj key;
            long long expiretime;
            int written;

            initStaticStringObject(key,keystr);
            expiretime = getExpire(db,&key);

            /* Keys whose expire time is already in the past are skipped. */
            if (expiretime != -1 && expiretime < now) continue;

            /* Serialize the value according to its type. */
            if (o->type == OBJ_STRING) {
                /* Strings become a single SET key value. */
                char set_hdr[] = "*3\r\n$3\r\nSET\r\n";
                written = rioWrite(aof,set_hdr,sizeof(set_hdr)-1) &&
                          rioWriteBulkObject(aof,&key) &&
                          rioWriteBulkObject(aof,o);
            } else if (o->type == OBJ_LIST) {
                written = rewriteListObject(aof,&key,o) != 0;
            } else if (o->type == OBJ_SET) {
                written = rewriteSetObject(aof,&key,o) != 0;
            } else if (o->type == OBJ_ZSET) {
                written = rewriteSortedSetObject(aof,&key,o) != 0;
            } else if (o->type == OBJ_HASH) {
                written = rewriteHashObject(aof,&key,o) != 0;
            } else if (o->type == OBJ_MODULE) {
                written = rewriteModuleObject(aof,&key,o) != 0;
            } else {
                serverPanic("Unknown object type");
                written = 1;
            }
            if (!written) goto werr;

            /* Persist the expire time, if the key has one. */
            if (expiretime != -1) {
                char expire_hdr[] = "*3\r\n$9\r\nPEXPIREAT\r\n";
                if (!rioWrite(aof,expire_hdr,sizeof(expire_hdr)-1) ||
                    !rioWriteBulkObject(aof,&key) ||
                    !rioWriteBulkLongLong(aof,expiretime))
                {
                    goto werr;
                }
            }

            /* Each time more than AOF_READ_DIFF_INTERVAL_BYTES have been
             * processed since the last call, call aofReadDiffFromParent(). */
            if (aof->processed_bytes > last_diff_read+AOF_READ_DIFF_INTERVAL_BYTES) {
                last_diff_read = aof->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    return C_OK;

werr:
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

在重写的过程中,对于list、hash等对象的重写会有点不同之处,比如list重写:

/*
 * Emit the RPUSH commands needed to rebuild the list stored at `key`.
 *
 * r   - rio stream to write to.
 * key - key object naming the list.
 * o   - the list object; only OBJ_ENCODING_QUICKLIST is supported, any other
 *       encoding triggers serverPanic().
 *
 * Elements are written in batches of at most AOF_REWRITE_ITEMS_PER_CMD per
 * RPUSH, so a long list is split across several commands (presumably to keep
 * each individual command bounded in size; verify against the constant's
 * definition).
 *
 * Returns 1 on success and 0 if any write fails. The quicklist iterator is
 * released on every exit path, including write failures.
 */
int rewriteListObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = listTypeLength(o);

    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklist *list = o->ptr;
        quicklistIter *li = quicklistGetIterator(list, AL_START_HEAD);
        quicklistEntry entry;

        while (quicklistNext(li,&entry)) {
            if (count == 0) {
                /* Start of a new batch: write the RPUSH header sized for the
                 * number of elements this command will carry. */
                int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
                    AOF_REWRITE_ITEMS_PER_CMD : items;
                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0 ||
                    rioWriteBulkString(r,"RPUSH",5) == 0 ||
                    rioWriteBulkObject(r,key) == 0)
                {
                    quicklistReleaseIterator(li);
                    return 0;
                }
            }

            /* An entry is either a byte string (value/sz) or an integer. */
            if (entry.value) {
                if (rioWriteBulkString(r,(char*)entry.value,entry.sz) == 0) {
                    quicklistReleaseIterator(li);
                    return 0;
                }
            } else {
                if (rioWriteBulkLongLong(r,entry.longval) == 0) {
                    quicklistReleaseIterator(li);
                    return 0;
                }
            }
            /* Batch full: the next element starts a new RPUSH. */
            if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
        quicklistReleaseIterator(li);
    } else {
        serverPanic("Unknown list encoding");
    }
    return 1;
}

以上是阻塞式的重写,服务器会阻塞直到重写结束,在此过程中不会处理其他任何命令请求。为了避免这种情况,redis提供了后台式的重写:

/*
 * Start an AOF rewrite in a forked child process.
 *
 * Returns C_ERR if an AOF or RDB child already exists, if the pipes cannot
 * be created, or if fork() fails; returns C_OK in the parent once the child
 * has been started. The child writes the rewrite to a temporary file named
 * after its pid and leaves through exitFromChild().
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t pid;
    long long fork_started;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    fork_started = ustime();
    pid = fork();

    if (pid != 0) {
        /* Parent (or fork failure): record fork statistics, then either
         * report the failure or register the child and keep serving
         * command requests. */
        server.stat_fork_time = ustime()-fork_started;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (pid == -1) {
            closeChildInfoPipe();
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %d",pid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = pid;
        updateDictResizePolicy();
        /* Reset the selected db to -1 so that the next
         * feedAppendOnlyFile() call emits a SELECT. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }

    /* Child: perform the rewrite into a pid-specific temporary file. */
    char tmpname[256];

    closeListeningSockets(0);
    redisSetProcTitle("redis-aof-rewrite");
    snprintf(tmpname,sizeof(tmpname),"temp-rewriteaof-bg-%d.aof", (int) getpid());
    if (rewriteAppendOnlyFile(tmpname) != C_OK) {
        exitFromChild(1);
    } else {
        size_t cow_bytes = zmalloc_get_private_dirty(-1);

        if (cow_bytes) {
            serverLog(LL_NOTICE,
                "AOF rewrite: %zu MB of memory used by copy-on-write",
                cow_bytes/(1024*1024));
        }

        /* Hand the figure to the parent before exiting successfully. */
        server.child_info_data.cow_size = cow_bytes;
        sendChildInfo(CHILD_INFO_TYPE_AOF);
        exitFromChild(0);
    }
    return C_OK;
}

后台式的重写还存在一个问题:在子进程重写期间,父进程仍在处理命令请求,新的写命令可能会修改数据库状态,这样一来,子进程重写出的AOF文件所保存的数据库状态就和服务器当前的数据库状态存在差异了。

解决这个问题的方法是:redis提供一个aof重写缓冲区,当子进程重写时,会把写命令同时发送给aof缓冲区和aof重写缓冲区:

重写完成后,由父进程将AOF重写缓冲区的内容写到新的AOF文件中,然后覆盖原来的AOF文件。

原文链接:加载失败,请重新获取