集群原理及解析

1、概要

redis集群是redis提供的分布式数据库方案,集群通过分片(sharding)来进行数据共享,并提供复制和故障转移功能

2、节点

一个redis集群通常由多个节点组成,在刚开始的时候,每个节点是相互独立的,它们都处于一个只包含自己的集群当中,要组建一个真正可工作的集群,必须将各个节点连接起来,构成一个包含多个节点的集群。

连接各个节点的工作可以使用CLUSTER MEET命令来完成,命令的格式为

CLUSTER MEET <ip> <port> <cport>

向一个节点node发送CLUSTER MEET命令,可以让node节点与ip和port指定的节点进行握手,当握手成功后,node节点就会将ip和port指定的节点添加到node节点当前所在的集群中。

2.1 启动节点

一个节点就是一个运行在集群模式下的redis服务器,redis服务器在启动时会根据cluster-enabled配置选项是否为yes来决定是否开启服务器的集群模式。

节点会继续使用单机模式中使用的服务器组件

  • 节点会继续使用文件事件处理器来处理命令请求和返回命令回复
  • 节点会继续使用时间事件处理器来执行serverCron函数,而serverCron函数又会调用集群模式下的clusterCron函数。clusterCron函数负责执行在集群模式下需要执行的常规操作,例如向集群中的其他节点发送Gossip消息,检查节点是否断线,或者检查是否需要对下线节点进行自动故障转移等。
  • 节点会继续使用数据库来保存键值对数据,键值对依然会是各种不同类型的对象。
  • 节点会继续使用rdb持久化模块和aof持久化模块来执行持久化工作。
  • 节点会继续使用复制模块来进行节点的复制工作。
  • 节点会继续使用lua脚本环境来的执行客户端输入的lua脚本。

节点会继续使用redisServer结构来保存服务器的状态,使用client结构来保存客户端的状态,至于那此只有在集群模式下用到的数据,节点将它们保存到cluster.h/clusterNode,cluster.h/clusterLink,cluster.h/clusterState结构里面

集群原理及解析

集群原理及解析

集群原理及解析

集群原理及解析

集群模式下会另外创建监听端口10000+port,来处理其它节点的连接请求。

集群原理及解析

同时会创建slot->key映射的radix tree

集群原理及解析

2.2 cluster meet命令的实现

通过向节点a发送cluster meet命令,客户端可以让接收命令的节点a将另一个节点b添加到节点a当前所在的集群里面。

收到命令的节点a将与节点b进行握手,以此来确认彼此的存在,并为将来的进一步通信打好基础。

  1. 节点a会为节点b创建一个clusterNode结构,并将该结构添加到自己的clusterState.nodes字典里面
  2. 节点a将根据 cluster meet命令指定的ip地址和端口号,向节点b发送一条meet消息。
  3. 如果一切顺利,节点b将接收到节点a发送的meet消息,节点b会为节点a创建一个clusterNode结构,并将该结构添加到自己的clusterState.ndoes字典里面。
  4. 之后,节点b将向节点a返回一条pong消息
  5. 如果一切顺利,节点a将接收到节点b返回的pong消息,通过这条pong消息节点a可以知道节点b已经成功地接收到了自己发送的meet消息。
  6. 之后,节点a将向节点b发送一条ping消息(每10次迭代并且从节点中随机选取5个取最早接收到pong的结点或者在超过了超时时间的一半时)
  7. 如果一切顺利,节点b将接收到节点a返回的ping消息,通过这条ping消息节点b可知道节点a已经成功地接收到自己返回的pong消息,握手完成。

之后,节点a会将节点b的信息通过Gossip协议传播给集群中的其他节点,让其他节点也与节点b进行握手,最终,经过一段时间之后,节点b会被集群中的所有节点认识。

3、消息

cluster消息由消息头及消息体组成,消息体有以下几种

  • PING, MEET,PONG
  • FAIL
  • PUBLISH
  • UPDATE
  • MODULE

3.1 消息定义

集群原理及解析

3.2 消息体

消息体用clusterMsgData表示,为union类型

集群原理及解析

3.2.1 ping消息体

集群原理及解析

3.2.2 fail消息体

集群原理及解析

3.2.3 publish消息体

集群原理及解析

3.2.4 update消息体

集群原理及解析

3.2.5 module消息体

集群原理及解析

4、槽指派

redis集群通过分片方式来保存数据库的键值对:集群的整个数据库被分成16384个槽,数据库中的每个键都属于这16384个槽中的其中一个,集群中的每个节点可以处理0个或者最多16384个槽。

当数据库中的16384个槽都有节点在处理时,集群处于上线状态:相反,如果数据库中有任何一个槽没有得到处理,那么集群处于下线状态。

通过向节点发送cluster addslots命令,可以将一个或多个槽指派给节点负责

CLUSTER ADDSLOTS <slot> [slot …]

4.1 记录节点的槽指派信息

clusterNode结构的slots属性和numslot属性记录了节点负责处理哪些槽

slots属性是一个二进制位数组,这个数组的长度为2048个字节,共包含16384个二进制位。

redis以0为起始索引,16384为终止索引,对slots数组的16384个二进制位进行编号,并根据索引i上的二进制位的值来判断节点是否负责处理槽i

4.2 传播节点的槽指派信息

一个节点除了会将自己负责处理的槽记录在clusterNode结构的slots属性和numslots属性之外,还会将自己的slots数组通过消息发送给集群中的其他节点,以此来告诉其他节点自己负责处理哪些槽。

当节点a通过消息从节点b那里接收到节点b的slots数组时,节点a会在自己的clusterState.nodes字典中查找节点b对应的clusterNode结构,并对结构中的slots数组进行保存或者更新。

因为集群中的每个节点都会将自己的slots数组通过消息发送给集群中的其他节点,并且每个接收到slots数组的节点都会将数组保存到相应节点的clusterNode结构里面。因此,集群中的每个节点都会知道数据库中的16384个槽分别被指派给了集群中的哪些节点。

4.3 记录集群所有槽的指派信息

clusterState结构中的slots数组记录了集群中所有16384个槽的指派信息

slots数组包含16384个项,每个数组项都是一个指向clusterNode结构的指针。

5、在集群中执行命令

在对数据库中的16384个槽都执行了指派之后,集群进入一线状态,这时客户端可以向集群中的节点发送数据命令。

当客户端向节点发送与数据库键有关的命令时,接收命令的节点会计算命令要处理的数据库键属于哪个槽,并检查这个槽是否指派给了自己:

  • 如果键所在的槽正好就指派给了当前节点,那么节点直接执行这个命令
  • 如果键所在的槽没有指派给当前节点,那么节点会向客户端返回一个MOVED错误,指引客户端转向至正确的节点,并再次发送之前想要执行的命令。

5.1 计算键属于哪个槽

节点使用对给定键key计算crc-16值,然后与0x3FFF与操作来计算属于哪个槽

5.2 判断槽是否由当前节点负责处理

当节点计算出键所属的槽i之后,节点会检查自己在clusterState.slots数组中的项i,判断键所在的槽是否由自己负责:

  • 如果clusterState.slots[i]等于myself,那么说明 i由当前节点负责,节点可以执行客户端发送的命令
  • 如果clusterState.slots[i]不等于myself,那么说明槽i并非由当前节点负责,节点会根据clusterState.slots[i]指向的clusterNode结构所记录的节点ip和端口号,向客户端返回MOVED错误,指引客户端转向至正在处理槽i的节点。

5.3 MOVED错误

当节点发现键所在的槽并非由自己负责处理的时候,节点就会向客户端返回一个MOVED错误,指引客户端转向至正在负责槽的节点。

MOVED错误的格式为

MOVED <slot> <ip>:<port>

当客户端接收到节点返回的MOVED错误时,客户端会根据MOVED错误中提供的IP地址和端口号,转向至负责处理槽slot的节点,并向该节点重新发送之前想要执行后命令。

一个集群客户端通常会与集群中的多个节点创建套接字连接,而所谓的节点转向实际上就是换一个套接字来发送命令。

如果客户端尚未与想要转向的节点 创建套接字连接,那么客户端会先根据MOVED错误提供的IP地址和端口号来连接节点,然后再进行转向。

5.4 节点数据库的实现

集群节点保存键值对以及键值对过期时间的方式与单机的方式相同,一个区别是节点只能使用0号数据库,而单机没有这个限制。

除了将键值对保存在数据库里面之外,节点还会有clusterState结构中的slots_to_keys类型为rax来保存槽和键之间的关系。

6、重新分片

redis集群的重新分片操作可以将任意数量已经指派给某个节点(源节点)的槽改为指派给另一个节点(目标节点 ),并且相关槽所属的键值对也会从源节点被移到到目标节点。

重新分片操作可以在线进行,在重新分片的过程中,集群不需要 下线,并且 源节点和目标节点都可以继续处理命令请求。

6.1 重新分片的实现原理

redis集群的重新分片操作是由redis的集群管理软件redis-trib负责执行的,redis提供了进行重新分片所需要的所有命令,而redis-trib则通过向源节点和目标节点发送命令进行重新分片操作

最终还是通过redis-cli的集群管理命令来处理的

集群原理及解析

命令如下:

redis-cli –cluster reshard <host>:<port> –cluster-from <node-id> –cluster-to <node-id> –cluster-slots <number of slots> –cluster-yes

客户端对于集群管理命令中的重新分片步骤如下:

  1. 向目标结点发送CLUSTER SETLOG <slot> IMPORTING <source_id>命令,让目标结点准备好从源节点导入属于槽slot的键值对。目标服务器会将clusterState中的importing_slots_from[slot] = source_id对应的节点
  2. 向源节点发送CLUSTER SETLOG <slot> migrating <target_id>命令,让源节点准备好将属于槽slot的键值对迁移到目标节点。源服务器会将clusterState中的migrating_slots_to[slot] = target_id对应的节点
  3. 向源节点发送 CLUSTER GETKEYSINSLOT <slot> <count>命令,获得最多count个属于槽slot的键值对的键名。
  4. 向源节点发送MIGRATE <target_ip> <target_port> "" 0 <time_out> keys <keylist>,将被选中的键原子地从源节点迁移至目标节点
  5. 重复执行3,4,直到源节点保存的所有属于槽slot的键值对都被迁移到目标节点为止。
  6. 向集群中的所有节点发送CLUSTER SETSLOT <slot> node <target_id>命令,将槽slot指派给目标节点,这一指派信息会通过消息发送至整个集群,最终集群中的所有节点都会知道slot已经指派给了目标节点

7、ASK错误

在进行重新分片期间,源节点向目标节点迁移一个槽的过程中,可能会出现这样一种情况:属于被迁移槽的一部分键值对保存在源节点,另外一部分在目标节点。

当客户端向源节点发送一个与数据库键有关的命令,并且命令要处理的数据库键恰好就属于正在被迁移的槽时

  • 源节点会先在自己的数据库里面查找指定 的键,如果找到的话,就直接执行客户端发送的命令
  • 如果源节点没能在自己的数据库里面找到指定的键,那么这个键有可能已经被迁移到目标节点 ,指引客户端转向正在导入槽的目标节点,并再次发送之前想要执行的命令。

7.1 ASK错误

如果节点收到一个关于键key的命令请求,并且 键key所属的槽i正好就指派给了这个节点,那么节点会尝试在自己的数据库里查找键key,如果找到了的话,节点就直接执行客户端发送的命令。

如果节点没有在自己的数据库里找到键key,那么节点会检查自己的clusterState.migrating_slots_to[i],看键key属的槽i是否正在进行迁移,如果槽i的确是在进行迁移 的话,那么 节点会向客户端发送一个ask错误,引导客户端到正在导入槽i的节点去查找键key.

接到ask错误的客户端会根据错误提供 的ip地址和端口号,转向到正在导入槽的目标节点,重新发送原本想要执行的命令。

7.2 ASKING命令

asking命令是开启发送该命令的客户端CLIENT_ASKING标识,返回ok

8 、复制与故障转移

redis集群中的节点分为主节点、从节点,其中主节点用于处理槽,从节点用于复制某个主节点,并在被复制的主节点下线时,代替下线主节点继续处理命令请求。

8.1 设置从节点

向一个节点发送命令:

CLUSTER REPLICATE <node_id>

让接收命令的节点成为node_id所指定节点 的从节点,并开始对主节点进行复制

  • 接收到命令的节点首先会在自己的clusterState.nodes字典中找到node_id对应节点的clusterNode结构,并将myself.slaveof指针指向这个结构
  • 修改myself.flags属性,关闭CLUSTER_NODE_MASTER,打开CLUSTER_NODE_SLAVE
  • 最后,节点会调用 复制代码,并根据myself.slaveof指向的clusterNode结构所保存的ip地址和端口号,对主节点进行复制。因为节点的复制功能和单机redis服务器的复制功能使用了相同的代码,所以让从节点复制主节点相当于向从节点发送命令slaveof <master_ip> <master_port>

一个节点成为从节点,并开始复制某个节点这一信息会通过消息给集群中的其他节点,最终集群中的所有节点都会知道某个从节点正在复制某个主节点。

8.2 故障检测

集群中的每个节点都会定期地向集群中的其他节点发送PING消息,以此来检测对方是否在线,如果接收PING消息的节点没有在规定的时间内, 向发送PING消息的节点返回PONG消息,那么发送PING消息的节点就会将接收PING消息的节点标记为疑似下线CLUSTER_NODE_PFAIL

if (node_delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) { serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", node->name); node->flags |= CLUSTER_NODE_PFAIL; update_state = 1; } }

统计集群中标记为CLUSTER_NODE_PFAIL的个数

 clusterNode *node = dictGetVal(de); /* Not interested in reconnecting the link with myself or nodes * for which we have no address. */ if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; if (node->flags & CLUSTER_NODE_PFAIL) server.cluster->stats_pfail_nodes++; 

在发送PING消息时,会将正常节点信息和PFAIL节点信息放在类型为PING的ClusterMsg中,消息体类型为clusterMsgDataGossip,其中flags存有PFAIL信息。PFAIL节点信息放在正常信息后面。

void clusterSendPing(clusterLink *link, int type) { unsigned char *buf; clusterMsg *hdr; int gossipcount = 0; /* Number of gossip sections added so far. */ int wanted; /* Number of gossip sections we want to append if possible. */ int totlen; /* Total packet length. */ /* freshnodes is the max number of nodes we can hope to append at all: * nodes available minus two (ourself and the node we are sending the * message to). However practically there may be less valid nodes since * nodes in handshake state, disconnected, are not considered. */ int freshnodes = dictSize(server.cluster->nodes)-2; /* How many gossip sections we want to add? 1/10 of the number of nodes * and anyway at least 3. Why 1/10? * * If we have N masters, with N/10 entries, and we consider that in * node_timeout we exchange with each other node at least 4 packets * (we ping in the worst case in node_timeout/2 time, and we also * receive two pings from the host), we have a total of 8 packets * in the node_timeout*2 failure reports validity time. So we have * that, for a single PFAIL node, we can expect to receive the following * number of failure reports (in the specified window of time): * * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS: * * PROB = probability of being featured in a single gossip entry, * which is 1 / NUM_OF_NODES. * ENTRIES = 10. * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS. * * If we assume we have just masters (so num of nodes and num of masters * is the same), with 1/10 we always get over the majority, and specifically * 80% of the number of nodes, to account for many masters failing at the * same time. * * Since we have non-voting slaves that lower the probability of an entry * to feature our node, we set the number of entries per packet as * 10% of the total nodes we have. */ wanted = floor(dictSize(server.cluster->nodes)/10); if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; /* Include all the nodes in PFAIL state, so that failure reports are * faster to propagate to go from PFAIL to FAIL state. */ int pfail_wanted = server.cluster->stats_pfail_nodes; /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen * later according to the number of gossip sections we really were able * to put inside the packet. */ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted)); /* Note: clusterBuildMessageHdr() expects the buffer to be always at least * sizeof(clusterMsg) or more. */ if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg); buf = zcalloc(totlen); hdr = (clusterMsg*) buf; /* Populate the header. */ if (link->node && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime(); clusterBuildMessageHdr(hdr,type); /* Populate the gossip fields */ int maxiterations = wanted*3; while(freshnodes > 0 && gossipcount < wanted && maxiterations--) { dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); /* Don't include this node: the whole packet header is about us * already, so we just gossip about other nodes. */ if (this == myself) continue; /* PFAIL nodes will be added later. */ if (this->flags & CLUSTER_NODE_PFAIL) continue; /* In the gossip section don't include: * 1) Nodes in HANDSHAKE state. * 3) Nodes with the NOADDR flag set. * 4) Disconnected nodes if they don't have configured slots. */ if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) || (this->link == NULL && this->numslots == 0)) { freshnodes--; /* Technically not correct, but saves CPU. */ continue; } /* Do not add a node we already have. */ if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue; /* Add it */ clusterSetGossipEntry(hdr,gossipcount,this); freshnodes--; gossipcount++; } /* If there are PFAIL nodes, add them at the end. */ if (pfail_wanted) { dictIterator *di; dictEntry *de; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL && pfail_wanted > 0) { clusterNode *node = dictGetVal(de); if (node->flags & CLUSTER_NODE_HANDSHAKE) continue; if (node->flags & CLUSTER_NODE_NOADDR) continue; if (!(node->flags & CLUSTER_NODE_PFAIL)) continue; clusterSetGossipEntry(hdr,gossipcount,node); freshnodes--; gossipcount++; /* We take the count of the slots we allocated, since the * PFAIL stats may not match perfectly with the current number * of PFAIL nodes. */ pfail_wanted--; } dictReleaseIterator(di); } /* Ready to send... fix the totlen fiend and queue the message in the * output buffer. */ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*gossipcount); hdr->count = htons(gossipcount); hdr->totlen = htonl(totlen); clusterSendMessage(link,buf,totlen); zfree(buf); }

集群中的各个节点会通过互相发送消息的方式来交换集群中各个节点的状态信息。当一个节点A通过消息得知主节点B认为主节点C疑似下线状态时,主节点 A会在自己的clusterState.nodes字典中找到主节点C所对应的clusterNode结构,并将主节点B的下线报告添加到clusterNode的fail_reporst链表里面。

集群原理及解析

如果在一个集群里面,半数以上负责处理槽的主节点都将某个主节点x报告为疑似下线,那么这个主节点x将被标记为已下线,将主节点x标记为已下线的节点会向集群广播一条关于主节点x的FAIL消息,所有收到这条FAIL消息的节点都会立即将主节点x标记为已下线。

集群原理及解析

8.3 故障转移

当一个从节点发现自己正在复制的主节点进入了已下线状态时,从节点开始对下线主节点进行故障转移:

  1. 复制下线主节点的所有从节点里面,会有一个从节点被选中
  2. 当前从节点取消复制变为主节点
  3. 新的主节点会撤销所有对已下线主节点的槽指派,并将这些槽全部指派给自己
  4. 新的主节点向集群广播一条PONG消息,这条PONG消息可以让集群中的其他节点立即知道这个节点已经由从节点变成了主节点,并且这个主节点已经接管了原本由已下线节点负责处理的槽
  5. 新的主节点开始接收和自己负责处理的槽相关的命令请求,故障转移完成

集群原理及解析

8.4 选举新的主节点

8.4.1 集群状态

由ClusterState结构来表示

集群原理及解析

failover_auth_time:上一次或下一次选举时间

failover_auth_count:接收到的投票个数

failover_auth_sent:是否开始投票了

failover_auth_rank:当前从节点的排名(根据复制偏移计算)

failover_auth_epoch:当前选举的时代

新的主节点是通过选举产生的

  1. 集群的配置纪元是一个自增计数器,它的初始值是0
  2. 当集群里的某个节点开始一次故障转移操作时,集群配置纪元的值会被增一
  3. 对于每个配置纪元,集群里每个负责处理槽的主节点都有一次投票的机会,而第一个向主节点要求投票的从节点将获得主节点的投票
  4. 当从节点发现自己正在复制的主节点进行已下线状态时,从节点会向集群广播一条CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST消息,要求所有收到这条消息、并且具有投票权的主节点向这个从节点投票
  5. 如果一个主节点具有投票权,并且这个主节点尚未投票给其他从节点,那么主节点将向要求投票的从节点返回一条CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,表示这个主节点支持从节点成为新的主节点
  6. 每个参与选举的从节点都会接收CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,并根据自己收到了多少条这种消息来统计自己获得了多少主节点的支持
  7. 如果集群里有N个具有投票权的主节点,那么当一个从节点收集到大于等于N/2+1张支持票时,这个从节点就会当选为新的主节点
  8. 因为在每一个配置纪元里面,每个具有投票权的主节点只能投一次票,所以如果有N个主节点进行投票,那么具有大于等于N/2+1张支持票的从节点只会有一个,这确保了新的主节点只有一个
  9. 如果在一个配置纪元里面没有从节点能收集到足够多的支持票,那么 集群进入一个新的配置纪元,并再次进行选举,直到选出新的主节点为止

其流程为

集群原理及解析

原文链接:https://blog.csdn.net/xiexingshishu/article/details/114854381

原创文章,作者:优速盾-小U,如若转载,请注明出处:https://www.cdnb.net/bbs/archives/16399

(0)
上一篇 2022年11月16日
下一篇 2022年11月16日

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

优速盾注册领取大礼包www.cdnb.net
/sitemap.xml