Redis 源码分析 RedisServer 启动过程
数据结构
从 Redis 的 server.h
源码中 redisServer 数据结构的定义可以看出,其功能有多复杂。因此在这里我会将其拆成一个个小块,有些内容会在后面单独进行详细分析。源码中对各字段都做了详细的注释,这里就不贴出来了。大致分为如下几个部分。
通用参数(General)
模块(Modules)
网络(Networking)
常见的命令回调函数
统计相关(stat)
配置信息(Configuration)
AOF持久化相关(包括aof rewrite过程中,父子进程用于消除数据差异的管道)
RDB持久化相关(包括RDB过程中,父子进程用于通信的管道)
AOF或者主从复制下,命令传播相关
日志相关(logging)
主从复制(Replication (master)+Replication (slave))
主从复制的脚本缓存(Replication script cache)
主从同步相关(Synchronous replication)
系统限制(Limits)
阻塞客户端(Blocked clients)
sort命令相关(Sort)
数据结构转换参数
时间缓存(time cache)
发布订阅(Pubsub)
集群(Cluster)
Lazy free(删除键(包括过期键)时,是否启用后台线程进行异步释放)
LUA脚本
延迟监控相关
服务端中互斥锁信息
系统硬件信息
启动过程
redis 服务端的启动是由 server.c
文件中的 int main(int argc, char **argv)
函数完成的,代码如下:
下面我们一起来分析一下,redis-server
的启动过程。
通用设置
main 函数中的通用设置包括区域与时区设置、内存不足(OOM)处理函数的注册,以及随机数种子和 hash 种子的生成。
// 时区设置 setlocale(LC_COLLATE,""); tzset(); /* Populates 'timezone' global. */ zmalloc_set_oom_handler(redisOutOfMemoryHandler); srand(time(NULL)^getpid()); srandom(time(NULL)^getpid()); gettimeofday(&tv,NULL); init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid()); crc64_init(); 复制代码
生成 hash 种子
/*
 * Install the seed used by the dict hash functions.
 * Copies sizeof(dict_hash_function_seed) bytes from 'seed' into the
 * file-level seed buffer; 'seed' must point to at least that many bytes.
 */
void dictSetHashFunctionSeed(uint8_t *seed) {
    memcpy(dict_hash_function_seed,seed,sizeof(dict_hash_function_seed));
}

/* The default hashing function uses SipHash implementation
 * in siphash.c. */

uint64_t siphash(const uint8_t *in, const size_t inlen, const uint8_t *k);
uint64_t siphash_nocase(const uint8_t *in, const size_t inlen, const uint8_t *k);

/* Hash 'len' bytes at 'key' with SipHash keyed by the installed seed. */
uint64_t dictGenHashFunction(const void *key, int len) {
    return siphash(key,len,dict_hash_function_seed);
}
模式选择
根据启动参数判断是否为哨兵模式
/*
 * Sentinel-mode detection. main() uses it as:
 *
 *     server.sentinel_mode = checkForSentinelMode(argc,argv);
 *
 * (That statement is shown as a comment because it is not valid at file
 * scope.)
 */

/* Returns 1 if there is --sentinel among the arguments or if
 * argv[0] contains "redis-sentinel"; returns 0 otherwise.
 * An empty argument vector (argc < 1 or argv[0] == NULL, possible via
 * execve) is treated as "not sentinel" instead of passing NULL to strstr. */
int checkForSentinelMode(int argc, char **argv) {
    int j;

    if (argc < 1 || argv[0] == NULL) return 0;
    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
    for (j = 1; j < argc; j++)
        if (!strcmp(argv[j],"--sentinel")) return 1;
    return 0;
}
初始化服务端
服务端配置的初始化主要由函数 initServerConfig
完成,其主要职责是初始化 redisServer
数据结构中的各个成员变量(设置为默认值)。
void initServerConfig(void) { int j; updateCachedTime(1); getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); server.runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); clearReplicationId2(); server.hz = CONFIG_DEFAULT_HZ; /* Initialize it ASAP, even if it may get updated later after loading the config. This value may be used before the server is initialized. */ server.timezone = getTimeZone(); /* Initialized by tzset(). */ server.configfile = NULL; server.executable = NULL; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.bindaddr_count = 0; server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM; server.ipfd.count = 0; server.tlsfd.count = 0; server.sofd = -1; server.active_expire_enabled = 1; server.skip_checksum_validation = 0; server.saveparams = NULL; server.loading = 0; server.loading_rdb_used_mem = 0; server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE); server.aof_state = AOF_OFF; server.aof_rewrite_base_size = 0; server.aof_rewrite_scheduled = 0; server.aof_flush_sleep = 0; server.aof_last_fsync = time(NULL); atomicSet(server.aof_bio_fsync_status,C_OK); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; server.aof_lastbgrewrite_status = C_OK; server.aof_delayed_fsync = 0; server.aof_fd = -1; server.aof_selected_db = -1; /* Make sure the first time will not match */ server.aof_flush_postponed_start = 0; server.pidfile = NULL; server.active_defrag_running = 0; server.notify_keyspace_events = 0; server.blocked_clients = 0; memset(server.blocked_clients_by_type,0, sizeof(server.blocked_clients_by_type)); server.shutdown_asap = 0; server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE); server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE; server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); server.next_client_id = 1; /* Client IDs, start from 1 .*/ server.loading_process_events_interval_bytes = (1024*1024*2); unsigned int lruclock = getLRUClock(); atomicSet(server.lruclock,lruclock); 
resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ /* Replication related */ server.masterauth = NULL; server.masterhost = NULL; server.masterport = 6379; server.master = NULL; server.cached_master = NULL; server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; server.repl_transfer_tmpfile = NULL; server.repl_transfer_fd = -1; server.repl_transfer_s = NULL; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.master_repl_offset = 0; /* Replication partial resync backlog */ server.repl_backlog = NULL; server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; server.repl_backlog_off = 0; server.repl_no_slaves_since = time(NULL); /* Failover related */ server.failover_end_time = 0; server.force_failover = 0; server.target_replica_host = NULL; server.target_replica_port = 0; server.failover_state = NO_FAILOVER; /* Client output buffer limits */ for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; /* Linux OOM Score config */ for (j = 0; j < CONFIG_OOM_COUNT; j++) server.oom_score_adj_values[j] = configOOMScoreAdjValuesDefaults[j]; /* Double constants initialization */ R_Zero = 0.0; R_PosInf = 1.0/R_Zero; R_NegInf = -1.0/R_Zero; R_Nan = R_Zero/R_Zero; /* Command table -- we initialize it here as it is part of the * initial configuration, since command names may be changed via * redis.conf using the rename-command directive. 
*/ server.commands = dictCreate(&commandTableDictType,NULL); server.orig_commands = dictCreate(&commandTableDictType,NULL); populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); server.lpushCommand = lookupCommandByCString("lpush"); server.lpopCommand = lookupCommandByCString("lpop"); server.rpopCommand = lookupCommandByCString("rpop"); server.zpopminCommand = lookupCommandByCString("zpopmin"); server.zpopmaxCommand = lookupCommandByCString("zpopmax"); server.sremCommand = lookupCommandByCString("srem"); server.execCommand = lookupCommandByCString("exec"); server.expireCommand = lookupCommandByCString("expire"); server.pexpireCommand = lookupCommandByCString("pexpire"); server.xclaimCommand = lookupCommandByCString("xclaim"); server.xgroupCommand = lookupCommandByCString("xgroup"); server.rpoplpushCommand = lookupCommandByCString("rpoplpush"); server.lmoveCommand = lookupCommandByCString("lmove"); /* Debugging */ server.watchdog_period = 0; /* By default we want scripts to be always replicated by effects * (single commands executed by the script), and not by sending the * script to the slave / AOF. This is the new way starting from * Redis 5. However it is possible to revert it via redis.conf. */ server.lua_always_replicate_commands = 1; /* Client Pause related */ server.client_pause_type = CLIENT_PAUSE_OFF; server.client_pause_end_time = 0; initConfigValues(); } 复制代码
哨兵设置
根据前文所选择的模式,如果选择的是哨兵模式,这里就进入哨兵初始化流程。
是否需要 rdb/aof 校验
/* Check if we need to start in redis-check-rdb/aof mode. We just execute
 * the program main. However the program is part of the Redis executable
 * so that we can easily execute an RDB check on loading errors.
 * The mode is chosen purely from the program name in argv[0]. */
if (strstr(argv[0],"redis-check-rdb") != NULL)
    redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
    redis_check_aof_main(argc,argv);
解析命令行参数
解析命令行参数:如果通过配置文件或命令行指定了参数,就用它们覆盖默认参数设置。
/*
 * Excerpt from main(): handle --version/--help/--test-memory, then turn the
 * remaining command-line arguments into config text that is appended after
 * the config file (and stdin) before loading.
 */
if (argc >= 2) {
    j = 1; /* First option to parse in argv[] */
    sds options = sdsempty();

    /* Handle special options --help and --version */
    if (strcmp(argv[1], "-v") == 0 ||
        strcmp(argv[1], "--version") == 0) version();
    if (strcmp(argv[1], "--help") == 0 ||
        strcmp(argv[1], "-h") == 0) usage();
    if (strcmp(argv[1], "--test-memory") == 0) {
        if (argc == 3) {
            memtest(atoi(argv[2]),50);
            exit(0);
        } else {
            fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
            fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
            exit(1);
        }
    }

    /* Parse command line options
     * Precedence wise, File, stdin, explicit options -- last config is the one that matters.
     *
     * First argument is the config file name? */
    if (argv[1][0] != '-') {
        /* Replace the config file in server.exec_argv with its absolute path. */
        server.configfile = getAbsolutePath(argv[1]);
        zfree(server.exec_argv[1]);
        server.exec_argv[1] = zstrdup(server.configfile);
        j = 2; /* Skip this arg when parsing options */
    }
    while(j < argc) {
        /* Either first or last argument - Should we read config from stdin? */
        if (argv[j][0] == '-' && argv[j][1] == '\0' && (j == 1 || j == argc-1)) {
            config_from_stdin = 1;
        }
        /* All the other options are parsed and conceptually appended to the
         * configuration file. For instance --port 6380 will generate the
         * string "port 6380\n" to be parsed after the actual config file
         * and stdin input are parsed (if they exist). */
        else if (argv[j][0] == '-' && argv[j][1] == '-') {
            /* Option name */
            if (sdslen(options)) options = sdscat(options,"\n");
            options = sdscat(options,argv[j]+2);
            options = sdscat(options," ");
        } else {
            /* Option argument */
            options = sdscatrepr(options,argv[j],strlen(argv[j]));
            options = sdscat(options," ");
        }
        j++;
    }

    /* Load the config file, then stdin (if requested), then 'options'. */
    loadServerConfig(server.configfile, config_from_stdin, options);
    if (server.sentinel_mode) loadSentinelConfigFromQueue();
    sdsfree(options);
}
读取配置文件
主要是读取 redis.conf
配置参数。
/* Load the server configuration from the specified filename.
 * The function appends the additional configuration directives stored
 * in the 'options' string to the config file before loading.
 *
 * Both filename and options can be NULL, in such a case are considered
 * empty. This way loadServerConfig can be used to just load a file or
 * just load a string.
 *
 * Order of the assembled text: file content, then stdin content (when
 * config_from_stdin is non-zero), then a newline and 'options'. A file
 * that cannot be opened is fatal (exit(1)). */
void loadServerConfig(char *filename, char config_from_stdin, char *options) {
    sds config = sdsempty();
    char buf[CONFIG_MAX_LINE+1];
    FILE *fp;

    /* Load the file content */
    if (filename) {
        if ((fp = fopen(filename,"r")) == NULL) {
            serverLog(LL_WARNING,
                    "Fatal error, can't open config file '%s': %s",
                    filename, strerror(errno));
            exit(1);
        }
        while(fgets(buf,sizeof(buf),fp) != NULL)
            config = sdscat(config,buf);
        fclose(fp);
    }

    /* Append content from stdin */
    if (config_from_stdin) {
        serverLog(LL_WARNING,"Reading config from stdin");
        fp = stdin;
        while(fgets(buf,sizeof(buf),fp) != NULL)
            config = sdscat(config,buf);
    }

    /* Append the additional options */
    if (options) {
        config = sdscat(config,"\n");
        config = sdscat(config,options);
    }
    loadServerConfigFromString(config);
    sdsfree(config);
}
是否后台运行
/*
 * Called from main() as:
 *
 *     int background = server.daemonize && !server.supervised;
 *     if (background) daemonize();
 *
 * i.e. the server detaches only when server.daemonize is set and
 * server.supervised is not. (Shown as a comment: these statements belong
 * inside main(), not at file scope.)
 */

/* Detach from the terminal: fork and let the parent exit, start a new
 * session in the child, then point stdin/stdout/stderr at /dev/null. */
void daemonize(void) {
    int fd;
    pid_t pid = fork();

    if (pid < 0) {
        /* fork() failed. Without this check the -1 return would take the
         * "parent exits" branch and the process would exit(0) with no
         * server left running. Report the failure instead. */
        perror("daemonize: fork");
        exit(1);
    }
    if (pid != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */

    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}
这里需要展开说一下守护进程(面试中经常被问到)。Redis 的后台化实现比较简单,下面我们来看一个标准的守护进程实现:
int daemon_init(const char *pname, int facility) { int i; pid_t pid; if ( (pid = Fork()) < 0) //创建一个子进程 return (-1); else if (pid) _exit(0); /* parent terminates 父进程退出*/ //子进程继续运行 //此进程是子进程,所以肯定不是组长进程 /* child 1 continues... */ if (setsid() < 0) /* become session leader 创建会话,成为会话首进程,新的进程组的组长进程*/ return (-1); Signal(SIGHUP, SIG_IGN); //把挂起信号设置为忽略 if ( (pid = Fork()) < 0) //再创建一个子进程 return (-1); else if (pid) //父进程退出 _exit(0); /* child 1 terminates */ //第二个子进程继续运行,因为第二个子进程已经不是会话首进程了,所以永远不会获得控制终端 /* child 2 continues... */ daemon_proc = 1; /* for err_XXX() functions 再error.c中定义的变量*/ chdir("/"); /* change working directory 调整工作目录到根目录 */ /* close off file descriptors */ for (i = 0; i < MAXFD; i++) //关闭所有文件描述符 close(i); /* redirect stdin, stdout, and stderr to /dev/null 定义标准输入,标准输出和标准错误到/dev/null */ open("/dev/null", O_RDONLY); open("/dev/null", O_RDWR); open("/dev/null", O_RDWR); openlog(pname, LOG_PID, facility); //打开日志文件 return (0); /* success 函数运行成功 */ } 复制代码
setsid():使当前进程成为新会话的会话首进程,同时成为新进程组的组长进程,从而不再受终端控制。
忽略 SIGHUP 信号之后再次 fork,目的是确保本守护进程将来即使打开一个终端设备,也不会自动获得控制终端。当一个没有控制终端的会话首进程打开一个终端设备时,该终端会自动成为这个会话首进程的控制终端。再次 fork 之后,新的子进程不再是会话首进程,从而不能自动获得控制终端。这里必须忽略 SIGHUP 信号,因为会话首进程(即首次 fork 产生的子进程)终止时,其会话中的所有进程(即再次 fork 产生的子进程)都会收到 SIGHUP 信号。
更改根路径(将工作目录切换到根目录 /)
关闭继承来的所有文件描述符(包括套接字)
将 stdin、stdout 和 stderr 重定向到 /dev/null,否则它们会输出到屏幕上。
初始化服务
信号处理
服务端打交道最多的是 SIGHUP 和 SIGPIPE,这两个信号的默认行为是终止进程。服务器随便就终止是不行的,因此我们需要忽略它们。
/* Ignore SIGHUP and SIGPIPE so that neither terminates the server. */
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
针对其他信号,比如 SIGINT 信号,我们对其进行捕获,然后由 redis 进行收尾工作,避免进程被暴力 kill。
void setupSignalHandlers(void) { struct sigaction act; /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. * Otherwise, sa_handler is used. */ sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = sigShutdownHandler; sigaction(SIGTERM, &act, NULL); sigaction(SIGINT, &act, NULL); sigemptyset(&act.sa_mask); act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO; act.sa_sigaction = sigsegvHandler; if(server.crashlog_enabled) { sigaction(SIGSEGV, &act, NULL); sigaction(SIGBUS, &act, NULL); sigaction(SIGFPE, &act, NULL); sigaction(SIGILL, &act, NULL); sigaction(SIGABRT, &act, NULL); } return; } 复制代码
syslog 设置
syslog 是 linux 系统自带的日志服务,主要为 daemon 进程提供日志服务。我们在前面讲过,daemon 进程无法打印到终端,那么如何方便地接收输出的日志呢?linux 提供了 syslog 服务,该服务支持打印各种级别的日志,并可指定输出位置(本地或者远程均可)。
/* Open the syslog connection only when syslog logging is enabled. */
if (server.syslog_enabled) {
    openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
        server.syslog_facility);
}
其他常用参数设置
/* Initialization after setting defaults from the config system.
 * Creates the empty client lists / radix trees and records process info. */
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
server.hz = server.config_hz;
server.pid = getpid();
server.in_fork_child = CHILD_TYPE_NONE;
server.main_thread_id = pthread_self();
server.current_client = NULL;
server.errors = raxNew();
server.fixed_time_expire = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
server.clients_timeout_table = raxNew();
server.replication_allowed = 1;
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.client_pause_type = 0;
server.paused_clients = listCreate();
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
server.blocked_last_cron = 0;
server.blocking_op_nesting = 0;
创建共享对象
redis 非常注重内存消耗,对于一些常用的对象,采用引用计数的方式进行复用。
/* In initServer(): build the shared, reference-counted objects. */
createSharedObjects();

/* Excerpt from createSharedObjects(): pre-built protocol replies that are
 * reused instead of being allocated for every response. */

/* Shared command responses */
shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
shared.space = createObject(OBJ_STRING,sdsnew(" "));
shared.colon = createObject(OBJ_STRING,sdsnew(":"));
shared.plus = createObject(OBJ_STRING,sdsnew("+"));
根据系统限制调整可打开的文件数
/* Adjust the open-files limit according to the system limits. */
adjustOpenFilesLimit();
网络模型 reactor 初始化
redis 支持 Unix 域套接字和 TCP 两种连接方式。当服务端和客户端都在本机时,Unix 域套接字更快,因为它不需要经过 TCP/IP 协议栈(如协议头的封装与解析等)。
/* Initialise the monotonic clock and report which implementation is used. */
const char *clk_msg = monotonicInit();
serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg);

/* Create the event loop, sized from maxclients plus CONFIG_FDSET_INCR. */
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
    serverLog(LL_WARNING,
        "Failed creating the event loop. Error message: '%s'",
        strerror(errno));
    exit(1);
}

/* One redisDb slot per configured database. */
server.db = zmalloc(sizeof(redisDb)*server.dbnum);

/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
    listenToPort(server.port,&server.ipfd) == C_ERR) {
    serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
    exit(1);
}
if (server.tls_port != 0 &&
    listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
    serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
    exit(1);
}

/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,
        server.unixsocketperm, server.tcp_backlog);
    if (server.sofd == ANET_ERR) {
        serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
        exit(1);
    }
    anetNonBlock(NULL,server.sofd);
    anetCloexec(server.sofd);
}
LRU 策略中淘汰池(eviction pool)初始化
/* Create a new eviction pool. */ void evictionPoolAlloc(void) { struct evictionPoolEntry *ep; int j; ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE); for (j = 0; j < EVPOOL_SIZE; j++) { ep[j].idle = 0; ep[j].key = NULL; ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE); ep[j].dbid = 0; } EvictionPoolLRU = ep; } 复制代码
初始化 rdb 和 aof 信息
/* Reset RDB / AOF persistence state: no child running, no pipes open. */
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
server.rdb_pipe_buff = NULL;
server.rdb_pipe_bufflen = 0;
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_nread = 0;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
初始化状态信息
resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ server.stat_starttime = time(NULL); server.stat_peak_memory = 0; server.stat_current_cow_bytes = 0; server.stat_current_cow_updated = 0; server.stat_current_save_keys_processed = 0; server.stat_current_save_keys_total = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; server.stat_module_cow_bytes = 0; server.stat_module_progress = 0; for (int j = 0; j < CLIENT_TYPE_COUNT; j++) server.stat_clients_type_memory[j] = 0; server.cron_malloc_stats.zmalloc_used = 0; server.cron_malloc_stats.process_rss = 0; server.cron_malloc_stats.allocator_allocated = 0; server.cron_malloc_stats.allocator_active = 0; server.cron_malloc_stats.allocator_resident = 0; server.lastbgsave_status = C_OK; server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; 复制代码
注册事件
网络编程中包括三类事件:文件事件、定时事件和信号事件。
/* Create the timer callback, this is our way to process many background
 * operations incrementally, like clients timeout, eviction of unaccessed
 * expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}

/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TLS socket accept handler.");
}

/* server.sofd stays -1 when no Unix socket is configured. 0 is a valid
 * descriptor, so test against the -1 sentinel rather than "> 0". */
if (server.sofd != -1 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR)
    serverPanic("Unrecoverable error creating server.sofd file event.");

/* Register a readable event for the pipe used to awake the event loop
 * when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
    moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
        serverPanic(
            "Error registering the readable event for the module "
            "blocked clients subsystem.");
}
这里采用回调的方式来注册事件,结合 IO 复用技术实现高效的网络模型。
复制初始化等等
一些 lua 虚拟机脚本添加
慢日志初始化
/* Initialize the slow log. This function should be called a single time
 * at server startup.
 * Creates the empty slowlog list, resets the entry id counter and sets
 * slowlogFreeEntry as the list's free method. */
void slowlogInit(void) {
    server.slowlog = listCreate();
    server.slowlog_entry_id = 0;
    listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}
后台任务线程创建
这里后台线程主要包含三类
异步关闭文件描述符
AOF 异步刷盘
惰性释放(lazy free):在后台线程中异步释放被删除的键/对象
/* Initialize the background system, spawning the thread. */ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } } 复制代码
lua 脚本
其他
外部模块加载
void moduleLoadFromQueue(void) { listIter li; listNode *ln; listRewind(server.loadmodule_queue,&li); while((ln = listNext(&li))) { struct moduleLoadQueueEntry *loadmod = ln->value; if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc) == C_ERR) { serverLog(LL_WARNING, "Can't load module from %s: server aborting", loadmod->path); exit(1); } } 复制代码
磁盘加载数据
/* Function called at startup to load RDB or AOF file in memory. */ void loadDataFromDisk(void) { long long start = ustime(); if (server.aof_state == AOF_ON) { if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; errno = 0; /* Prevent a stale value from affecting error checking */ if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); /* Restore the replication ID / offset from the RDB file. */ if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself))) && rsi.repl_id_is_set && rsi.repl_offset != -1 && /* Note that older implementations may save a repl_stream_db * of -1 inside the RDB file in a wrong way, see more * information in function rdbPopulateSaveInfo. */ rsi.repl_stream_db != -1) { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); server.master_repl_offset = rsi.repl_offset; /* If we are a slave, create a cached master from this * information, in order to allow partial resynchronizations * with masters. */ replicationCacheMasterUsingMyself(); selectDb(server.cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } } 复制代码
事件循环前准备
/* Register the hooks the event loop calls before and after it sleeps in
 * the multiplexing call. */
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
beforeSleep
beforeSleep 包括以下几个步骤:
运行 fast cycle 模式,进行过期处理
向所有从节点发送 ack 请求
解除因 WAIT(同步复制)而阻塞的客户端
处理阻塞的客户端
将 AOF 刷盘
处理挂起的请求
代码如下:
void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; /* Just call a subset of vital functions in case we are re-entering * the event loop from processEventsWhileBlocked(). Note that in this * case we keep track of the number of events we are processing, since * processEventsWhileBlocked() wants to stop ASAP if there are no longer * events to handle. */ if (ProcessingEventsWhileBlocked) { uint64_t processed = 0; processed += handleClientsWithPendingReadsUsingThreads(); processed += tlsProcessPendingData(); processed += handleClientsWithPendingWrites(); processed += freeClientsInAsyncFreeQueue(); server.events_processed_while_blocked += processed; return; } /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ tlsProcessPendingData(); /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients * later in this function. */ if (server.cluster_enabled) clusterBeforeSleep(); /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ if (server.active_expire_enabled && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Unblock all the clients blocked for synchronous replication * in WAIT. */ if (listLength(server.clients_waiting_acks)) processClientsWaitingReplicas(); /* Check if there are clients unblocked by modules that implement * blocking commands. 
*/ if (moduleCount()) moduleHandleBlockedClients(); /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.unblocked_clients)) processUnblockedClients(); /* Send all the slaves an ACK request if at least one client blocked * during the previous event loop iteration. Note that we do this after * processUnblockedClients(), so if there are multiple pipelined WAITs * and the just unblocked WAIT gets blocked again, we don't have to wait * a server cron cycle in absence of other event loop events. See #6623. * * We also don't send the ACKs while clients are paused, since it can * increment the replication backlog, they'll be sent after the pause * if we are still the master. */ if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) { robj *argv[3]; argv[0] = shared.replconf; argv[1] = shared.getack; argv[2] = shared.special_asterick; /* Not used argument. */ replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); server.get_ack_from_slaves = 0; } /* We may have recieved updates from clients about their current offset. NOTE: * this can't be done where the ACK is recieved since failover will disconnect * our clients. */ updateFailoverStatus(); /* Send the invalidation messages to clients participating to the * client side caching protocol in broadcasting (BCAST) mode. */ trackingBroadcastInvalidationMessages(); /* Write the AOF buffer on disk */ if (server.aof_state == AOF_ON) flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); /* Try to process blocked clients every once in while. Example: A module * calls RM_SignalKeyAsReady from within a timer callback (So we don't * visit processCommand() at all). */ handleClientsBlockedOnKeys(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. 
Redis main thread will not touch anything at this * time. */ if (moduleCount()) moduleReleaseGIL(); /* Do NOT add anything below moduleReleaseGIL !!! */ } 复制代码
进入事件主循环
/* Run the event loop until eventLoop->stop is set, asking aeProcessEvents()
 * to handle all event types and to invoke the before/after-sleep hooks. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

/*
 * Excerpt from aeProcessEvents(), run after the multiplexing call returns.
 * It is shown as a comment because it is not valid at file scope:
 *
 *     // After sleep callback.
 *     if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
 *         eventLoop->aftersleep(eventLoop);
 *
 * Client I/O handlers driven by this loop:
 *     write: sendReplyToClient
 *     read:  readQueryFromClient
 */
aeProcessEvents
中包含网络事件以及定时事件,这两类事件通过 IO 复用很好地集成在一起。
请求整体流程
参考与引用
《Redis 设计与实现》 黄健宏
图片来源于网络
作者:心城以北
链接:https://juejin.cn/post/7072347362382839821