阅读 144

Redis 源码分析 RedisServer 启动过程

数据结构

image.png

在 Redis 的 server.h 源码中 redisServer 数据结构可以看出,其功能得有多复杂。因此在这里我会将其拆成一个个小块,有些内容会在后面单独进行详细分析。而且都做了详细的注释,在这里我就不贴出来。大致分为如下几个部分。

  • 通用参数(General)

  • 模块(Modules)

  • 网络(Networking)

  • 常见的命令回调函数

  • 统计相关(stat)

  • 配置信息(Configuration)

  • AOF持久化相关(包括aof rewrite过程中,父子进程用于消除数据差异的管道)

  • RDB持久化相关(包括RDB过程中,父子进程用于通信的管道)

  • AOF或者主从复制下,命令传播相关

  • 日志相关(logging)

  • 主从复制(Replication (master)+Replication (slave))

  • 主从复制的脚本缓存(Replication script cache)

  • 主从同步相关(Synchronous replication)

  • 系统限制(Limits)

  • 阻塞客户端(Blocked clients)

  • sort命令相关(Sort)

  • 数据结构转换参数

  • 时间缓存(time cache)

  • 发布订阅(Pubsub)

  • 集群(Cluster)

  • Lazy free(表示删除过期键,是否启用后台线程异步删除)

  • LUA脚本

  • 延迟监控相关

  • 服务端中互斥锁信息

  • 系统硬件信息

启动过程

redis 服务端启动时调用 server.c 文件中的 int main(int argc, char **argv) 方法完成的,代码如下:

image.png

下面我们一起来分析一下,redis-server 的启动过程。

通用设置

main 函数中,通用设置包括时区设置,hash 种子

// 时区设置 setlocale(LC_COLLATE,""); tzset(); /* Populates 'timezone' global. */ zmalloc_set_oom_handler(redisOutOfMemoryHandler); srand(time(NULL)^getpid()); srandom(time(NULL)^getpid()); gettimeofday(&tv,NULL); init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid()); crc64_init(); 复制代码

  • 生成 hash 种子

void dictSetHashFunctionSeed(uint8_t *seed) {     memcpy(dict_hash_function_seed,seed,sizeof(dict_hash_function_seed)); } /* The default hashing function uses SipHash implementation  * in siphash.c. */ uint64_t siphash(const uint8_t *in, const size_t inlen, const uint8_t *k); uint64_t siphash_nocase(const uint8_t *in, const size_t inlen, const uint8_t *k); uint64_t dictGenHashFunction(const void *key, int len) {     return siphash(key,len,dict_hash_function_seed); } 复制代码

模式选择

根据参数判断,是否是哨兵模式

server.sentinel_mode = checkForSentinelMode(argc,argv); /* Returns 1 if there is --sentinel among the arguments or if  * argv[0] contains "redis-sentinel". */ int checkForSentinelMode(int argc, char **argv) {     int j;     if (strstr(argv[0],"redis-sentinel") != NULL) return 1;     for (j = 1; j < argc; j++)         if (!strcmp(argv[j],"--sentinel")) return 1;     return 0; } 复制代码

初始化服务端

服务端的初始化,主要是函数 initServerConfig。 主要是的职责就是初始化 redisServer 数据库结构中各个成员变量。

void initServerConfig(void) {     int j;     updateCachedTime(1);     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);     server.runid[CONFIG_RUN_ID_SIZE] = '\0';     changeReplicationId();     clearReplicationId2();     server.hz = CONFIG_DEFAULT_HZ; /* Initialize it ASAP, even if it may get                                       updated later after loading the config.                                       This value may be used before the server                                       is initialized. */     server.timezone = getTimeZone(); /* Initialized by tzset(). */     server.configfile = NULL;     server.executable = NULL;     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;     server.bindaddr_count = 0;     server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;     server.ipfd.count = 0;     server.tlsfd.count = 0;     server.sofd = -1;     server.active_expire_enabled = 1;     server.skip_checksum_validation = 0;     server.saveparams = NULL;     server.loading = 0;     server.loading_rdb_used_mem = 0;     server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);     server.aof_state = AOF_OFF;     server.aof_rewrite_base_size = 0;     server.aof_rewrite_scheduled = 0;     server.aof_flush_sleep = 0;     server.aof_last_fsync = time(NULL);     atomicSet(server.aof_bio_fsync_status,C_OK);     server.aof_rewrite_time_last = -1;     server.aof_rewrite_time_start = -1;     server.aof_lastbgrewrite_status = C_OK;     server.aof_delayed_fsync = 0;     server.aof_fd = -1;     server.aof_selected_db = -1; /* Make sure the first time will not match */     server.aof_flush_postponed_start = 0;     server.pidfile = NULL;     server.active_defrag_running = 0;     server.notify_keyspace_events = 0;     server.blocked_clients = 0;     memset(server.blocked_clients_by_type,0,            sizeof(server.blocked_clients_by_type));     server.shutdown_asap = 0;     server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);     server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;     server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);     server.next_client_id = 1; /* Client IDs, start from 1 .*/     server.loading_process_events_interval_bytes = (1024*1024*2);     unsigned int lruclock = getLRUClock();     atomicSet(server.lruclock,lruclock);     resetServerSaveParams();     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */     /* Replication related */     server.masterauth = NULL;     server.masterhost = NULL;     server.masterport = 6379;     server.master = NULL;     server.cached_master = NULL;     server.master_initial_offset = -1;     server.repl_state = REPL_STATE_NONE;     server.repl_transfer_tmpfile = NULL;     server.repl_transfer_fd = -1;     server.repl_transfer_s = NULL;     server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */     server.master_repl_offset = 0;     /* Replication partial resync backlog */     server.repl_backlog = NULL;     server.repl_backlog_histlen = 0;     server.repl_backlog_idx = 0;     server.repl_backlog_off = 0;     server.repl_no_slaves_since = time(NULL);     /* Failover related */     server.failover_end_time = 0;     server.force_failover = 0;     server.target_replica_host = NULL;     server.target_replica_port = 0;     server.failover_state = NO_FAILOVER;     /* Client output buffer limits */     for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)         server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];     /* Linux OOM Score config */     for (j = 0; j < CONFIG_OOM_COUNT; j++)         server.oom_score_adj_values[j] = configOOMScoreAdjValuesDefaults[j];     /* Double constants initialization */     R_Zero = 0.0;     R_PosInf = 1.0/R_Zero;     R_NegInf = -1.0/R_Zero;     R_Nan = R_Zero/R_Zero;     /* Command table -- we initialize it here as it is part of the      * initial configuration, since command names may be changed via      * redis.conf using the rename-command directive. */     server.commands = dictCreate(&commandTableDictType,NULL);     server.orig_commands = dictCreate(&commandTableDictType,NULL);     populateCommandTable();     server.delCommand = lookupCommandByCString("del");     server.multiCommand = lookupCommandByCString("multi");     server.lpushCommand = lookupCommandByCString("lpush");     server.lpopCommand = lookupCommandByCString("lpop");     server.rpopCommand = lookupCommandByCString("rpop");     server.zpopminCommand = lookupCommandByCString("zpopmin");     server.zpopmaxCommand = lookupCommandByCString("zpopmax");     server.sremCommand = lookupCommandByCString("srem");     server.execCommand = lookupCommandByCString("exec");     server.expireCommand = lookupCommandByCString("expire");     server.pexpireCommand = lookupCommandByCString("pexpire");     server.xclaimCommand = lookupCommandByCString("xclaim");     server.xgroupCommand = lookupCommandByCString("xgroup");     server.rpoplpushCommand = lookupCommandByCString("rpoplpush");     server.lmoveCommand = lookupCommandByCString("lmove");     /* Debugging */     server.watchdog_period = 0;     /* By default we want scripts to be always replicated by effects      * (single commands executed by the script), and not by sending the      * script to the slave / AOF. This is the new way starting from      * Redis 5. However it is possible to revert it via redis.conf. */     server.lua_always_replicate_commands = 1;     /* Client Pause related */     server.client_pause_type = CLIENT_PAUSE_OFF;     server.client_pause_end_time = 0;        initConfigValues(); } 复制代码

哨兵设置

根据前文所选择的模式,如果选择的是哨兵模式,这里就进入哨兵初始化流程。

是否需要 rdb/aof 校验

    /* Check if we need to start in redis-check-rdb/aof mode. We just execute      * the program main. However the program is part of the Redis executable      * so that we can easily execute an RDB check on loading errors. */     if (strstr(argv[0],"redis-check-rdb") != NULL)         redis_check_rdb_main(argc,argv,NULL);     else if (strstr(argv[0],"redis-check-aof") != NULL)         redis_check_aof_main(argc,argv); 复制代码

解析命令行参数

根据命令行参数,如配置文件中已有设置,那就就覆盖默认参数设置

   if (argc >= 2) {         j = 1; /* First option to parse in argv[] */         sds options = sdsempty();         /* Handle special options --help and --version */         if (strcmp(argv[1], "-v") == 0 ||             strcmp(argv[1], "--version") == 0) version();         if (strcmp(argv[1], "--help") == 0 ||             strcmp(argv[1], "-h") == 0) usage();         if (strcmp(argv[1], "--test-memory") == 0) {             if (argc == 3) {                 memtest(atoi(argv[2]),50);                 exit(0);             } else {                 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");                 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");                 exit(1);             }         }         /* Parse command line options          * Precedence wise, File, stdin, explicit options -- last config is the one that matters.          *          * First argument is the config file name? */         if (argv[1][0] != '-') {             /* Replace the config file in server.exec_argv with its absolute path. */             server.configfile = getAbsolutePath(argv[1]);             zfree(server.exec_argv[1]);             server.exec_argv[1] = zstrdup(server.configfile);             j = 2; // Skip this arg when parsing options         }         while(j < argc) {             /* Either first or last argument - Should we read config from stdin? */             if (argv[j][0] == '-' && argv[j][1] == '\0' && (j == 1 || j == argc-1)) {                 config_from_stdin = 1;             }             /* All the other options are parsed and conceptually appended to the              * configuration file. For instance --port 6380 will generate the              * string "port 6380\n" to be parsed after the actual config file              * and stdin input are parsed (if they exist). */             else if (argv[j][0] == '-' && argv[j][1] == '-') {                 /* Option name */                 if (sdslen(options)) options = sdscat(options,"\n");                 options = sdscat(options,argv[j]+2);                 options = sdscat(options," ");             } else {                 /* Option argument */                 options = sdscatrepr(options,argv[j],strlen(argv[j]));                 options = sdscat(options," ");             }             j++;         }         // 读取配置文件         loadServerConfig(server.configfile, config_from_stdin, options);         if (server.sentinel_mode) loadSentinelConfigFromQueue();         sdsfree(options);     } 复制代码

读取配置文件

主要是读取 redis.conf 配置参数。

/* Load the server configuration from the specified filename.  * The function appends the additional configuration directives stored  * in the 'options' string to the config file before loading.  *  * Both filename and options can be NULL, in such a case are considered  * empty. This way loadServerConfig can be used to just load a file or  * just load a string. */ void loadServerConfig(char *filename, char config_from_stdin, char *options) {     sds config = sdsempty();     char buf[CONFIG_MAX_LINE+1];     FILE *fp;     /* Load the file content */     if (filename) {         if ((fp = fopen(filename,"r")) == NULL) {             serverLog(LL_WARNING,                     "Fatal error, can't open config file '%s': %s",                     filename, strerror(errno));             exit(1);         }         while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL)             config = sdscat(config,buf);         fclose(fp);     }     /* Append content from stdin */     if (config_from_stdin) {         serverLog(LL_WARNING,"Reading config from stdin");         fp = stdin;         while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL)             config = sdscat(config,buf);     }     /* Append the additional options */     if (options) {         config = sdscat(config,"\n");         config = sdscat(config,options);     }     loadServerConfigFromString(config);     sdsfree(config); } 复制代码

是否后台运行

int background = server.daemonize && !server.supervised; // 是否是后台运行 if (background) daemonize(); // 执行后台运行的方法 void daemonize(void) {     int fd;     if (fork() != 0) exit(0); /* parent exits */     setsid(); /* create a new session */     /* Every output goes to /dev/null. If Redis is daemonized but      * the 'logfile' is set to 'stdout' in the configuration file      * it will not log at all. */     if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {         dup2(fd, STDIN_FILENO);         dup2(fd, STDOUT_FILENO);         dup2(fd, STDERR_FILENO);         if (fd > STDERR_FILENO) close(fd);     } } 复制代码

这里需要说一下(经常面试问到),当然 Redis 中台进程,貌似常见的问题比较简单,我们看来纯正的后台进程:

int daemon_init(const char *pname, int facility) {         int                i;         pid_t        pid;         if ( (pid = Fork()) < 0) //创建一个子进程                 return (-1);         else if (pid)                 _exit(0);                        /* parent terminates 父进程退出*/         //子进程继续运行         //此进程是子进程,所以肯定不是组长进程         /* child 1 continues... */         if (setsid() < 0)                        /* become session leader 创建会话,成为会话首进程,新的进程组的组长进程*/                 return (-1);         Signal(SIGHUP, SIG_IGN); //把挂起信号设置为忽略         if ( (pid = Fork()) < 0) //再创建一个子进程                 return (-1);         else if (pid) //父进程退出                 _exit(0);                        /* child 1 terminates */         //第二个子进程继续运行,因为第二个子进程已经不是会话首进程了,所以永远不会获得控制终端         /* child 2 continues... */         daemon_proc = 1;                        /* for err_XXX() functions 再error.c中定义的变量*/         chdir("/");                                /* change working directory 调整工作目录到根目录 */         /* close off file descriptors */         for (i = 0; i < MAXFD; i++) //关闭所有文件描述符                 close(i);         /* redirect stdin, stdout, and stderr to /dev/null 定义标准输入,标准输出和标准错误到/dev/null */         open("/dev/null", O_RDONLY);         open("/dev/null", O_RDWR);         open("/dev/null", O_RDWR);         openlog(pname, LOG_PID, facility); //打开日志文件         return (0);                                /* success 函数运行成功 */ } 复制代码

  • setsid(): 确保当前进程编程新绘画的会话头进程组的进程组头进程,从而不在受终端控制。

  • 忽略 SIGHUP 信号之后再次 fork 目的是确保本守护京城来即使打开一个终端设备,也不会自动获取终端。当乜有控制终端的一个会话头进程打开一个控制终端时,该终端自动成为这个会话头进程的控制终端。然后再次 fork 之后,我们确保新的子进程不在是一个会话头进程,从而不能自动获取一控制器终端。这里必须要忽略 SIGHUP 信号,因为会话头进程(即首次 fork 产生的子进程)终止时,其会话中的所有进程(即再次 fork 的子进程)都会收到 SIGHUP 信号

  • 更改跟路径

  • 关闭继承来的所有套接字

  • 重定向 stdin ,stdout 和 stderr ,否则他会输出到屏幕上。

初始化服务

信号量

服务端打交道最多的是 SIGHUP 和 SIGPIPE,这两个信号默认行为是终止。而服务器随便就终止那是不行的,我么需要进行忽略。

    signal(SIGHUP, SIG_IGN);     signal(SIGPIPE, SIG_IGN); 复制代码

针对其他的信号,比如 SIGINT 信号,我们对其进行捕获后 redis 进行收尾工作,避免进程呗暴力 kill

void setupSignalHandlers(void) {     struct sigaction act;     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.      * Otherwise, sa_handler is used. */     sigemptyset(&act.sa_mask);     act.sa_flags = 0;     act.sa_handler = sigShutdownHandler;     sigaction(SIGTERM, &act, NULL);     sigaction(SIGINT, &act, NULL);     sigemptyset(&act.sa_mask);     act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;     act.sa_sigaction = sigsegvHandler;     if(server.crashlog_enabled) {         sigaction(SIGSEGV, &act, NULL);         sigaction(SIGBUS, &act, NULL);         sigaction(SIGFPE, &act, NULL);         sigaction(SIGILL, &act, NULL);         sigaction(SIGABRT, &act, NULL);     }     return; } 复制代码

syslog 设置

syslog 是 linux 系统自带的,主要是为了 daemon 进程提供日志服务,我们在前面讲过。deamon 进程无法打印到终端,那么如何方便的接受输出的日志呢?linux 提供了 syslog 服务,该服务支持打印各种级别的日志以及输出位置(本地或者远程均可)

    if (server.syslog_enabled) {         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,             server.syslog_facility);     } 复制代码

其他常用参数设置

    /* Initialization after setting defaults from the config system. */     server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;     server.hz = server.config_hz;     server.pid = getpid();     server.in_fork_child = CHILD_TYPE_NONE;     server.main_thread_id = pthread_self();     server.current_client = NULL;     server.errors = raxNew();     server.fixed_time_expire = 0;     server.clients = listCreate();     server.clients_index = raxNew();     server.clients_to_close = listCreate();     server.slaves = listCreate();     server.monitors = listCreate();     server.clients_pending_write = listCreate();     server.clients_pending_read = listCreate();     server.clients_timeout_table = raxNew();     server.replication_allowed = 1;     server.slaveseldb = -1; /* Force to emit the first SELECT command. */     server.unblocked_clients = listCreate();     server.ready_keys = listCreate();     server.clients_waiting_acks = listCreate();     server.get_ack_from_slaves = 0;     server.client_pause_type = 0;     server.paused_clients = listCreate();     server.events_processed_while_blocked = 0;     server.system_memory_size = zmalloc_get_memory_size();     server.blocked_last_cron = 0;     server.blocking_op_nesting = 0; 复制代码

创建共享对象

redis 非常注重内存消耗的,有些常用的对象,采用引用计数的方式进行复用

    createSharedObjects();     /* Shared command responses */     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));     shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));     shared.space = createObject(OBJ_STRING,sdsnew(" "));     shared.colon = createObject(OBJ_STRING,sdsnew(":"));     shared.plus = createObject(OBJ_STRING,sdsnew("+")); 复制代码

根据系统限制调整你打开的文件数

    adjustOpenFilesLimit(); 复制代码

网络模型 reactor 初始化

redis 支持 Unix 和 tcp 两种模型,当服务端和客户端都在本机的时候, Unix 域套接字更快,因为不需要协议头解析等

    const char *clk_msg = monotonicInit();     serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg);     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);     if (server.el == NULL) {         serverLog(LL_WARNING,             "Failed creating the event loop. Error message: '%s'",             strerror(errno));         exit(1);     }     server.db = zmalloc(sizeof(redisDb)*server.dbnum);     /* Open the TCP listening socket for the user commands. */     if (server.port != 0 &&         listenToPort(server.port,&server.ipfd) == C_ERR) {         serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);         exit(1);     }     if (server.tls_port != 0 &&         listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {         serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);         exit(1);     }     /* Open the listening Unix domain socket. */     if (server.unixsocket != NULL) {         unlink(server.unixsocket); /* don't care if this fails */         server.sofd = anetUnixServer(server.neterr,server.unixsocket,             server.unixsocketperm, server.tcp_backlog);         if (server.sofd == ANET_ERR) {             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);             exit(1);         }         anetNonBlock(NULL,server.sofd);         anetCloexec(server.sofd);     } 复制代码

LRU 策略中过期池初始化

/* Create a new eviction pool. */ void evictionPoolAlloc(void) {     struct evictionPoolEntry *ep;     int j;     ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE);     for (j = 0; j < EVPOOL_SIZE; j++) {         ep[j].idle = 0;         ep[j].key = NULL;         ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE);         ep[j].dbid = 0;     }     EvictionPoolLRU = ep; } 复制代码

初始化 rdb 和 aof 信息

    server.rdb_child_type = RDB_CHILD_TYPE_NONE;     server.rdb_pipe_conns = NULL;     server.rdb_pipe_numconns = 0;     server.rdb_pipe_numconns_writing = 0;     server.rdb_pipe_buff = NULL;     server.rdb_pipe_bufflen = 0;     server.rdb_bgsave_scheduled = 0;     server.child_info_pipe[0] = -1;     server.child_info_pipe[1] = -1;     server.child_info_nread = 0;     aofRewriteBufferReset();     server.aof_buf = sdsempty();     server.lastsave = time(NULL); /* At startup we consider the DB saved. */     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */     server.rdb_save_time_last = -1;     server.rdb_save_time_start = -1;     server.dirty = 0; 复制代码

初始化状态信息

    resetServerStats();     /* A few stats we don't want to reset: server startup time, and peak mem. */     server.stat_starttime = time(NULL);     server.stat_peak_memory = 0;     server.stat_current_cow_bytes = 0;     server.stat_current_cow_updated = 0;     server.stat_current_save_keys_processed = 0;     server.stat_current_save_keys_total = 0;     server.stat_rdb_cow_bytes = 0;     server.stat_aof_cow_bytes = 0;     server.stat_module_cow_bytes = 0;     server.stat_module_progress = 0;     for (int j = 0; j < CLIENT_TYPE_COUNT; j++)         server.stat_clients_type_memory[j] = 0;     server.cron_malloc_stats.zmalloc_used = 0;     server.cron_malloc_stats.process_rss = 0;     server.cron_malloc_stats.allocator_allocated = 0;     server.cron_malloc_stats.allocator_active = 0;     server.cron_malloc_stats.allocator_resident = 0;     server.lastbgsave_status = C_OK;     server.aof_last_write_status = C_OK;     server.aof_last_write_errno = 0;     server.repl_good_slaves_count = 0; 复制代码

注册事件

网络编程中,包括三类事件:文件事件、定时时间和信号事件

    /* Create the timer callback, this is our way to process many background      * operations incrementally, like clients timeout, eviction of unaccessed      * expired keys and so forth. */     if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {         serverPanic("Can't create event loop timers.");         exit(1);     }     /* Create an event handler for accepting new connections in TCP and Unix      * domain sockets. */     if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {         serverPanic("Unrecoverable error creating TCP socket accept handler.");     }     if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {         serverPanic("Unrecoverable error creating TLS socket accept handler.");     }     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");     /* Register a readable event for the pipe used to awake the event loop      * when a blocked client in a module needs attention. */     if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,         moduleBlockedClientPipeReadable,NULL) == AE_ERR) {             serverPanic(                 "Error registering the readable event for the module "                 "blocked clients subsystem.");     } 复制代码

这里采用回调的方式来注册事件、结合 IO 复用技术时间高效的网络模型。

  • 复制初始化等等

  • 一些 lua 虚拟机脚本添加

慢日志初始化

/* Initialize the slow log. This function should be called a single time  * at server startup. */ void slowlogInit(void) {     server.slowlog = listCreate();     server.slowlog_entry_id = 0;     listSetFreeMethod(server.slowlog,slowlogFreeEntry); } 复制代码

后台任务线程创建

这里后台线程主要包含三类

  • 异步关闭文件描述符

  • AOF 异步刷盘

  • 过期键异步删除

/* Initialize the background system, spawning the thread. */ void bioInit(void) {     pthread_attr_t attr;     pthread_t thread;     size_t stacksize;     int j;     /* Initialization of state vars and objects */     for (j = 0; j < BIO_NUM_OPS; j++) {         pthread_mutex_init(&bio_mutex[j],NULL);         pthread_cond_init(&bio_newjob_cond[j],NULL);         pthread_cond_init(&bio_step_cond[j],NULL);         bio_jobs[j] = listCreate();         bio_pending[j] = 0;     }     /* Set the stack size as by default it may be small in some system */     pthread_attr_init(&attr);     pthread_attr_getstacksize(&attr,&stacksize);     if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */     while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;     pthread_attr_setstacksize(&attr, stacksize);     /* Ready to spawn our threads. We use the single argument the thread      * function accepts in order to pass the job ID the thread is      * responsible of. */     for (j = 0; j < BIO_NUM_OPS; j++) {         void *arg = (void*)(unsigned long) j;         if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {             serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");             exit(1);         }         bio_threads[j] = thread;     } } 复制代码

lua 脚本

其他

外部模块加载

void moduleLoadFromQueue(void) {     listIter li;     listNode *ln;     listRewind(server.loadmodule_queue,&li);     while((ln = listNext(&li))) {         struct moduleLoadQueueEntry *loadmod = ln->value;         if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)             == C_ERR)         {             serverLog(LL_WARNING,                 "Can't load module from %s: server aborting",                 loadmod->path);             exit(1);         }     } 复制代码

磁盘加载数据

/* Function called at startup to load RDB or AOF file in memory. */ void loadDataFromDisk(void) {     long long start = ustime();     if (server.aof_state == AOF_ON) {         if (loadAppendOnlyFile(server.aof_filename) == C_OK)             serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);     } else {         rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;         errno = 0; /* Prevent a stale value from affecting error checking */         if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {             serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",                 (float)(ustime()-start)/1000000);             /* Restore the replication ID / offset from the RDB file. */             if ((server.masterhost ||                 (server.cluster_enabled &&                 nodeIsSlave(server.cluster->myself))) &&                 rsi.repl_id_is_set &&                 rsi.repl_offset != -1 &&                 /* Note that older implementations may save a repl_stream_db                  * of -1 inside the RDB file in a wrong way, see more                  * information in function rdbPopulateSaveInfo. */                 rsi.repl_stream_db != -1)             {                 memcpy(server.replid,rsi.repl_id,sizeof(server.replid));                 server.master_repl_offset = rsi.repl_offset;                 /* If we are a slave, create a cached master from this                  * information, in order to allow partial resynchronizations                  * with masters. */                 replicationCacheMasterUsingMyself();                 selectDb(server.cached_master,rsi.repl_stream_db);             }         } else if (errno != ENOENT) {             serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));             exit(1);         }     } } 复制代码

事件循环前准备

    aeSetBeforeSleepProc(server.el,beforeSleep);     aeSetAfterSleepProc(server.el,afterSleep); 复制代码

beforeSleep

beforeSleep 包括一下几个步骤:

  • 运行 fast cycle 模式,进行过期处理

  • 向所有从节点发送 ack 请求

  • 接阻塞从节点

  • 处理阻塞的客户端

  • 将 AOF 刷盘

  • 处理挂起的请求

代码如下:

void beforeSleep(struct aeEventLoop *eventLoop) {     UNUSED(eventLoop);     size_t zmalloc_used = zmalloc_used_memory();     if (zmalloc_used > server.stat_peak_memory)         server.stat_peak_memory = zmalloc_used;     /* Just call a subset of vital functions in case we are re-entering      * the event loop from processEventsWhileBlocked(). Note that in this      * case we keep track of the number of events we are processing, since      * processEventsWhileBlocked() wants to stop ASAP if there are no longer      * events to handle. */     if (ProcessingEventsWhileBlocked) {         uint64_t processed = 0;         processed += handleClientsWithPendingReadsUsingThreads();         processed += tlsProcessPendingData();         processed += handleClientsWithPendingWrites();         processed += freeClientsInAsyncFreeQueue();         server.events_processed_while_blocked += processed;         return;     }     /* Handle precise timeouts of blocked clients. */     handleBlockedClientsTimeout();     /* We should handle pending reads clients ASAP after event loop. */     handleClientsWithPendingReadsUsingThreads();     /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */     tlsProcessPendingData();     /* If tls still has pending unread data don't sleep at all. */     aeSetDontWait(server.el, tlsHasPendingData());     /* Call the Redis Cluster before sleep function. Note that this function      * may change the state of Redis Cluster (from ok to fail or vice versa),      * so it's a good idea to call it before serving the unblocked clients      * later in this function. */     if (server.cluster_enabled) clusterBeforeSleep();     /* Run a fast expire cycle (the called function will return      * ASAP if a fast cycle is not needed). */     if (server.active_expire_enabled && server.masterhost == NULL)         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);     /* Unblock all the clients blocked for synchronous replication      * in WAIT. */     if (listLength(server.clients_waiting_acks))         processClientsWaitingReplicas();     /* Check if there are clients unblocked by modules that implement      * blocking commands. */     if (moduleCount()) moduleHandleBlockedClients();     /* Try to process pending commands for clients that were just unblocked. */     if (listLength(server.unblocked_clients))         processUnblockedClients();     /* Send all the slaves an ACK request if at least one client blocked      * during the previous event loop iteration. Note that we do this after      * processUnblockedClients(), so if there are multiple pipelined WAITs      * and the just unblocked WAIT gets blocked again, we don't have to wait      * a server cron cycle in absence of other event loop events. See #6623.      *       * We also don't send the ACKs while clients are paused, since it can      * increment the replication backlog, they'll be sent after the pause      * if we are still the master. */     if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {         robj *argv[3];         argv[0] = shared.replconf;         argv[1] = shared.getack;         argv[2] = shared.special_asterick; /* Not used argument. */         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);         server.get_ack_from_slaves = 0;     }     /* We may have recieved updates from clients about their current offset. NOTE:      * this can't be done where the ACK is recieved since failover will disconnect       * our clients. */     updateFailoverStatus();     /* Send the invalidation messages to clients participating to the      * client side caching protocol in broadcasting (BCAST) mode. */     trackingBroadcastInvalidationMessages();     /* Write the AOF buffer on disk */     if (server.aof_state == AOF_ON)         flushAppendOnlyFile(0);     /* Handle writes with pending output buffers. */     handleClientsWithPendingWritesUsingThreads();     /* Close clients that need to be closed asynchronous */     freeClientsInAsyncFreeQueue();     /* Try to process blocked clients every once in while. Example: A module      * calls RM_SignalKeyAsReady from within a timer callback (So we don't      * visit processCommand() at all). */     handleClientsBlockedOnKeys();     /* Before we are going to sleep, let the threads access the dataset by      * releasing the GIL. Redis main thread will not touch anything at this      * time. */     if (moduleCount()) moduleReleaseGIL();     /* Do NOT add anything below moduleReleaseGIL !!! */ } 复制代码

进入事件主循环

void aeMain(aeEventLoop *eventLoop) {     eventLoop->stop = 0;     while (!eventLoop->stop) {         aeProcessEvents(eventLoop, AE_ALL_EVENTS|                                    AE_CALL_BEFORE_SLEEP|                                    AE_CALL_AFTER_SLEEP);     } } /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)     eventLoop->aftersleep(eventLoop); // 写:sendReplyToClient // 读:readQueryFromClient 复制代码

aeProcessEvents中包含网络时间以及定时事件,这两类时间通过 IO 复用很好的继承在一起。

请求整体流程

图片来源于网络

参考与引用

  • 《Redis 设计与实现》 黄健宏

  • 图片来源于网络


作者:心城以北
链接:https://juejin.cn/post/7072347362382839821


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐