阅读 186

Redis Server 连接管理

连接初始化

接收连接

Redis 服务支持以下几种连接方式:

  • TCP

  • TLS

  • Unix Socket

收到不同方式的连接请求之后,Redis 会调用启动时绑定的处理函数。TCP 连接使用 acceptTcpHandler 处理,TLS 连接使用 acceptTLSHandler 处理,Unix socket 连接使用 acceptUnixHandler 处理。这几个处理函数最终都会调用通用函数 acceptCommonHandler

acceptCommonHandler 步骤

  1. 检查连接数,如果超过了配置的最大连接数会拒绝请求,并向客户端发送错误信息。最大连接数可以在 redis.conf 的 maxclients 参数修改。

  2. 创建客户端对象

  3. 保存 flag 参数,主要用于之后区分使用 Unix Socket 连接的客户端

  4. 调用客户端连接请求处理函数 clientAcceptHandler,主要用于处理默认保护模式下,拒绝处理没有设置密码的外部设备(非 localhost)连接请求。

// networking.c static void acceptCommonHandler(connection *conn, int flags, char *ip) {     client *c;     UNUSED(ip);     // 1. 连接数检查     if (listLength(server.clients) >= server.maxclients) {         // 错误处理 // ...         return;     } // 2. 创建客户端对象     if ((c = createClient(conn)) == NULL) {         // 错误处理 // ...  return;     } // 3. 保存接收连接的相关参数     c->flags |= flags;     // 4. 调用 clientAcceptHandler 处理客户端连接请求     if (connAccept(conn, clientAcceptHandler) == C_ERR) {         // 错误处理 // ...         return;     } } 复制代码

创建客户端对象

client *createClient(connection *conn) {     client *c = zmalloc(sizeof(client)); // 连接初始化     if (conn) { // 1. 将连接设为非阻塞模式         connNonBlock(conn); // 2. 禁用 nagel 算法         connEnableTcpNoDelay(conn); // 3. 设置 TCP keepalive         if (server.tcpkeepalive)             connKeepAlive(conn,server.tcpkeepalive); // 4. 设置请求处理函数         connSetReadHandler(conn, readQueryFromClient); // 5. 让 conn->private_data 指向 client 对象         connSetPrivateData(conn, c);     } ... 复制代码

步骤:

  1. 将连接设为非阻塞模式。

    https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/59e60246228f4a4db9fd67bc3cbaf2e7~tplv-k3u1fbpfcp-zoom-1.image

    如果程序 A 不断调用 send 将数据将数据拷贝到内核缓冲区,而应用程序 B 不调用 recv ,则 B 的内核缓冲区被填满后 A 的内核缓冲也会被填满,此时 A 继续调用 send 函数结果与 socket 模式有关

    • 阻塞模式:继续调用 send/recv 时会阻塞在调用处

    • 非阻塞模式:立即出错并退出,得到错误码 EWOULDBLOCK 或 EAGAIN

  2. 设置 TCP_NODELAY,禁用 nagle 算法,存放到内核缓冲区中的数据会立即发出。否则如果一次放到内核缓冲区中的数据数据包太小,则会在多个小的数据包凑成一个足够大的数据包后才会将数据发出。

  3. 设置 TCP keepalive,作用如下:

    1. 检测因服务停止、网络波动、宕机、应用重启等原因挂掉的连接

    2. 防止因为网络不活动而断连(使用NAT代理或者防火墙的时候,经常会出现这种问题)

    3. TCP层面的心跳检测

  4. 将请求处理函数设为 readQueryFromClient ,用于解析和处理客户端发来的请求命令。

  5. conn->private_data 指向 client 对象,使 client 对象与 conn 对象相互引用

 // 初始化 client 属性     // ... //  // 6. 保存 client 对象 if (conn) linkClient(c); // 7. 初始化 MULTI/EXEC 相关的参数     initClientMultiState(c);     return c; } 复制代码

  1. linkClient  保存 client 对象

    反转 client id 字节序时使用 memrev64 函数,先将 64 位的 unsigned int 转换成 char*,然后在 char* 内部交换字符的顺序:

    void memrev64(void *p) {     unsigned char *x = p, t;     t = x[0];     x[0] = x[7];     x[7] = t;     t = x[1];     x[1] = x[6];     x[6] = t;     t = x[2];     x[2] = x[5];     x[5] = t;     t = x[3];     x[3] = x[4];     x[4] = t; } 复制代码

    对于长整型数据的映射,利用基数树可以根据一个长整型快速查找到其对应的对象指针。避免了使用 hash 映射 hash 函数难以设计,不恰当的 hash 函数可能增大冲突,或浪费空间。

    https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/04cb1ab60dbd41deb14c43e4c6ae4553~tplv-k3u1fbpfcp-zoom-1.image

    1. 将 client 对象存到双向链表 server.clients 尾部的节点

    2. server.clients 尾部节点保存到 client 对象的 client_list_node 字段

    3. 反转 client id 字节序,将转换后的 id 作为 key,client 对象作为 value,保存到基数树 server.clients_index。当后续需要通过 id 获取 client 对象时会(例如 CLIENT UNBLOCK 命令)从基数树中查询。

基数树可视化工具:www.cs.usfca.edu/~galles/vis…

执行 createClient 命令时支持传入 NULL,是因为 redis 中所有命令的执行都依赖一个 client 上下文,但是在 Lua 解释器中执行脚本等情况下并没有活跃的连接,因此需要用到 conn 为 NULL 的 client。

conn 为 NULL 的 client 不会被添加到 server.clientsserver.clients_index

事件处理

读事件

当一个新的客户端连接到服务器时, 服务器会给为该客户端绑定读事件, 直到客户端断开连接之后, 这个读事件才会被移除。

  • 当客户端只是连接到服务器,但并没有向服务器发送命令时,读事件就处于等待状态

  • 当客户端给服务器发送命令请求,并且请求已到达时,该客户端的读事件处于就绪状态

写事件

当服务器有命令结果要传回给客户端时, 会为客户端关联写事件, 在命令结果传送完毕之后, 客户端和写事件的关联就会被移除。

  • 当服务器有命令结果需要返回给客户端,但客户端还未能执行无阻塞写,那么写事件处于等待状态

  • 当服务器有命令结果需要返回给客户端,并且客户端可以进行无阻塞写,那么写事件处于就绪状态

当出现读事件和写事件同时就绪的情况时, 优先处理读事件

client 对象与事件循环

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/138e697f0c6b429d9b315ae6f1ff1fa3~tplv-k3u1fbpfcp-zoom-1.image

Redis server 启动时,会在全局对象 aeEventLoop 中使用 eventsfired 两个字段保存了事件相关的对象:

另外当事件的到来,就将所有就绪的事件从内核事件表中复制到 apidata→events

events, fired, apidata→events 数组的大小相同,下标是 clientfd

当 clientfd 的读事件触发后,redis server 执行 connSocketSetReadHandler 函数,然后触发 aeCreateFileEvent 函数

// 读事件调用 aeCreateFileEvent 时参数依次为  // * 全局 eventLoop 对象 // * client fd // * AE_READABLE // * connSocketEventHandler 函数 // * connection 对象 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,         aeFileProc *proc, void *clientData) {     // ... // 读取 redis server 初始化时预留的空 aeFileEvent     aeFileEvent *fe = &eventLoop->events[fd]; // aeApiAddEvent 会根据 OS 分别调用 select, epoll, kqueue, evport 4 种实现 // 其中会生成一个 epoll_event 对象 ee,将 EPOLLIN 添加到 ee.events,并将 clientfd  // 添加到 ee.data.fd     if (aeApiAddEvent(eventLoop, fd, mask) == -1)         return AE_ERR;     fe->mask |= mask; // 设置读事件的回调函数     if (mask & AE_READABLE) fe->rfileProc = proc;     if (mask & AE_WRITABLE) fe->wfileProc = proc;     fe->clientData = clientData;     if (fd > eventLoop->maxfd)         eventLoop->maxfd = fd;     return AE_OK; } 复制代码

epoll 的 aeApiAddEvent  实现如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {     aeApiState *state = eventLoop->apidata; // 判断是新增还是修改     int op = eventLoop->events[fd].mask == AE_NONE ?             EPOLL_CTL_ADD : EPOLL_CTL_MOD;     ee.events = 0; // 保留原有的 mask     mask |= eventLoop->events[fd].mask;     if (mask & AE_READABLE) ee.events |= EPOLLIN;     if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 将 clientfd 添加到 epoll_event 的 data     ee.data.fd = fd;     if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;     return 0; } 复制代码

事件循环中aeApiPoll 调用 epoll_waitepoll_wait 将触发的事件复制到 apidate->events ,然后由 aeApiPoll 中的逻辑将本次触发事件的序号作为数组下标,将 fd、事件掩码记录到 eventLoop->fired 数组对应的位置上。

aeProcessEvents 在 aeApiPoll 返回后遍历 eventLoop->fired 数组,取出有效的数组元素,得到有事件的 fd 和事件掩码 mask

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {     // tvp 是后续定时任务允许等待的最大事件 aeApiState *state = eventLoop->apidata;     int retval, numevents = 0; // epoll_wait 将所有就绪的事件从内核事件表中复制到 state->events      retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);     if (retval > 0) {         int j;         numevents = retval; // 将触发的事件复制到 eventLoop->fired         for (j = 0; j < numevents; j++) {             int mask = 0;             struct epoll_event *e = state->events+j;             if (e->events & EPOLLIN) mask |= AE_READABLE;             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;             if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;             if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;             eventLoop->fired[j].fd = e->data.fd;             eventLoop->fired[j].mask = mask;         }     }     return numevents; } 复制代码

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0fd48f5b01d64dc5be888360ae7dfdf9~tplv-k3u1fbpfcp-zoom-1.image

clientfd 为下标从 eventLoop->events 中取出 aeFileEvent 对象,然后通过 aeFileEvent 的 clientData 取出 connection 对象,进而通过 connection 对象的 private_data 得到 client 对象,用于后续处理。


作者:Tsonglew
链接:https://juejin.cn/post/7034550365852794917


文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐