首页
关于
友联
Search
1
CTF竞赛权威指南(PWN篇)下载地址
3,268 阅读
2
操作系统实现-异常
1,011 阅读
3
redis服务器
942 阅读
4
集群聊天服务器-一对一聊天
877 阅读
5
ret2shellcode
802 阅读
操作系统
Redis
PWN
muduo
MongoDB
集群聊天服务器
互联网面试
Go
旅途人生
登录
Search
一个爱代码的coder
累计撰写
38
篇文章
累计收到
113
条评论
首页
栏目
操作系统
Redis
PWN
muduo
MongoDB
集群聊天服务器
互联网面试
Go
旅途人生
页面
关于
友联
搜索到
4
篇与
Redis
的结果
2022-05-08
redis数据库
博客网址:www.shicoder.top微信:kj11011029欢迎加群聊天 :452380935这一次主要是接着redis服务器接着进行代码讲解,因为redis服务器中包含大量的数据库,因为redis也对每个数据库设计了结构体redis数据库在上面redisServer中,有一个数组redisDb *db,这个数组中就是存放的是该服务器所有的数据库,redisDb就是数据库字段,redisServer中的dbnum就是该数组的大小 // redis服务器中每一个数据库都是这样一个实例 typedef struct redisDb { // 数据库键空间,保存着数据库中的所有键值对 dict *dict; // 键的过期时间,字典的键为键,字典的值为过期事件 UNIX 时间戳 dict *expires; // 正处于阻塞状态的键 dict *blocking_keys; // 可以解除阻塞的键 dict *ready_keys; // 正在被 WATCH 命令监视的键 dict *watched_keys; struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */ // 数据库号码 int id; // 数据库的键的平均 TTL ,统计信息 long long avg_ttl; } redisDb;过期策略在redisDb中,有一个字段为键的过期时间,因此针对过期的键,redis有一套自己的过期策略,下面进行讲解:定时删除:在设置键过期时间的同时,创建一个定时器惰性删除:每次要用这个键的时候,先检查是否过期定期删除:每隔一段时间,对数据库键进行扫描,删除部分过期键redis是使用惰性删除和定期删除配合实现的过期策略惰性删除代码为db.c/expireIfNeeded,每次执行命令前都执行该函数/* * 检查 key 是否已经过期,如果是的话,将它从数据库中删除。 * * 返回 0 表示键没有过期时间,或者键未过期。 * * 返回 1 表示键已经因为过期而被删除了。 * * 惰性删除 所有读写数据库的命令在执行前都会进行检查 * */ int expireIfNeeded(redisDb *db, robj *key) { // 取出键的过期时间 mstime_t when = getExpire(db,key); mstime_t now; // 没有过期时间 if (when < 0) return 0; // 如果服务器正在进行载入,那么不进行任何过期检查 if (server.loading) return 0; /* If we are in the context of a Lua script, we claim that time is * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. * See issue #1525 on Github for more information. */ now = server.lua_caller ? server.lua_time_start : mstime(); // 当服务器运行在 replication 模式时 // 附属节点并不主动删除 key // 它只返回一个逻辑上正确的返回值 // 真正的删除操作要等待主节点发来删除命令时才执行 // 从而保证数据的同步 if (server.masterhost != NULL) return now > when; // 运行到这里,表示键带有过期时间,并且服务器为主节点 // 如果未过期,返回 0 if (now <= when) return 0; /* Delete the key */ server.stat_expiredkeys++; // 向 AOF 文件和附属节点传播过期信息 propagateExpire(db,key); // 发送事件通知 notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED, "expired",key,db->id); // 将过期键从数据库中删除 return dbDelete(db,key); }定期删除代码为redis.c/activeExpireCycle,每当redis周期性执行redis.c/serverCron时候,就会调用该函数,它在规定的时间内,遍历各个数据库,随机检查一部分键,若过期则删除/* * 函数尝试删除数据库中已经过期的键。 * 当带有过期时间的键比较少时,函数运行得比较保守, * 如果带有过期时间的键比较多,那么函数会以更积极的方式来删除过期键, * 从而可能地释放被过期键占用的内存。 * * * 每次循环中被测试的数据库数目不会超过 REDIS_DBCRON_DBS_PER_CALL 。 * * * 如果 timelimit_exit 为真,那么说明还有更多删除工作要做, * 那么在 beforeSleep() 函数调用时,程序会再次执行这个函数。 * * * 过期循环的类型: * * * 如果循环的类型为 ACTIVE_EXPIRE_CYCLE_FAST , * 那么函数会以“快速过期”模式执行, * 执行的时间不会长过 EXPIRE_FAST_CYCLE_DURATION 毫秒, * 并且在 EXPIRE_FAST_CYCLE_DURATION 毫秒之内不会再重新执行。 * * 如果循环的类型为 ACTIVE_EXPIRE_CYCLE_SLOW , * 那么函数会以“正常过期”模式执行, * 函数的执行时限为 REDIS_HS 常量的一个百分比, * 这个百分比由 REDIS_EXPIRELOOKUPS_TIME_PERC 定义。 * * 定期删除 服务器周期性执行redis.c/serverConn时候会执行该函数 * * 函数执行大概流程: * * 1、函数每次运行时,都从一定数量的数据库中取出一定数量的随机键进行检查,并删除其中的过期键。 * * 2、全局变量current_db会记录当前activeExpireCycle函数检查的进度, * 并在下一次activeExpireCycle函数调用时,接着上一次的进度进行处理。 * 比如说,如果当前activeExpirecycle函数在遍历10号数据库时返回了, * 那么下次activeExpirecycle函数执行时,将从11号数据库开始查找并删除过期键。 * * 3、随着activeExpireCycle函数的不断执行,服务器中的所有数据库都会被检查一遍, * 这时函数将current_db变量重置为0,然后再次开始新一轮的检查工作。 */ void activeExpireCycle(int type) { // 静态变量,用来累积函数连续执行时的数据 // 表明目前检测到哪个数据库了 static unsigned int current_db = 0; static int timelimit_exit = 0; static long long last_fast_cycle = 0; unsigned int j, iteration = 0; // 默认每次处理的数据库数量 unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL; // 函数开始的时间 long long start = ustime(), timelimit; // 快速模式 if (type == ACTIVE_EXPIRE_CYCLE_FAST) { // 如果上次函数没有触发 timelimit_exit ,那么不执行处理 if (!timelimit_exit) return; // 如果距离上次执行未够一定时间,那么不执行处理 if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; // 运行到这里,说明执行快速处理,记录当前时间 last_fast_cycle = start; } /* * 一般情况下,函数只处理 REDIS_DBCRON_DBS_PER_CALL 个数据库, * 除非: * * 1) 当前数据库的数量小于 REDIS_DBCRON_DBS_PER_CALL * 2) 如果上次处理遇到了时间上限,那么这次需要对所有数据库进行扫描, * 这可以避免过多的过期键占用空间 */ if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum; /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time * per iteration. Since this function gets called with a frequency of * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ // 函数处理的微秒时间上限 // ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 默认为 25 ,也即是 25 % 的 CPU 时间 timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; // 如果是运行在快速模式之下 // 那么最多只能运行 FAST_DURATION 微秒 // 默认值为 1000 (微秒) if (type == ACTIVE_EXPIRE_CYCLE_FAST) timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ // 遍历数据库 for (j = 0; j < dbs_per_call; j++) { int expired; // 指向要处理的数据库 redisDb *db = server.db+(current_db % server.dbnum); // 为 DB 计数器加一,如果进入 do 循环之后因为超时而跳出 // 那么下次会直接从下个 DB 开始处理 current_db++; /* Continue to expire if at the end of the cycle more than 25% * of the keys were expired. */ do { unsigned long num, slots; long long now, ttl_sum; int ttl_samples; // 获取数据库中带过期时间的键的数量 // 如果该数量为 0 ,直接跳过这个数据库 if ((num = dictSize(db->expires)) == 0) { db->avg_ttl = 0; break; } // 获取数据库中键值对的数量 slots = dictSlots(db->expires); // 当前时间 now = mstime(); // 这个数据库的使用率低于 1% ,扫描起来太费力了(大部分都会 MISS) // 跳过,等待字典收缩程序运行 if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; /* The main collection cycle. Sample random keys among keys * with an expire set, checking for expired ones. * * 样本计数器 */ // 已处理过期键计数器 expired = 0; // 键的总 TTL 计数器 ttl_sum = 0; // 总共处理的键计数器 ttl_samples = 0; // 每次最多只能检查 LOOKUPS_PER_LOOP 个键 // 默认每个数据库检查的键数量 if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; // 开始遍历数据库 while (num--) { dictEntry *de; long long ttl; // 从 expires 中随机取出一个带过期时间的键 if ((de = dictGetRandomKey(db->expires)) == NULL) break; // 计算 TTL ttl = dictGetSignedIntegerVal(de)-now; // 如果键已经过期,那么删除它,并将 expired 计数器增一 if (activeExpireCycleTryExpire(db,de,now)) expired++; if (ttl < 0) ttl = 0; // 累积键的 TTL ttl_sum += ttl; // 累积处理键的个数 ttl_samples++; } // 为这个数据库更新平均 TTL 统计数据 if (ttl_samples) { // 计算当前平均值 long long avg_ttl = ttl_sum/ttl_samples; // 如果这是第一次设置数据库平均 TTL ,那么进行初始化 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; // 取数据库的上次平均 TTL 和今次平均 TTL 的平均值 db->avg_ttl = (db->avg_ttl+avg_ttl)/2; } // 更新遍历次数 iteration++; // 每遍历 16 次执行一次 if ((iteration & 0xf) == 0 && (ustime()-start) > timelimit) { // 如果遍历次数正好是 16 的倍数 // 并且遍历的时间超过了 timelimit // 那么断开 timelimit_exit timelimit_exit = 1; } // 已经超时了,返回 if (timelimit_exit) return; // 如果已删除的过期键占当前总数据库带过期时间的键数量的 25 % // 那么不再遍历 } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); } }redis客户端因为redis可以和多个客户端进行连接,因此为了区分每个客户端,redis内部为每个连接客户端创建一个结构体redisClient,然后将多个结构体用链表连接在一起typedef struct redisClient { // 套接字描述符 int fd; // 当前正在使用的数据库 使用select可以切换数据库,因为服务器刚开始创建了16个 redisDb *db; // 当前正在使用的数据库的 id (号码) int dictid; // 客户端的名字 robj *name; // 查询缓冲区 sds querybuf; // 查询缓冲区长度峰值 size_t querybuf_peak; // 参数数量 int argc; // 参数对象数组 robj **argv; // 记录被客户端执行的命令 struct redisCommand *cmd, *lastcmd; // 请求的类型:内联命令还是多条命令 int reqtype; // 剩余未读取的命令内容数量 int multibulklen; // 命令内容的长度 long bulklen; // 回复链表 list *reply; // 回复链表中对象的总大小 unsigned long reply_bytes; / // 已发送字节,处理 short write 用 int sentlen; // 创建客户端的时间 time_t ctime; // 客户端最后一次和服务器互动的时间 time_t lastinteraction; // 客户端的输出缓冲区超过软性限制的时间 time_t obuf_soft_limit_reached_time; // 客户端状态标志 int flags; // 当 server.requirepass 不为 NULL 时 // 代表认证的状态 // 0 代表未认证, 1 代表已认证 int authenticated; // 复制状态 int replstate; // 用于保存主服务器传来的 RDB 文件的文件描述符 int repldbfd; // 读取主服务器传来的 RDB 文件的偏移量 off_t repldboff; // 主服务器传来的 RDB 文件的大小 off_t repldbsize; sds replpreamble; // 主服务器的复制偏移量 long long reploff; // 从服务器最后一次发送 REPLCONF ACK 时的偏移量 long long repl_ack_off; // 从服务器最后一次发送 REPLCONF ACK 的时间 long long repl_ack_time; // 主服务器的 master run ID // 保存在客户端,用于执行部分重同步 char replrunid[REDIS_RUN_ID_SIZE+1]; // 从服务器的监听端口号 int slave_listening_port; // 事务状态 multiState mstate; // 阻塞类型 int btype; // 阻塞状态 blockingState bpop; // 最后被写入的全局复制偏移量 long long woff; // 被监视的键 list *watched_keys; // 这个字典记录了客户端所有订阅的频道 // 键为频道名字,值为 NULL // 也即是,一个频道的集合 dict *pubsub_channels; // 链表,包含多个 pubsubPattern 结构 // 记录了所有订阅频道的客户端的信息 // 新 pubsubPattern 结构总是被添加到表尾 list *pubsub_patterns; sds peerid; // 回复偏移量 int bufpos; // 回复缓冲区 char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient;
2022年05月08日
134 阅读
0 评论
0 点赞
2022-05-08
redis数据结构
博客网址:www.shicoder.top微信:kj11011029欢迎加群聊天 :452380935引言从本次开始,对Redis设计与实现进行阅读及相关读书笔记的记录。Redis版本为3.0数据结构简单动态字符串SDSsds数据结构位于sds.h/sdshdr/* * 保存字符串对象的结构 */ struct sdshdr { // buf 中已占用空间的长度 int len; // buf 中剩余可用空间的长度 int free; // 数据空间 char buf[]; };相对于C语言的字符串,SDS的优点在于常数复杂度获取字符串长度杜绝缓冲区溢出减少修改字符串所带来的内存重新分配(注意,释放空间时候,不会真的释放,而是设置free的值)链表链表的相关代码在adlist.h中链表节点listNode/* * 双端链表节点 */ typedef struct listNode { // 前置节点 struct listNode *prev; // 后置节点 struct listNode *next; // 节点的值 void *value; } listNode;由多个listNode组成的双端链表链表结构list/* * 双端链表结构 */ typedef struct list { // 表头节点 listNode *head; // 表尾节点 listNode *tail; // 节点值复制函数 void *(*dup)(void *ptr); // 节点值释放函数 void (*free)(void *ptr); // 节点值对比函数 int (*match)(void *ptr, void *key); // 链表所包含的节点数量 unsigned long len; } list;字典redis中的字典使用哈希表实现,其代码在dict.h中哈希表结构dictht/* * 哈希表 * * 每个字典都使用两个哈希表,从而实现渐进式 rehash 。 */ typedef struct dictht { // 哈希表数组 dictEntry **table; // 哈希表大小 unsigned long size; // 哈希表大小掩码,用于计算索引值 // 总是等于 size - 1 比如7号,当计算索引时候, 7&sizemask就可以得到 unsigned long sizemask; // 该哈希表已有节点的数量 unsigned long used; } dictht;其中dictEntry为一个键值对/* * 哈希表节点 */ typedef struct dictEntry { // 键 void *key // 值 union { void *val; uint64_t u64; int64_t s64; } v; // 指向下个哈希表节点,形成链表 表明是一个链地址法解决哈希冲突 struct dictEntry *next; } dictEntry;下面为了形象表示一个哈希表,给出一个例子下面给出一个多个dictEntry连接的哈希表最终Redis中的字典数据结构如下/* * 字典 */ typedef struct dict { // 类型特定函数 dictType *type; // 私有数据 void *privdata; // 哈希表 dictht ht[2]; // rehash 索引 // 当 rehash 不在进行时,值为 -1 int rehashidx; /* rehashing not in progress if rehashidx == -1 */ // 目前正在运行的安全迭代器的数量 int iterators; /* number of iterators currently running */ } dict; /* * 字典类型特定函数 */ typedef struct dictType { // 计算哈希值的函数 redis默认的函数算法为murmurhash2 unsigned int (*hashFunction)(const void *key); // 复制键的函数 void *(*keyDup)(void *privdata, const void *key); // 复制值的函数 void *(*valDup)(void *privdata, const void *obj); // 对比键的函数 int (*keyCompare)(void *privdata, const void *key1, const void *key2); // 销毁键的函数 void (*keyDestructor)(void *privdata, void *key); // 销毁值的函数 void (*valDestructor)(void *privdata, void *obj); } dictType;跳跃表redis中的跳跃表结构代码为redis.h/zskiplistNode和redis.h/zskiplist/* ZSETs use a specialized version of Skiplists */ /* * 跳跃表节点 */ typedef struct zskiplistNode { // 成员对象 robj *obj; // 分值 注意redis跳跃表按照节点从小到大排列 double score; // 后退指针 struct zskiplistNode *backward; // 层 数组大小按照幂次定律(越大的数出现概率越小)1-32随机数字 struct zskiplistLevel { // 前进指针 struct zskiplistNode *forward; // 跨度 unsigned int span; } level[]; } zskiplistNode; /* * 跳跃表 */ typedef struct zskiplist { // 表头节点和表尾节点 struct zskiplistNode *header, *tail; // 表中节点的数量 unsigned long length; // 表中层数最大的节点的层数 int level; } zskiplist;下面给出一个简单的跳跃表例子前进指针用于遍历跳跃表,下面的虚线为遍历过程整数集合 intset当一个集合里面只有整数值元素时候,且元素数量不超过REDIS_SET_MAX_INTSET_ENTRIES时候,集合底层采用整数集合#define REDIS_SET_MAX_INTSET_ENTRIES 512 /*集合中元素个数小于该值,set底层使用intset*/redis中整数集合代码位于intset.h/intsettypedef struct intset { // 编码方式 uint32_t encoding; // 集合包含的元素数量 uint32_t length; // 保存元素的数组 按照从小到大的顺序,且不重复 int8_t contents[]; } intset;contents数组虽然是int8_t,但是里面存放的数据的真实类型由encoding字段决定升级操作假如往下面的整数集合中append类型为int32的65535,则会发生升级,升级的过程主要包括将每个元素所占空间进行扩充,然后设置encoding,升级完后为降级操作注意整数集合无法进行降级,升级之后,会一直持续该编码压缩列表 ziplist压缩列表其实就是一块连续内存,一个压缩列表包括多个节点(entry),每个entry保存一个字节数组或者整数值。在redis源码中, 压缩列表没有数据结构代码定义,压缩列表是一种通过内存特殊编码方式实现的数据结构。他是通过定义一些基地址,然后使用偏移量来定义ziplist,其中大量使用了宏函数定义/* * ziplist 属性宏 */ // 定位到 ziplist 的 bytes 属性,该属性记录了整个 ziplist 所占用的内存字节数 // 用于取出 bytes 属性的现有值,或者为 bytes 属性赋予新值 #define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl))) // 定位到 ziplist 的 offset 属性,该属性记录了到达表尾节点的偏移量 // 用于取出 offset 属性的现有值,或者为 offset 属性赋予新值 #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) // 定位到 ziplist 的 length 属性,该属性记录了 ziplist 包含的节点数量 // 用于取出 length 属性的现有值,或者为 length 属性赋予新值 #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) // 返回 ziplist 表头的大小 #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t)) // 返回指向 ziplist 第一个节点(的起始位置)的指针 #define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE) // 返回指向 ziplist 最后一个节点(的起始位置)的指针 #define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) // 返回指向 ziplist 末端 ZIP_END (的起始位置)的指针 #define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)其中,redis对entry使用了数据结构描述,如下代码ziplist.c/zlentry/* * 保存 ziplist 节点信息的结构 */ typedef struct zlentry { // prevrawlen :前置节点的长度 // prevrawlensize :编码 prevrawlen 所需的字节大小 unsigned int prevrawlensize, prevrawlen; // len :当前节点值的长度 // lensize :编码 len 所需的字节大小 unsigned int lensize, len; // 当前节点 header 的大小 // 等于 prevrawlensize + lensize unsigned int headersize; // 当前节点值所使用的编码类型 unsigned char encoding; // 指向当前节点的指针 unsigned char *p; } zlentry;ziplist的创建/* Create a new empty ziplist. * * 创建并返回一个新的 ziplist * * T = O(1) */ unsigned char *ziplistNew(void) { // ZIPLIST_HEADER_SIZE 是 ziplist 表头的大小 // 1 字节是表末端 ZIP_END 的大小 unsigned int bytes = ZIPLIST_HEADER_SIZE+1; // 为表头和表末端分配空间 unsigned char *zl = zmalloc(bytes); // 初始化表属性 ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); ZIPLIST_LENGTH(zl) = 0; // 设置表末端 zl[bytes-1] = ZIP_END; return zl; }由于压缩列表主要就是为了节约内存,因此对于不同的数据,其编码方式不一样,前面我们已经知道,entry中主要放字节数组和整数,下表给出两种数据不同长度时候的编码 字节数组编码 整数编码 对象本次首先对Redis的相关数据结构进行介绍。Redis对象主要分为5种:REDIS_STRING、REDIS_LIST、REDIS_HASH、REDIS_SET、REDIS_ZSET。下面首先给出Redis中对对象的代码表示// 对象类型 #define REDIS_STRING 0 #define REDIS_LIST 1 #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4#define REDIS_LRU_BITS 24 #define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */ #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ typedef struct redisObject { // 类型 类型说明符 位域名:位域长度 标识type占4个二进制位 因为有可能不需要一个完整的字节 // 1个字节8位 unsigned type:4; // 编码 unsigned encoding:4; // 对象最后一次被访问的时间 unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */ // 引用计数 int refcount; // 指向实际值的指针 void *ptr; } robj;首先看到有2个字段,为类型和编码,类型就是redis的5种类型,编码就是这个类型底层是用什么编码方式实现但实际上,Redis的内部并不只是这5种对象,对于上面5种对象,都有几种底层实现方式,下面给出各数据结构底层实现的对应方式REDIS_STRING REDIS_STRING表示redis中的字符串类型,其底层由以下三种实现方式REDIS_ENCODING_INT如果一个字符串对象保存的是整数值,且这个整数值可以用long类型表示,则字符串对象会奖整数值保存在字符串对象的ptr属性中,此时会将ptr的void*转换为long127.0.0.1:6379> set number "1" OK 127.0.0.1:6379> object encoding number "int"REDIS_ENCODING_RAW 如果字符串保存的是一个字符串值,且长度大于32字节,redis的字符串对象就会采用简单动态字符串(SDS)实现127.0.0.1:6379> set longstr "Hello, my name is Shi Linkun, is a programmer who loves code, I hope that each blog can let myself consolidate their knowledge, but also let everyone get a little knowledge, thank you" OK 127.0.0.1:6379> object encoding longstr "raw"这里先不对SDS进行详细简介,后续单独对其进行描述REDIS_ENCODING_EMBSTR如果字符串对象保存的是一个字符串,且长度小于等于32字节,则使用embstr编码实现127.0.0.1:6379> set story "hello my name is shilinkun" OK 127.0.0.1:6379> object encoding story "embstr"注意redis3.0版本中实际间隔为39字节#define REDIS_ENCODING_EMBSTR_SIZE_LIMIT 39 robj *createStringObject(char *ptr, size_t len) { if (len <= REDIS_ENCODING_EMBSTR_SIZE_LIMIT) return createEmbeddedStringObject(ptr,len); else return createRawStringObject(ptr,len); }为什么是39字节,这里参考这个知乎的解释embstr是一块连续的内存区域,由redisObject和sdshdr组成。其中redisObject占16个字节,当buf内的字符串长度是39时,sdshdr的大小为8+39+1=48,那一个字节是'\0'。加起来刚好64。是不是发现了什么?typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */ int refcount; void *ptr; } robj; struct sdshdr { unsigned int len; unsigned int free; char buf[]; };从2.4版本开始,redis开始使用jemalloc内存分配器。这个比glibc的malloc要好不少,还省内存。在这里可以简单理解,jemalloc会分配8,16,32,64等字节的内存。embstr最小为16+8+8+1=33,所以最小分配64字节。当字符数小于39时,都会分配64字节。三个编码的转换int->raw向一个保存整数数值的字符串对象使用APPEND命令,就会使得int转变为rawembstr->raw对embstr类型的字符串,执行任何的修改命令,都会变为raw相关命令字符串命令的实现在t_string.c中REDIS_LIST列表对象底层主要由2种编码方式:REDIS_ENCODING_ZIPLIST、REDIS_ENCODING_LINKEDLISTREDIS_ENCODING_ZIPLISTziplist是指使用压缩列表实现REDIS_ENCODING_LINKLISTlinklist是使用双端链表实现编码转换redis.h#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512 /*list中元素个数小于该值,list底层使用ziplist*/ #define REDIS_LIST_MAX_ZIPLIST_VALUE 64 /*list中所有的字符串长度小于该值,list底层使用ziplist*/上述两个宏定义分别与redis的配置文件中list-max-ziplist-entries和list-max-ziplist-value对应REDIS_HASH哈希对象主要有2种编码方式,REDIS_ENCODING_ZIPLIST和REDIS_ENCODING_HTREDIS_ENCODING_ZIPLISTziplist作为底层实现,先放入键,后放入值REDIS_ENCODING_HT编码转换#define REDIS_HASH_MAX_ZIPLIST_ENTRIES 512 //哈希对象保存的键值对数量小于512个,使用ziplist; #define REDIS_HASH_MAX_ZIPLIST_VALUE 64 //哈希对象保存的所有键值对的键和值的字符串长度都小于64字节,使用ziplist;上述两个宏定义分别与redis的配置文件中hash-max-ziplist-entries和hash-max-ziplist-value对应REDIS_SET集合的底层编码方式也是两种:REDIS_ENCODING_INTSET和REDIS_ENCODING_HTREDIS_ENCODING_INTSET使用该编码方式作为集合的底层实现时候,一般是整数集合,比如REDIS_ENCODING_HT使用哈希表作为集合的底层实现方式时,所有的值作为键,但对应的值为null编码转换#define REDIS_SET_MAX_INTSET_ENTRIES 512 /*集合中元素个数小于该值,且全为整数,set底层使用intset*/对应的redis配置文件选项为set-max-intset-entriesREDIS_ZSET有序集合底层实现为:REDIS_ENCODING_ZIPLIST和REDIS_ENCODING_SKIPLISTREDIS_ENCODING_ZIPLIST当使用压缩列表作为有序集合的底层实现时候,压缩列表的entry有2个值,一个是值,一个是得分,同时按照得分由小到大进行排列REDIS_ENCODING_SKIPLIST当使用跳跃表进行底层实现时候,一个有序集合同时包括:一个跳跃表一个字典为什么有序集合需要同时使用跳跃表和字典来实现?在理论上,有序集合可以单独使用字典或者跳跃表的其中一种数据结构来实现,但无论单独使用字典还是跳跃表,在性能上对比起同时使用字典和跳跃表都会有所降低。举个例子,如果我们只使用字典来实现有序集合,那么虽然以O(1)复杂度查找成员的分值这一特性会被保留,但是,因为字典以无序的方式来保存集合元素,所以每次在执行范围型操作——比如ZRANK、ZRANGE等命令时,程序都需要对字典保存的所有元素进行排序,完成这种排序需要至少O(NlogN)时间复杂度,以及额外的O(N)内存空间(因为要创建一个数组来保存排序后的元素)。另一方面,如果我们只使用跳跃表来实现有序集合,那么跳跃表执行范围型操作的所有优点都会被保留,但因为没有了字典,所以根据成员查找分值这一操作的复杂度将从O(1)上升为O(logN)。因为以上原因,为了让有序集合的查找和范围型操作都尽可能快地执行,Redis 选择了同时使用字典和跳跃表两种数据结构来实现有序集合。编码转换#define REDIS_ZSET_MAX_ZIPLIST_ENTRIES 128 /*有序集合中元素个数小于该值,zset底层使用ziplist*/ #define REDIS_ZSET_MAX_ZIPLIST_VALUE 64 /*有序集合中元素长度小于该值,zset底层使用ziplist*/上述两个宏定义分别与redis的配置文件中zset-max-ziplist-entries和zset-max-ziplist-value对应总结这一次把redis的数据结构和对应的对象实现方式大致说了一遍,最重要的还是什么时候使用什么数据结构,并且各种数据结构一些命令的时间复杂度等,这些其实还没有进行阐述,后面会单独开一章进行讲解,因为在实际项目中,我们要针对不同场景对数据结构进行选取
2022年05月08日
218 阅读
0 评论
0 点赞
2022-05-08
redis服务器
博客网址:www.shicoder.top微信:kj11011029欢迎加群聊天 :452380935这一次主要讲下redis中服务器这个结构体相关代码,主要从是代码层面进行讲解redis服务器redis服务器结构体主要代码在redis.h/redisServer,下面给出该结构体源码,可以看到源码中对该结构体定义很长,这一节我们一点点分析,当然有些地方可能我也理解不到位hhh// redis服务器实例 struct redisServer { char *configfile; /* 配置文件的绝对路径 */ int hz; /* serverCron() 每秒调用的次数 */ redisDb *db; /* 数据库数组,里面存放的是该服务器所有的数据库 */ dict *commands; /* 命令表(受到 rename 配置选项的作用) */ dict *orig_commands; /* 命令表(无 rename 配置选项的作用) */ aeEventLoop *el; /* 事件状态 */ unsigned lruclock:REDIS_LRU_BITS; /* 最近一次使用时钟 */ int shutdown_asap; /* 关闭服务器的标识 */ int activerehashing; /* 在执行 serverCron() 时进行渐进式 rehash */ char *requirepass; /* 是否设置了密码 */ char *pidfile; /* PID 文件路径 */ int arch_bits; /* 架构类型32or64 */ int cronloops; /* serverCron() 函数的运行次数计数器 */ char runid[REDIS_RUN_ID_SIZE+1]; /* 本服务器的 RUN ID ID在每秒都会变化 */ int sentinel_mode; /* 服务器是否运行在 SENTINEL 模式 */ int port; /* TCP 监听端口 */ int tcp_backlog; /* TCP连接中已完成队列(完成三次握手之后)的长度 */ char *bindaddr[REDIS_BINDADDR_MAX]; /* 绑定地址 */ int bindaddr_count; /* bindaddr地址数量 */ char *unixsocket; /* UNIX socket 路径 */ mode_t unixsocketperm; /* UNIX socket permission */ int ipfd[REDIS_BINDADDR_MAX]; /* TCP套接字描述符 */ int ipfd_count; /* ipfd中使用的套接字数量 */ int sofd; /* Unix套接字描述符 */ int cfd[REDIS_BINDADDR_MAX];/* 集群总线监听套接字 */ int cfd_count; /* cfd使用到的套接字数量 */ list *clients; /* 链表,保存了所有客户端状态结构 */ list *clients_to_close; /* 链表,保存了所有待关闭的客户端 */ list *slaves, *monitors; /* 链表,保存了所有从服务器,以及所有监视器 */ redisClient *current_client; /* C服务器的当前客户端,仅用于崩溃报告 */ int clients_paused; /* 客服端是否被paused */ mstime_t clients_pause_end_time; /* 执行undo clients_paused的时间 */ char neterr[ANET_ERR_LEN]; /* anet.c网络错误缓冲区 */ dict *migrate_cached_sockets;/* MIGRATE缓冲套接字 */ int loading; /* 服务器是否正在被载入 */ off_t loading_total_bytes; /* 正在载入的数据的大小 */ off_t loading_loaded_bytes; /* 已载入数据的大小 */ time_t loading_start_time; /* 开始进行载入的时间 */ off_t loading_process_events_interval_bytes; // 常用命令的快捷连接 struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, *rpopCommand; time_t stat_starttime; /* 服务器启动时间 */ long long stat_numcommands; /* 已处理命令的数量 */ long long stat_numconnections; /* 服务器接到的连接请求数量 */ long long stat_expiredkeys; /* 已过期的键数量 */ long long stat_evictedkeys; /* 因为回收内存而被释放的过期键的数量 */ long long stat_keyspace_hits; /* 成功查找键的次数 */ long long stat_keyspace_misses; /* 查找键失败的次数 */ size_t stat_peak_memory; /* 已使用内存峰值 */ long long stat_fork_time; /* 最后一次执行 fork() 时消耗的时间 */ long long stat_rejected_conn; /* 服务器因为客户端数量过多而拒绝客户端连接的次数 */ long long stat_sync_full; /* 执行 full sync 的次数 */ long long stat_sync_partial_ok; /* PSYNC 成功执行的次数 */ long long stat_sync_partial_err;/* PSYNC 执行失败的次数 */ list *slowlog; /* 保存了所有慢查询日志的链表 */ long long slowlog_entry_id; /* SLOWLOG当前条目ID */ long long slowlog_log_slower_than; /* 服务器配置 slowlog-log-slower-than 选项的值(SLOWLOG时间限制) */ unsigned long slowlog_max_len; /* 服务器配置 slowlog-max-len 选项的值(SLOWLOG记录的最大项目数) */ size_t resident_set_size; /* serverCron()中rss采样次数. */ long long ops_sec_last_sample_time; /* 最后一次进行抽样的时间 */ long long ops_sec_last_sample_ops; /* 最后一次抽样时,服务器已执行命令的数量 */ long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES]; /* 抽样结果 */ int ops_sec_idx; /* 数组索引,用于保存抽样结果,并在需要时回绕到 0 */ int verbosity; /* 日志等级 Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice */ int maxidletime; /* 客户端超时最大时间 */ int tcpkeepalive; /* 是否开启SO_KEEPALIVE选项 */ int active_expire_enabled; /* 测试时候可以禁用 */ size_t client_max_querybuf_len; /* 客户端查询缓冲区长度限制 */ int dbnum; /* 服务器初始化应该创建多少个服务器 config中databases 16可以设定该选项 */ int daemonize; /* 如果作为守护进程运行,则为True */ // 客户端输出缓冲区大小限制 // 数组的元素有 REDIS_CLIENT_LIMIT_NUM_CLASSES 个 // 每个代表一类客户端:普通、从服务器、pubsub,诸如此类 clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES]; int aof_state; /* AOF 状态(开启/关闭/可写) */ int aof_fsync; /* 所使用的 fsync 策略(每个写入/每秒/从不) */ char *aof_filename; /* AOF文件名字 */ int aof_no_fsync_on_rewrite; /* 如果重写是在prog中,请不要fsync */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_base_size; /* 最后一次执行 BGREWRITEAOF 时, AOF 文件的大小 */ off_t aof_current_size; /* AOF 文件的当前字节大小 */ int aof_rewrite_scheduled; /* BGSAVE终止后重写 */ pid_t aof_child_pid; /* 负责进行 AOF 重写的子进程 ID */ list *aof_rewrite_buf_blocks; /* AOF 重写缓存链表,链接着多个缓存块 */ sds aof_buf; /* AOF 缓冲区 */ int aof_fd; /* 当前所选AOF文件的文件描述符 */ int aof_selected_db; /* 当前在AOF中选择的数据库 */ time_t aof_flush_postponed_start; /*推迟AOF flush的UNIX时间 */ time_t aof_last_fsync; /* 最后一直执行 fsync 的时间 */ time_t aof_rewrite_time_last; /* 最后一次AOF重写运行所用的时间 */ time_t aof_rewrite_time_start; /* 当前AOF重写开始时间 */ int aof_lastbgrewrite_status; /* 最后一次执行 BGREWRITEAOF 的结果REDIS_OK或REDIS_ERR */ unsigned long aof_delayed_fsync; /* 记录 AOF 的 write 操作被推迟了多少次 */ int aof_rewrite_incremental_fsync;/* 指示是否需要每写入一定量的数据,就主动执行一次 fsync() */ int aof_last_write_status; /* REDIS_OK or REDIS_ERR */ int aof_last_write_errno; /* 如果aof_last_write_status是ERR,则有效 */ long long dirty; /* 自从上次 SAVE 执行以来,数据库被修改的次数 */ long long dirty_before_bgsave; /* BGSAVE 执行前的数据库被修改次数 */ pid_t rdb_child_pid; /* 负责执行 BGSAVE 的子进程的 ID,没在执行 BGSAVE 时,设为 -1 */ struct saveparam *saveparams; /* 为RDB保存点数组 */ int saveparamslen; /* saveparams长度 */ char *rdb_filename; /* RDB文件的名称 */ int rdb_compression; /* 是否在RDB中使用压缩 */ int rdb_checksum; /* 是否使用RDB校验和 */ time_t lastsave; /* 最后一次完成 SAVE 的时间 */ time_t lastbgsave_try; /* 最后一次尝试执行 BGSAVE 的时间 */ time_t rdb_save_time_last; /* 最近一次 BGSAVE 执行耗费的时间 */ time_t rdb_save_time_start; /* 数据库最近一次开始执行 BGSAVE 的时间 */ int lastbgsave_status; /* 最后一次执行 SAVE 的状态REDIS_OK or REDIS_ERR */ int stop_writes_on_bgsave_err; /* 如果不能BGSAVE,不允许写入 */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ char *logfile; /* 日志文件的路径 */ int syslog_enabled; /* 是否启用了syslog */ char *syslog_ident; /* 指定syslog的标示符,如果上面的syslog-enabled no,则这个选项无效 */ int syslog_facility; /* 指定syslog facility,必须是USER或者LOCAL0到LOCAL7 */ int slaveseldb; /* Last SELECTed DB in replication output */ long long master_repl_offset; /* 全局复制偏移量(一个累计值) */ int repl_ping_slave_period; /* Master每N秒ping一次slave */ // backlog 本身 char *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog循环缓冲区大小 */ long long repl_backlog_histlen; /* backlog 中数据的长度 */ long long repl_backlog_idx; /* backlog 的当前索引 */ long long repl_backlog_off; /* backlog 中可以被还原的第一个字节的偏移量 */ time_t repl_backlog_time_limit; /* backlog 的过期时间 */ time_t repl_no_slaves_since; /* 距离上一次有从服务器的时间 */ int repl_min_slaves_to_write; /* 是否开启最小数量从服务器写入功能 */ int repl_min_slaves_max_lag; /* 定义最小数量从服务器的最大延迟值 */ int repl_good_slaves_count; /* 延迟良好的从服务器的数量 lag <= max_lag. */ char *masterauth; /* 主服务器的验证密码 */ char *masterhost; /* 主服务器的地址 */ int masterport; /* 主服务器的端口 */ int repl_timeout; /* 主机空闲N秒后超时 */ redisClient *master; /* 主服务器所对应的客户端 */ redisClient *cached_master; /* 被缓存的主服务器,PSYNC 时使用 */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* 复制的状态(服务器是从服务器时使用) */ off_t repl_transfer_size; /* 在同步期间从主机读取的RDB的大小 */ off_t repl_transfer_read; /* 在同步期间从主设备读取的RDB字节数 */ // 最近一次执行 fsync 时的偏移量 // 用于 sync_file_range 函数 off_t repl_transfer_last_fsync_off; /* 上次fsync-ed时偏移 */ int repl_transfer_s; /* 主服务器的套接字 */ int repl_transfer_fd; /* 保存 RDB 文件的临时文件的描述符 */ char *repl_transfer_tmpfile; /* 保存 RDB 文件的临时文件名字 */ time_t repl_transfer_lastio; /* 最近一次读入 RDB 内容的时间 */ int repl_serve_stale_data; /* Serve stale data when link is down? */ int repl_slave_ro; /* 从服务器是否只读 */ time_t repl_down_since; /* 连接断开的时长 */ int repl_disable_tcp_nodelay; /* 是否要在 SYNC 之后关闭 NODELAY */ int slave_priority; /* 从服务器优先级 */ char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /*本服务器(从服务器)当前主服务器的 RUN ID */ long long repl_master_initial_offset; /* Master PSYNC offset. */ /* ---------下面一些属性有些很难用到,对此我也没仔细看 */ /* Replication script cache. */ // 复制脚本缓存 // 字典 dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ // FIFO 队列 list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ // 缓存的大小 int repl_scriptcache_size; /* Max number of elements. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT command. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */ int maxclients; /* 最大并发客户端数 */ unsigned long long maxmemory; /* 要使用的最大内存字节数 */ int maxmemory_policy; /* Policy for key eviction */ int maxmemory_samples; /* Pricision of random sampling */ unsigned int bpop_blocked_clients; /* 列表阻止的客户端数量 */ list *unblocked_clients; /* 在下一个循环之前解锁的客户端列表 */ list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; int sort_alpha; int sort_bypattern; int sort_store; /* Zip structure config, see redis.conf for more information */ size_t hash_max_ziplist_entries; size_t hash_max_ziplist_value; size_t list_max_ziplist_entries; size_t list_max_ziplist_value; size_t set_max_intset_entries; size_t zset_max_ziplist_entries; size_t zset_max_ziplist_value; size_t hll_sparse_max_bytes; time_t unixtime; /* Unix time sampled every cron cycle. */ long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ /* Pubsub */ // 字典,键为频道,值为链表 // 链表中保存了所有订阅某个频道的客户端 // 新客户端总是被添加到链表的表尾 dict *pubsub_channels; /* Map channels to list of subscribed clients */ // 这个链表记录了客户端订阅的所有模式的名字 list *pubsub_patterns; /* A list of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of REDIS_NOTIFY... flags. */ /* Cluster */ int cluster_enabled; /* 群集是否已启用 */ mstime_t cluster_node_timeout; /* 集群节点超时时间. */ char *cluster_configfile; /* 集群自动生成的配置文件名 */ struct clusterState *cluster; /* 集群的状态*/ int cluster_migration_barrier; /* Cluster replicas migration barrier. */ /* Scripting */ // Lua 环境 lua_State *lua; /* The Lua interpreter. We use just one for all clients */ // 复制执行 Lua 脚本中的 Redis 命令的伪客户端 redisClient *lua_client; /* The "fake client" to query Redis from Lua */ // 当前正在执行 EVAL 命令的客户端,如果没有就是 NULL redisClient *lua_caller; /* The client running EVAL right now, or NULL */ // 一个字典,值为 Lua 脚本,键为脚本的 SHA1 校验和 dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ // Lua 脚本的执行时限 mstime_t lua_time_limit; /* Script timeout in milliseconds */ // 脚本开始执行的时间 mstime_t lua_time_start; /* Start time of script, milliseconds time */ // 脚本是否执行过写命令 int lua_write_dirty; /* True if a write command was called during the execution of the current script. */ // 脚本是否执行过带有随机性质的命令 int lua_random_dirty; /* True if a random command was called during the execution of the current script. */ // 脚本是否超时 int lua_timedout; /* True if we reached the time limit for script execution. */ // 是否要杀死脚本 int lua_kill; /* Kill the script if true. */ /* Assert & bug reporting */ char *assert_failed; char *assert_file; int assert_line; int bug_report_start; /* True if bug report header was already logged. */ int watchdog_period; /* Software watchdog period in ms. 0 = off */ };下面重点讲下redis服务器启动的流程,主要包括以下几个步骤,不懂的同学可以看下redis.c/main函数,就可以大致了解其过程检查服务器是否以Sentinel模式启动初始化全局服务器配置initServerConfig()如果是Sentinel模式,则初始化相关配置initSentinelConfig、initSentinel加载配置文件loadServerConfig()将服务器进程设置为守护进程daemonize初始化服务器initServer如果服务器进程为守护进程,则创建PID文件createPidFile为服务器进程设置名字redisSetProcTitle打印logoredisAsciiArt加载数据库loadDataFromDisk:AOF 持久化已打开,则使用loadAppendOnlyFile(),否则使用加载RDB文件rdbLoad()运行事件处理器,一直到服务器关闭为止aeMain下面对上面几个函数依次进行讲解Sentinel模式Sentinel模式就是哨兵模式,下面给出该模式的一个例子其中server1是主服务器,其余server2,3,4为从服务器。在生产环境中,不免会有意外原因导致redis服务器挂掉,如果此时挂掉的是一个master节点,主节点宕机,主从复制将不能继续进行,写数据将会阻塞,而哨兵的存在主要是为了切换掉宕机的master,然后从master下面的slave节点中选举一个作为新的master,并且把旧的master的slave全部转移到新的master上面,继续原有的主从复制。 哨兵本身是一个独立的进程,本身也是有单点问题的,所以哨兵也有自身的集群,用来保证哨兵本身的容错机制。 可以将redis中sentinel想成一个特殊的redis服务器,但是他不会像redis普通服务器那样去加载rdb或者aof文件,在initSentinel函数中,会创建一个sentinel结构体 sentinelState,代码如下/* Sentinel 的状态结构 */ struct sentinelState { // 当前纪元 uint64_t current_epoch; // 保存了所有被这个 sentinel 监视的主服务器 // 字典的键是主服务器的名字 // 字典的值则是一个指向 sentinelRedisInstance 结构的指针 dict *masters; // 是否进入了 TILT 模式? int tilt; // 目前正在执行的脚本的数量 int running_scripts; // 进入 TILT 模式的时间 mstime_t tilt_start_time; // 最后一次执行时间处理器的时间 mstime_t previous_time; // 一个 FIFO 队列,包含了所有需要执行的用户脚本 list *scripts_queue; } sentinel; // 以 Sentinel 模式初始化服务器 void initSentinel(void) { int j; // 清空 Redis 服务器的命令表(该表用于普通模式) dictEmpty(server.commands,NULL); // 将 SENTINEL 模式所用的命令添加进命令表 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) { int retval; struct redisCommand *cmd = sentinelcmds+j; retval = dictAdd(server.commands, sdsnew(cmd->name), cmd); redisAssert(retval == DICT_OK); } /* 初始化 Sentinel 的状态 */ // 初始化纪元 sentinel.current_epoch = 0; // 初始化保存主服务器信息的字典 sentinel.masters = dictCreate(&instancesDictType,NULL); // 初始化 TILT 模式的相关选项 sentinel.tilt = 0; sentinel.tilt_start_time = 0; sentinel.previous_time = mstime(); // 初始化脚本相关选项 sentinel.running_scripts = 0; sentinel.scripts_queue = listCreate(); }其中有一个master字典,这里面记录了记录了所有被 Sentinel 监视的主服务器的相关信息, 其中:字典的键是被监视主服务器的名字。而字典的值则是被监视主服务器对应的 sentinel.c/sentinelRedisInstance 结构。每个 sentinelRedisInstance 结构代表一个被 Sentinel 监视的 Redis 服务器实例(instance), 这个实例可以是主服务器、从服务器、或者另外一个 Sentinel 。下面给出这个结构体的代码// Sentinel 会为每个被监视的 Redis 实例创建相应的 sentinelRedisInstance 实例 // (被监视的实例可以是主服务器、从服务器、或者其他 Sentinel ) typedef struct sentinelRedisInstance { // 标识值,记录了实例的类型,以及该实例的当前状态 // 当为SRI_MASTER为主服务器,当为SRI_SLAVE为从服务器,当为SRI_SENTINEL为sentinel服务器 int flags; // 实例的名字 // 主服务器的名字由用户在配置文件中设置 // 从服务器以及 Sentinel 的名字由 Sentinel 自动设置 // 格式为 ip:port ,例如 "127.0.0.1:26379" char *name; // 实例的运行 ID char *runid; // 配置纪元,用于实现故障转移 uint64_t config_epoch; // 实例的地址 sentinelAddr *addr; // 用于发送命令的异步连接 redisAsyncContext *cc; // 用于执行 SUBSCRIBE 命令、接收频道信息的异步连接 // 仅在实例为主服务器时使用 redisAsyncContext *pc; // 已发送但尚未回复的命令数量 int pending_commands; // cc 连接的创建时间 mstime_t cc_conn_time; // pc 连接的创建时间 mstime_t pc_conn_time; // 最后一次从这个实例接收信息的时间 mstime_t pc_last_activity; // 实例最后一次返回正确的 PING 命令回复的时间 mstime_t last_avail_time; // 实例最后一次发送 PING 命令的时间 mstime_t last_ping_time; // 实例最后一次返回 PING 命令的时间,无论内容正确与否 mstime_t last_pong_time; // 最后一次向频道发送问候信息的时间 // 只在当前实例为 sentinel 时使用 mstime_t last_pub_time; // 最后一次接收到这个 sentinel 发来的问候信息的时间 // 只在当前实例为 sentinel 时使用 mstime_t last_hello_time; // 最后一次回复 SENTINEL is-master-down-by-addr 命令的时间 // 只在当前实例为 sentinel 时使用 mstime_t last_master_down_reply_time; // 实例被判断为 SDOWN 状态的时间 mstime_t s_down_since_time; // 实例被判断为 ODOWN 状态的时间 mstime_t o_down_since_time; // SENTINEL down-after-milliseconds 选项所设定的值 // 实例无响应多少毫秒之后才会被判断为主观下线(subjectively down) mstime_t down_after_period; // 从实例获取 INFO 命令的回复的时间 mstime_t info_refresh; // 实例的角色 int role_reported; // 角色的更新时间 mstime_t role_reported_time; // 最后一次从服务器的主服务器地址变更的时间 mstime_t slave_conf_change_time; /* 主服务器实例特有的属性 */ // 其他同样监控这个主服务器的所有 sentinel dict *sentinels; // 如果这个实例代表的是一个主服务器 // 那么这个字典保存着主服务器属下的从服务器 // 字典的键是从服务器的名字,字典的值是从服务器对应的 sentinelRedisInstance 结构 dict *slaves; // SENTINEL monitor <master-name> <IP> <port> <quorum> 选项中的 quorum 参数 // 判断这个实例为客观下线(objectively down)所需的支持投票数量 int quorum; // SENTINEL parallel-syncs <master-name> <number> 选项的值 // 在执行故障转移操作时,可以同时对新的主服务器进行同步的从服务器数量 int parallel_syncs; // 连接主服务器和从服务器所需的密码 char *auth_pass; /* 从服务器实例特有的属性*/ // 主从服务器连接断开的时间 mstime_t master_link_down_time; // 从服务器优先级 int slave_priority; // 执行故障转移操作时,从服务器发送 SLAVEOF <new-master> 命令的时间 mstime_t slave_reconf_sent_time; // 主服务器的实例(在本实例为从服务器时使用) struct sentinelRedisInstance *master; // INFO 命令的回复中记录的主服务器 IP char *slave_master_host; // INFO 命令的回复中记录的主服务器端口号 int slave_master_port; // INFO 命令的回复中记录的主从服务器连接状态 int slave_master_link_status; // 从服务器的复制偏移量 unsigned long long slave_repl_offset; /* 故障转移相关属性*/ // 如果这是一个主服务器实例,那么 leader 将是负责进行故障转移的 Sentinel 的运行 ID 。 // 如果这是一个 Sentinel 实例,那么 leader 就是被选举出来的领头 Sentinel 。 // 这个域只在 Sentinel 实例的 flags 属性的 SRI_MASTER_DOWN 标志处于打开状态时才有效。 char *leader; // 领头的纪元 uint64_t leader_epoch; // 当前执行中的故障转移的纪元 uint64_t failover_epoch; // 故障转移操作的当前状态 int failover_state; // 状态改变的时间 mstime_t failover_state_change_time; // 最后一次进行故障迁移的时间 mstime_t failover_start_time; // SENTINEL failover-timeout <master-name> <ms> 选项的值 // 刷新故障迁移状态的最大时限 mstime_t failover_timeout; mstime_t failover_delay_logged; // 指向被提升为新主服务器的从服务器的指针 struct sentinelRedisInstance *promoted_slave; // 一个文件路径,保存着 WARNING 级别的事件发生时执行的, // 用于通知管理员的脚本的地址 char *notification_script; // 一个文件路径,保存着故障转移执行之前、之后、或者被中止时, // 需要执行的脚本的地址 char *client_reconfig_script; } sentinelRedisInstance;假如此时启动sentinel时候,配置文件如下##################### # master1 configure # ##################### sentinel monitor master1 127.0.0.1 6379 2 sentinel down-after-milliseconds master1 30000 sentinel parallel-syncs master1 1 sentinel failover-timeout master1 900000 ##################### # master2 configure # ##################### sentinel monitor master2 127.0.0.1 12345 5 sentinel down-after-milliseconds master2 50000 sentinel parallel-syncs master2 5 sentinel failover-timeout master2 450000则会为2个服务器创建如下结构体sentinel结构体中maste字典内容如下当一个redis服务器以sentinel模式启动,则它会自动去替换一些普通模式服务器的代码,比如普通redis服务器使用redis.h/REDIS_SERVERPORT作为端口,但是sentinel模式下会以sentinel.c/REDIS_SENTINEL_PORT作为端口,同时普通redis服务器的支持的命令在redis.c/redisCommandTable中,但是sentinel模式下支持的命令在sentinel.c/sentinelcmds,其中代码较少,下面给出代码// 服务器在 sentinel 模式下可执行的命令 struct redisCommand sentinelcmds[] = { {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0}, {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0}, {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0}, {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}, {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}, {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0}, {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}, {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0} };sentinel主要就是为了应用于主服务器下线导致集群不可用情况,因此最重要的就是如何检测和如何防范,下面通过主观下线和客观下线两种方式进行说明主观下线默认情况下,每个sentinel会每秒钟向其他所有主服务器、从服务器、sentinels发送ping消息,返回结果分为有效返回(+PONG、-LOADING、-MASTERDOWN)三者之一或无效返回(上述三种其他回复或者指定时间内没有回复),若出现无效返回情况,则会将sentinelRedisInstance属性中的flag字段打开SRI_S_DOWN标志客观下线当一个sentinel对一台服务器设置为主观下线后,还需要判断是否客观下线,它会向其他监视该服务器的sentinels进行询问,当接收到足够数量(设置的quorum参数)的sentinels说该服务器也下线,则表明该服务器客观下线。客观下线会打开SRI_O_DOWN标志当一个主服务器被判定为客观下线后,监视这个下线服务器的全部sentinels会进行协商,选举出一个lead sentinel,这个lead sentinel会对下线服务器进行故障转移,包括三个步骤1、在已下线主服务器的从服务器中选一个主服务器,然后向其发送SLAVEOF no one命令,设置为主服务器2、让已下线主服务器下面的从服务器用刚刚选举的主服务器作为主服务器3、将已下线的主服务器认刚刚选举的主服务器作为自己的主服务器,当这个下线服务器再次上线时,就会真的设置为自己的主服务器初始化全局服务器配置redis.c/initServerConfig()void initServerConfig() { int j; // 设置服务器的运行 ID getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE); // 设置默认配置文件路径 server.configfile = NULL; // 设置默认服务器频率 server.hz = REDIS_DEFAULT_HZ; // 为运行 ID 加上结尾字符 server.runid[REDIS_RUN_ID_SIZE] = '\0'; // 设置服务器的运行架构 server.arch_bits = (sizeof(long) == 8) ? 64 : 32; // 设置默认服务器端口号 server.port = REDIS_SERVERPORT; server.tcp_backlog = REDIS_TCP_BACKLOG; server.bindaddr_count = 0; server.unixsocket = NULL; server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM; server.ipfd_count = 0; server.sofd = -1; server.dbnum = REDIS_DEFAULT_DBNUM; server.verbosity = REDIS_DEFAULT_VERBOSITY; server.maxidletime = REDIS_MAXIDLETIME; server.tcpkeepalive = REDIS_DEFAULT_TCP_KEEPALIVE; server.active_expire_enabled = 1; server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN; server.saveparams = NULL; server.loading = 0; server.logfile = zstrdup(REDIS_DEFAULT_LOGFILE); server.syslog_enabled = REDIS_DEFAULT_SYSLOG_ENABLED; server.syslog_ident = zstrdup(REDIS_DEFAULT_SYSLOG_IDENT); server.syslog_facility = LOG_LOCAL0; server.daemonize = REDIS_DEFAULT_DAEMONIZE; server.aof_state = REDIS_AOF_OFF; server.aof_fsync = REDIS_DEFAULT_AOF_FSYNC; server.aof_no_fsync_on_rewrite = REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE; server.aof_rewrite_perc = REDIS_AOF_REWRITE_PERC; server.aof_rewrite_min_size = REDIS_AOF_REWRITE_MIN_SIZE; server.aof_rewrite_base_size = 0; server.aof_rewrite_scheduled = 0; server.aof_last_fsync = time(NULL); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; server.aof_lastbgrewrite_status = REDIS_OK; server.aof_delayed_fsync = 0; server.aof_fd = -1; server.aof_selected_db = -1; /* 保证不选中任意数据库 */ server.aof_flush_postponed_start = 0; server.aof_rewrite_incremental_fsync = REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.pidfile = zstrdup(REDIS_DEFAULT_PID_FILE); server.rdb_filename = zstrdup(REDIS_DEFAULT_RDB_FILENAME); server.aof_filename = zstrdup(REDIS_DEFAULT_AOF_FILENAME); server.requirepass = NULL; server.rdb_compression = REDIS_DEFAULT_RDB_COMPRESSION; server.rdb_checksum = REDIS_DEFAULT_RDB_CHECKSUM; server.stop_writes_on_bgsave_err = REDIS_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR; server.activerehashing = REDIS_DEFAULT_ACTIVE_REHASHING; server.notify_keyspace_events = 0; server.maxclients = REDIS_MAX_CLIENTS; server.bpop_blocked_clients = 0; server.maxmemory = REDIS_DEFAULT_MAXMEMORY; server.maxmemory_policy = REDIS_DEFAULT_MAXMEMORY_POLICY; server.maxmemory_samples = REDIS_DEFAULT_MAXMEMORY_SAMPLES; server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES; server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE; server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES; server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES; server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE; server.hll_sparse_max_bytes = REDIS_DEFAULT_HLL_SPARSE_MAX_BYTES; server.shutdown_asap = 0; server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD; server.repl_timeout = REDIS_REPL_TIMEOUT; server.repl_min_slaves_to_write = REDIS_DEFAULT_MIN_SLAVES_TO_WRITE; server.repl_min_slaves_max_lag = REDIS_DEFAULT_MIN_SLAVES_MAX_LAG; server.cluster_enabled = 0; server.cluster_node_timeout = REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT; server.cluster_migration_barrier = REDIS_CLUSTER_DEFAULT_MIGRATION_BARRIER; server.cluster_configfile = zstrdup(REDIS_DEFAULT_CLUSTER_CONFIG_FILE); server.lua_caller = NULL; server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_client = NULL; server.lua_timedout = 0; server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); server.loading_process_events_interval_bytes = (1024*1024*2); // 初始化 LRU 时间 server.lruclock = getLRUClock(); // 初始化并设置保存条件 resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ // 初始化和复制相关的状态 server.masterauth = NULL; server.masterhost = NULL; server.masterport = 6379; server.master = NULL; server.cached_master = NULL; server.repl_master_initial_offset = -1; server.repl_state = REDIS_REPL_NONE; server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; // 初始化 PSYNC 命令所使用的 backlog server.repl_backlog = NULL; server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE; server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; server.repl_backlog_off = 0; server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT; server.repl_no_slaves_since = time(NULL); // 设置客户端的输出缓冲区限制 for (j = 0; j < REDIS_CLIENT_LIMIT_NUM_CLASSES; j++) server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; // 初始化浮点常量 R_Zero = 0.0; R_PosInf = 1.0/R_Zero; R_NegInf = -1.0/R_Zero; R_Nan = R_Zero/R_Zero; // 初始化命令表 // 在这里初始化是因为接下来读取 .conf 文件时可能会用到这些命令 server.commands = dictCreate(&commandTableDictType,NULL); server.orig_commands = dictCreate(&commandTableDictType,NULL); populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); server.lpushCommand = lookupCommandByCString("lpush"); server.lpopCommand = lookupCommandByCString("lpop"); server.rpopCommand = lookupCommandByCString("rpop"); // 初始化慢查询日志 server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN; // 初始化调试项 server.assert_failed = "<no assertion failed>"; server.assert_file = "<no file>"; server.assert_line = 0; server.bug_report_start = 0; server.watchdog_period = 0; }主要包括以下几个方面网络监听相关,如绑定地址,TCP端口等虚拟内存相关,如swap文件、page大小等保存机制,多长时间内有多少次更新才进行保存复制相关,如是否是slave,master地址、端口Hash相关设置初始化命令表加载配置文件上面加载的可以想象成是一个默认配置文件,若 初始化时候,指定了配置文件,则会将其中一些字段进行修改config.c/loadServerConfigvoid loadServerConfig(char *filename, char *options) { sds config = sdsempty(); char buf[REDIS_CONFIGLINE_MAX+1]; // 载入文件内容 if (filename) { FILE *fp; if (filename[0] == '-' && filename[1] == '\0') { fp = stdin; } else { if ((fp = fopen(filename,"r")) == NULL) { redisLog(REDIS_WARNING, "Fatal error, can't open config file '%s'", filename); exit(1); } } while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) config = sdscat(config,buf); if (fp != stdin) fclose(fp); } // 追加 options 字符串到内容的末尾 if (options) { config = sdscat(config,"\n"); config = sdscat(config,options); } // 根据字符串内容,设置服务器配置 loadServerConfigFromString(config); sdsfree(config); }设置为守护进程代码如下void daemonize(void) { int fd; if (fork() != 0) exit(0); /* 父进程退出 */ setsid(); /* 创建新会话 */ /* 将输出定位到/dev/null */ if ((fd = open("/dev/null", O_RDWR, 0)) != -1) { dup2(fd, STDIN_FILENO); dup2(fd, STDOUT_FILENO); dup2(fd, STDERR_FILENO); if (fd > STDERR_FILENO) close(fd); } }初始化服务器initServer代码如下void initServer() { int j; // 设置信号处理函数 // 因为是守护进程,所以没有控制终端,屏蔽SIGHUP signal(SIGHUP, SIG_IGN); // SIGPIPE是写管道发现读进程终止时产生的信号,redis是服务器,会遇到各种client,所以需要忽略 signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); // 设置 syslog if (server.syslog_enabled) { openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, server.syslog_facility); } // 初始化并创建数据结构 server.current_client = NULL; server.clients = listCreate(); server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); server.slaveseldb = -1; server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.clients_paused = 0; // 创建共享对象 createSharedObjects(); adjustOpenFilesLimit(); server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); server.db = zmalloc(sizeof(redisDb)*server.dbnum); // 打开 TCP 监听端口,用于等待客户端的命令请求 if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); // 打开 UNIX 本地端口 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); } /* Abort if there are no listening sockets at all. */ if (server.ipfd_count == 0 && server.sofd < 0) { redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } // 创建并初始化数据库结构 for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].eviction_pool = evictionPoolAlloc(); server.db[j].id = j; server.db[j].avg_ttl = 0; } // 创建 PUBSUB 相关结构 server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_patterns = listCreate(); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); /* At startup we consider the DB saved. */ server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ server.rdb_save_time_last = -1; server.rdb_save_time_start = -1; server.dirty = 0; resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ server.stat_starttime = time(NULL); server.stat_peak_memory = 0; server.resident_set_size = 0; server.lastbgsave_status = REDIS_OK; server.aof_last_write_status = REDIS_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; updateCachedTime(); /* Create the serverCron() time event, that's our main way to process * background operations. */ // 为 serverCron() 创建时间事件 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ // 为 TCP 连接关联连接应答(accept)处理器 // 用于接受并应答客户端的 connect() 调用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // 为本地套接字关联应答处理器 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); /* Open the AOF file if needed. */ // 如果 AOF 持久化功能已经打开,那么打开或创建一个 AOF 文件 if (server.aof_state == REDIS_AOF_ON) { server.aof_fd = open(server.aof_filename, O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { redisLog(REDIS_WARNING, "Can't open the append-only file: %s", strerror(errno)); exit(1); } } // 对于 32 位实例来说,默认将最大可用内存限制在 3 GB if (server.arch_bits == 32 && server.maxmemory == 0) { redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION; } // 如果服务器以 cluster 模式打开,那么初始化 cluster if (server.cluster_enabled) clusterInit(); // 初始化复制功能有关的脚本缓存 replicationScriptCacheInit(); // 初始化脚本系统 scriptingInit(); // 初始化慢查询功能 slowlogInit(); // 初始化 BIO 系统 bioInit(); }上面大多数注释已经对代码进行讲解,下面对slowlogInit进行单独讲解/* * 初始化服务器慢查询功能。 * * 这个函数只应该在服务器启动时执行一次。 */ void slowlogInit(void) { // 保存日志的链表,FIFO 顺序 server.slowlog = listCreate(); // 日志数量计数器 server.slowlog_entry_id = 0; // 日志链表的释构函数 listSetFreeMethod(server.slowlog,slowlogFreeEntry); } /* * 慢查询日志 */ typedef struct slowlogEntry { // 命令与命令参数 robj **argv; // 命令与命令参数的数量 int argc; // 唯一标识符 long long id; // 执行命令消耗的时间,以微秒为单位 // 注释里说的 nanoseconds 是错误的 long long duration; // 命令执行时的时间,格式为 UNIX 时间戳 time_t time; } slowlogEntry;其中还有一个函数bioInit,redis的BIO系统在redis3.0版本主要做两件事情:AOF持久化和关闭文件,可以将BIO系统想象成下面:创建一个队列,然后创建一些线程,来了一个任务就往队列里面添加任务,线程去任务队列里面取任务出来执行因为在redis3.0中只需要做两件事情,所以任务的结构体代码如下/* * 表示后台任务的数据结构 * * 这个结构只由 API 使用,不会被暴露给外部。 */ struct bio_job { // 任务创建时的时间 time_t time; /* * 任务的参数。参数多于三个时,可以传递数组或者结构 arg1一般是文件描述符 */ void *arg1, *arg2, *arg3; };任务初始化首先是相关静态变量的初始化#define REDIS_BIO_NUM_OPS 2 // 2个任务 // 工作线程,斥互和条件变量 static pthread_t bio_threads[REDIS_BIO_NUM_OPS]; static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; // 存放工作的队列 static list *bio_jobs[REDIS_BIO_NUM_OPS]; // 初始化变量 for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_condvar[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } // 创建线程 for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; // 这里的函数参数是arg = j,也就是每个线程传入一个编号j,0代表关闭文件,1代表aof初始化 if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } // bioProcessBackgroundJobs函数就是后台执行任务的函数 void *bioProcessBackgroundJobs(void *arg) { ... if (type == REDIS_BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == REDIS_BIO_AOF_FSYNC) { aof_fsync((long)job->arg1); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); } ... }事件处理器循环aeMain这个循环主要就是做两件事情,beforeSleep和aeProcessEvents// 运行事件处理器,一直到服务器关闭为止 aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); // 服务器关闭,停止事件循环 aeDeleteEventLoop(server.el); /* * 设置处理事件前需要被执行的函数 */ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { eventLoop->beforesleep = beforesleep; } /* * 事件处理器的主循环 */ void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 如果有需要在事件处理前执行的函数,那么运行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 开始处理事件 其实就是一个事件调度函数,包括处理时间事件和文件事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } } /* * 删除事件处理器 */ void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); }下面单独对这两个函数进行讲解beforeSleep首先先看代码// 每次处理事件之前执行 void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); // 执行一次快速的主动过期检查 if (server.active_expire_enabled && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* 如果在之前的事件循环迭代中至少有一个客户端阻塞,则向所有slave发送ACK请求 */ if (server.get_ack_from_slaves) { robj *argv[3]; argv[0] = createStringObject("REPLCONF",8); argv[1] = createStringObject("GETACK",6); argv[2] = createStringObject("*",1); /* Not used argument. */ replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); decrRefCount(argv[0]); decrRefCount(argv[1]); decrRefCount(argv[2]); server.get_ack_from_slaves = 0; } /* 解除阻塞等待同步复制的所有客户端 */ if (listLength(server.clients_waiting_acks)) processClientsWaitingReplicas(); /* 尝试为刚刚解除阻塞的客户端处理挂起的命令 */ if (listLength(server.unblocked_clients)) processUnblockedClients(); // 将 AOF 缓冲区的内容写入到 AOF 文件 // void flushAppendOnlyFile(int force) force参数表明是否强制刷新,当为0时候,若后台有fsync在执行,则延迟 flushAppendOnlyFile(0); // 在进入下个事件循环前,执行一些集群收尾工作 if (server.cluster_enabled) clusterBeforeSleep(); }aeProcessEventsredis中的事件主要分为两种事件:文件事件(和其他客户端连接产生的事件)和时间事件(定时时间产生的事件)redis处理时间事件的函数会在服务器运行期间,每隔一段事件运行,处理时间事件,每个事件以链表形式挂在一起,每次处理时候,都是遍历该链表/* Process time events * * 处理所有已到达的时间事件 */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; time_t now = time(NULL); /* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ // 通过重置事件的运行时间, // 防止因时间穿插(skew)而造成的事件处理混乱 if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } // 更新最后一次处理时间事件的时间 eventLoop->lastTime = now; // 遍历链表 // 执行那些已经到达的事件 te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; while(te) { long now_sec, now_ms; long long id; // 跳过无效事件 if (te->id > maxId) { te = te->next; continue; } // 获取当前时间 aeGetTime(&now_sec, &now_ms); // 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; // 执行事件处理器,并获取返回值 retval = te->timeProc(eventLoop, id, te->clientData); processed++; /* After an event is processed our time event list may * no longer be the same, so we restart from head. * Still we make sure to don't process events registered * by event handlers itself in order to don't loop forever. * To do so we saved the max ID we want to handle. * * FUTURE OPTIMIZATIONS: * Note that this is NOT great algorithmically. Redis uses * a single time event so it's not a problem but the right * way to do this is to add the new elements on head, and * to flag deleted elements in a special way for later * deletion (putting references to the nodes to delete into * another linked list). */ // 记录是否有需要循环执行这个事件时间 if (retval != AE_NOMORE) { // 是的, retval 毫秒之后继续执行这个时间事件 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { // 不,将这个事件删除 aeDeleteTimeEvent(eventLoop, id); } // 因为执行事件之后,事件列表可能已经被改变了 // 因此需要将 te 放回表头,继续开始执行事件 te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }下面的代码就是redis的事件调度函数/* * 事件调度函数 * 处理所有已到达的时间事件,以及所有已就绪的文件事件。 * 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪, * 或者下个时间事件到达(如果有的话)。 * * 如果 flags 为 0 ,那么函数不作动作,直接返回。 * 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。 * 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。 * 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。 * 如果 flags 包含 AE_DONT_WAIT , 那么函数在处理完所有不许阻塞的事件之后,即刻返回。 * 函数的返回值为已处理事件的数量 */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 获取最近的时间事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 如果时间事件存在的话 // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间 long now_sec, now_ms; // 计算距今最近的时间事件还要多久才能达到 // 并将该时间距保存在 tv 结构中 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞) if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // 执行到这一步,说明没有时间事件 // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度 /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { // 设置文件事件不阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 文件事件可以阻塞直到有事件到达为止 tvp = NULL; /* wait forever */ } } // 处理文件事件,阻塞时间由 tvp 决定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 从已就绪数组中获取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 读事件 if (fe->mask & mask & AE_READABLE) { // rfired 确保读/写事件只能执行其中一个 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; } /* * 获取可执行事件 */ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 等待时间 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 有至少一个事件就绪? if (retval > 0) { int j; // 为已就绪事件设置相应的模式 // 并加入到 eventLoop 的 fired 数组中 numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } // 返回已就绪事件个数 return numevents; }由上面代码可知,因为文件事件是随机出现的,如果等待并处理完一次文件事件之后,仍未有任何时间事件到达,那么服务器将再次等待并处理文件事件。随着文件事件的不断执行,时间会逐渐向时间事件所设置的到达时间逼近,并最终来到到达时间,这时服务器就可以开始处理到达的时间事件了。
2022年05月08日
942 阅读
20 评论
0 点赞
2022-05-08
redis持久化
博客网址:www.shicoder.top微信:kj11011029欢迎加群聊天 :452380935本次主要是对redis中著名的持久化策略进行代码层面描述,主要包括RDB持久化和AOF持久化因为AOF文件的更新频率比RDB高,所以如果服务器开启AOF持久化,redis优先使用AOF文件还原,只有当AOF持久化关闭,才使用RDB文件进行还原RDB持久化RDB持久化主要有两个命令实现:SAVE和BGSAVESAVE、BGSAVESAVE会阻塞redis服务器,知道RDB文件创建完毕void saveCommand(redisClient *c) { // BGSAVE 已经在执行中,不能再执行 SAVE // 否则将产生竞争条件 if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); return; } // 执行 if (rdbSave(server.rdb_filename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); } }BGSAVE不会阻塞,他会创建一个子进程,由子进程处理RDB文件保存void bgsaveCommand(redisClient *c) { // 不能重复执行 BGSAVE if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); // 不能在 BGREWRITEAOF 正在运行时执行 } else if (server.aof_child_pid != -1) { addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress"); // 执行 BGSAVE } else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) { addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err); } } int rdbSaveBackground(char *filename) { pid_t childpid; long long start; // 如果 BGSAVE 已经在执行,那么出错 if (server.rdb_child_pid != -1) return REDIS_ERR; // 记录 BGSAVE 执行前的数据库被修改次数 server.dirty_before_bgsave = server.dirty; // 最近一次尝试执行 BGSAVE 的时间 server.lastbgsave_try = time(NULL); // fork() 开始前的时间,记录 fork() 返回耗时用 start = ustime(); if ((childpid = fork()) == 0) { int retval; /* 子进程 */ // 关闭网络连接 fd closeListeningSockets(0); // 设置进程的标题,方便识别 redisSetProcTitle("redis-rdb-bgsave"); // 执行保存操作 retval = rdbSave(filename); // 打印 copy-on-write 时使用的内存数 if (retval == REDIS_OK) { size_t private_dirty = zmalloc_get_private_dirty(); if (private_dirty) { redisLog(REDIS_NOTICE, "RDB: %zu MB of memory used by copy-on-write", private_dirty/(1024*1024)); } } // 向父进程发送信号 exitFromChild((retval == REDIS_OK) ? 0 : 1); } else { /* 父进程 */ // 计算 fork() 执行的时间 server.stat_fork_time = ustime()-start; // 如果 fork() 出错,那么报告错误 if (childpid == -1) { server.lastbgsave_status = REDIS_ERR; redisLog(REDIS_WARNING,"Can't save in background: fork: %s", strerror(errno)); return REDIS_ERR; } // 打印 BGSAVE 开始的日志 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); // 记录数据库开始 BGSAVE 的时间 server.rdb_save_time_start = time(NULL); // 记录负责执行 BGSAVE 的子进程 ID server.rdb_child_pid = childpid; // 关闭自动 rehash updateDictResizePolicy(); return REDIS_OK; } return REDIS_OK; /* unreached */ }两个命令内部都是执行rdbSave函数/* * 将数据库保存到磁盘上。 * 保存成功返回 REDIS_OK ,出错/失败返回 REDIS_ERR 。 */ int rdbSave(char *filename) { dictIterator *di = NULL; dictEntry *de; char tmpfile[256]; char magic[10]; int j; long long now = mstime(); FILE *fp; rio rdb; uint64_t cksum; // 创建临时文件 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s", strerror(errno)); return REDIS_ERR; } // 初始化 I/O rioInitWithFile(&rdb,fp); // 设置校验和函数 if (server.rdb_checksum) rdb.update_cksum = rioGenericUpdateChecksum; // 写入 RDB 版本号 snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); // 写入错误,跳转到werr if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr; // 遍历所有数据库 for (j = 0; j < server.dbnum; j++) { // 指向数据库 redisDb *db = server.db+j; // 指向数据库键空间 dict *d = db->dict; // 跳过空数据库 if (dictSize(d) == 0) continue; // 创建键空间迭代器 di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* * 写入 DB 选择器 */ if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr; if (rdbSaveLen(&rdb,j) == -1) goto werr; /* * 遍历数据库,并写入每个键值对的数据 */ while((de = dictNext(di)) != NULL) { sds keystr = dictGetKey(de); robj key, *o = dictGetVal(de); long long expire; // 根据 keystr ,在栈中创建一个 key 对象 initStaticStringObject(key,keystr); // 获取键的过期时间 expire = getExpire(db,&key); // 保存键值对数据 if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr; } dictReleaseIterator(di); } di = NULL; /* So that we don't release it again on error. */ /* * 写入 EOF 代码 */ if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr; /* * CRC64 校验和。 * * 如果校验和功能已关闭,那么 rdb.cksum 将为 0 , * 在这种情况下, RDB 载入时会跳过校验和检查。 */ cksum = rdb.cksum; memrev64ifbe(&cksum); rioWrite(&rdb,&cksum,8); // 冲洗缓存,确保数据已写入磁盘 if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* * 使用 RENAME ,原子性地对临时文件进行改名,覆盖原来的 RDB 文件。 */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } // 写入完成,打印日志 redisLog(REDIS_NOTICE,"DB saved on disk"); // 清零数据库脏状态 server.dirty = 0; // 记录最后一次完成 SAVE 的时间 server.lastsave = time(NULL); // 记录最后一次执行 SAVE 的状态 server.lastbgsave_status = REDIS_OK; return REDIS_OK; werr: // 关闭文件 fclose(fp); // 删除文件 unlink(tmpfile); redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR; }RDB文件内容首先给出一个完整的RDB文件的格式后续为描述方便,大写为常量,小写为变量或者数据REDIS 这个其实就是RDB文件的标识符db_version长度4字节,记录RDB文件的版本号,redis3.0一般使用0006(第六版)databases表示任意个数据库EOF表示正文内容结束check_sum校验和,8字节,通过前面4部分内容计算得出下面重点说下databases字段,每个database都是包括如下几个部分。SELECTDB一字节,表示接下来要读一个数据库号码db_number表示一个数据库号码,长度1、2、5字节,当读入该数字后,redis会调用select命令进行数据库切换key_value_pairs表示数据库中所有的键值对数据,其中又分为不带过期时间的键值对,和带过期时间的键值对不带过期的键值对,由TYPE、key、value组成带过期的键值对,由EXPIRETIME_MS、ms、TYPE、key、value组成AOF持久化AOF持久化是通过保存redis服务器在运行期间所执行的写命令进行记录数据,AOF持久化分为命令追加、文件写入、文件同步三个步骤,下面分别对这三个步骤进行阐述命令追加当AOF持久化处于打开的状态,服务器在执行一个写命令之后,会以某种协议的方式将被执行的写命令追加到服务器redisServer中的aof_buf缓冲区末尾文件写入与同步上一次我们说到,redis在运行过程中,是一个事件循环,每次循环执行对应的时间事件和文件事件,因此AOF持久化的写入也在每次事件循环结束后进行,执行函数flushAppendOnlyFilevoid flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; // 缓冲区中没有任何内容,直接返回 if (sdslen(server.aof_buf) == 0) return; // 策略为每秒 FSYNC if (server.aof_fsync == AOF_FSYNC_EVERYSEC) // 是否有 SYNC 正在后台进行? sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; // 每秒 fsync ,并且强制写入为假 if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* * 当 fsync 策略为每秒钟一次时, fsync 在后台执行。 * 如果后台仍在执行 FSYNC ,那么我们可以延迟写操作一两秒 * (如果强制执行 write 的话,服务器主线程将阻塞在 write 上面) */ if (sync_in_progress) { // 有 fsync 正在后台进行 。。。 if (server.aof_flush_postponed_start == 0) { /* * 前面没有推迟过 write 操作,这里将推迟写操作的时间记录下来 * 然后就返回,不执行 write 或者 fsync */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { /* * 如果之前已经因为 fsync 而推迟了 write 操作 * 但是推迟的时间不超过 2 秒,那么直接返回 * 不执行 write 或者 fsync */ return; } /* * 如果后台还有 fsync 在执行,并且 write 已经推迟 >= 2 秒 * 那么执行写操作(write 将被阻塞) */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } /* * 执行到这里,程序会对 AOF 文件进行写入。 * 清零延迟 write 的时间记录 */ server.aof_flush_postponed_start = 0; /* * 执行单个 write 操作,如果写入设备是物理的话,那么这个操作应该是原子的 * * 当然,如果出现像电源中断这样的不可抗现象,那么 AOF 文件也是可能会出现问题的 * 这时就要用 redis-check-aof 程序来进行修复。 */ nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); if (nwritten != (signed)sdslen(server.aof_buf)) { static time_t last_write_error_log = 0; int can_log = 0; // 将日志的记录频率限制在每行 AOF_WRITE_LOG_ERROR_RATE 秒 if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) { can_log = 1; last_write_error_log = server.unixtime; } // 如果写入出错,那么尝试将该情况写入到日志里面 if (nwritten == -1) { if (can_log) { redisLog(REDIS_WARNING,"Error writing to the AOF file: %s", strerror(errno)); server.aof_last_write_errno = errno; } } else { if (can_log) { redisLog(REDIS_WARNING,"Short write while writing to " "the AOF file: (nwritten=%lld, " "expected=%lld)", (long long)nwritten, (long long)sdslen(server.aof_buf)); } // 尝试移除新追加的不完整内容 if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { if (can_log) { redisLog(REDIS_WARNING, "Could not remove short write " "from the append-only file. Redis may refuse " "to load the AOF the next time it starts. " "ftruncate: %s", strerror(errno)); } } else { /* If the ftrunacate() succeeded we can set nwritten to * -1 since there is no longer partial data into the AOF. */ nwritten = -1; } server.aof_last_write_errno = ENOSPC; } // 处理写入 AOF 文件时出现的错误 if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* We can't recover when the fsync policy is ALWAYS since the * reply for the client is already in the output buffers, and we * have the contract with the user that on acknowledged write data * is synched on disk. */ redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..."); exit(1); } else { /* Recover from failed write leaving data into the buffer. However * set an error to stop accepting writes as long as the error * condition is not cleared. */ server.aof_last_write_status = REDIS_ERR; /* Trim the sds buffer if there was a partial write, and there * was no way to undo it with ftruncate(2). */ if (nwritten > 0) { server.aof_current_size += nwritten; sdsrange(server.aof_buf,nwritten,-1); } return; /* We'll try again on the next call... */ } } else { // 写入成功,更新最后写入状态 if (server.aof_last_write_status == REDIS_ERR) { redisLog(REDIS_WARNING, "AOF write error looks solved, Redis can write again."); server.aof_last_write_status = REDIS_OK; } } // 更新写入后的 AOF 文件大小 server.aof_current_size += nwritten; /* * 如果 AOF 缓存的大小足够小的话,那么重用这个缓存, * 否则的话,释放 AOF 缓存。 */ if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { // 清空缓存中的内容,等待重用 sdsclear(server.aof_buf); } else { // 释放缓存 sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /* * 如果 no-appendfsync-on-rewrite 选项为开启状态, * 并且有 BGSAVE 或者 BGREWRITEAOF 正在进行的话, * 那么不执行 fsync */ if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; // 总是执行 fsnyc if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ // 更新最后一次执行 fsnyc 的时间 server.aof_last_fsync = server.unixtime; // 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒 } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { // 放到后台执行 if (!sync_in_progress) aof_background_fsync(server.aof_fd); // 更新最后一次执行 fsync 的时间 server.aof_last_fsync = server.unixtime; } }在上面代码中,我们可以看到执行fsync有几种可能,这些可能性通过appendfsync配置进行决定appendfsync选项的值flushappendonlyfile函数行为always将aof_buf缓冲区所有内容写入并同步到AOF文件everysec将aof buf缓冲区中的所有内容写入到AOF文件,如果上次同步AOF文件的时间距离现在超过一秒钟,那么再次对AOF 文件进行同步,并且这个同步操作是由一个线程专门负责执行的no将aof_buf缓冲区中的所有内容写入到AOF文件,但并不对AOF文件进行同步,何时同步由操作系统来决定AOF重写由AOF写入原理可知,每次执行命令,都会向文件中写入命令,那么这就会导致文件较大,而且对于比如这种情况:先添加一个a键,再删除一个a键,这其实最终的效果是和最初一样的,若将两次执行命令都写入,则其实是没有用的,因此redis采用AOF重写的方式,函数为rewriteAppendOnlyFileBackground/* * 以下是后台重写 AOF 文件(BGREWRITEAOF)的工作步骤: * * 1) 用户调用 BGREWRITEAOF * * 2) Redis 调用这个函数,它执行 fork() : * * 2a) 子进程在临时文件中对 AOF 文件进行重写 * * 2b) 父进程将新输入的写命令追加到 server.aof_rewrite_buf 中 * * 3) 当步骤 2a 执行完之后,子进程结束 * * 4) * 父进程会捕捉子进程的退出信号, * 如果子进程的退出状态是 OK 的话, * 那么父进程将新输入命令的缓存追加到临时文件, * 然后使用 rename(2) 对临时文件改名,用它代替旧的 AOF 文件, * 至此,后台 AOF 重写完成。 */ int rewriteAppendOnlyFileBackground(void) { pid_t childpid; long long start; // 已经有进程在进行 AOF 重写了 if (server.aof_child_pid != -1) return REDIS_ERR; // 记录 fork 开始前的时间,计算 fork 耗时用 start = ustime(); if ((childpid = fork()) == 0) { char tmpfile[256]; /* 子进程 */ // 关闭网络连接 fd closeListeningSockets(0); // 为进程设置名字,方便记认 redisSetProcTitle("redis-aof-rewrite"); // 创建临时文件,并进行 AOF 重写 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { size_t private_dirty = zmalloc_get_private_dirty(); if (private_dirty) { redisLog(REDIS_NOTICE, "AOF rewrite: %zu MB of memory used by copy-on-write", private_dirty/(1024*1024)); } // 发送重写成功信号 exitFromChild(0); } else { // 发送重写失败信号 exitFromChild(1); } } else { /* 父进程 */ // 记录执行 fork 所消耗的时间 server.stat_fork_time = ustime()-start; if (childpid == -1) { redisLog(REDIS_WARNING, "Can't rewrite append only file in background: fork: %s", strerror(errno)); return REDIS_ERR; } redisLog(REDIS_NOTICE, "Background append only file rewriting started by pid %d",childpid); // 记录 AOF 重写的信息 server.aof_rewrite_scheduled = 0; server.aof_rewrite_time_start = time(NULL); server.aof_child_pid = childpid; // 关闭字典自动 rehash updateDictResizePolicy(); /* * 将 aof_selected_db 设为 -1 , * 强制让 feedAppendOnlyFile() 下次执行时引发一个 SELECT 命令, * 从而确保之后新添加的命令会设置到正确的数据库中 */ server.aof_selected_db = -1; replicationScriptCacheFlush(); return REDIS_OK; } return REDIS_OK; /* unreached */AOF重写的原理,其实是直接读取当前的数据库的值,最后使用一条写语句就可以实现AOF重写而且AOF重写是放在后台子进程执行,这样可以避免效率太低,但是使用子进程执行重写方式,则在重写过程中,父进程还会执行新的写命令,因此这段事件的命令也要被记录下来,最后再次同步给子进程
2022年05月08日
232 阅读
0 评论
0 点赞
首页
复制
搜索
前进
后退
重载网页
和我当邻居