@@ -850,10 +850,15 @@ void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) {
850
850
/* A peer may disconnect and then reconnect with us, and it's not guaranteed that
851
851
* we would always process the disconnection of the existing inbound link before
852
852
* accepting a new inbound link. Therefore, it's possible to have more than
853
- * one inbound link from the same node at the same time. */
853
+ * one inbound link from the same node at the same time. Our cleanup logic assumes
854
+ * a one to one relationship between nodes and inbound links, so we need to kill
855
+ * one of the links. The existing link is more likely the outdated one, but it's
856
+ * possible that the other node may need to open another link. */
854
857
serverLog (LL_DEBUG , "Replacing inbound link fd %d from node %.40s with fd %d" ,
855
858
node -> inbound_link -> conn -> fd , node -> name , link -> conn -> fd );
859
+ freeClusterLink (node -> inbound_link );
856
860
}
861
+ serverAssert (!node -> inbound_link );
857
862
node -> inbound_link = link ;
858
863
link -> node = node ;
859
864
}
@@ -1810,12 +1815,18 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
1810
1815
/* IP -> string conversion. 'buf' is supposed to be at least 46 bytes.
1811
1816
* If 'announced_ip' length is non-zero, it is used instead of extracting
1812
1817
* the IP from the socket peer address. Returns C_OK when 'buf' has been filled, or C_ERR (after logging at LL_NOTICE level) when the peer address of the link's connection cannot be converted to a string. */
1813
- void nodeIp2String (char * buf , clusterLink * link , char * announced_ip ) {
1818
+ int nodeIp2String (char * buf , clusterLink * link , char * announced_ip ) {
1814
1819
if (announced_ip [0 ] != '\0' ) {
1815
1820
memcpy (buf ,announced_ip ,NET_IP_STR_LEN );
1816
1821
buf [NET_IP_STR_LEN - 1 ] = '\0' ; /* We are not sure the input is sane. */
1822
+ return C_OK ;
1817
1823
} else {
1818
- connPeerToString (link -> conn , buf , NET_IP_STR_LEN , NULL );
1824
+ if (connPeerToString (link -> conn , buf , NET_IP_STR_LEN , NULL ) == C_ERR ) {
1825
+ serverLog (LL_NOTICE , "Error converting peer IP to string: %s" ,
1826
+ link -> conn ? connGetLastError (link -> conn ) : "no link" );
1827
+ return C_ERR ;
1828
+ }
1829
+ return C_OK ;
1819
1830
}
1820
1831
}
1821
1832
@@ -1847,7 +1858,11 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
1847
1858
* it is safe to call during packet processing. */
1848
1859
if (link == node -> link ) return 0 ;
1849
1860
1850
- nodeIp2String (ip ,link ,hdr -> myip );
1861
+ /* If the peer IP is unavailable for some reason, such as an invalid fd or a
1862
+ * closed link, just give up the update this time; it will be retried in the
1863
+ * next round of PINGs. */
1864
+ if (nodeIp2String (ip ,link ,hdr -> myip ) == C_ERR ) return 0 ;
1865
+
1851
1866
if (node -> port == port && node -> cport == cport && node -> pport == pport &&
1852
1867
strcmp (ip ,node -> ip ) == 0 ) return 0 ;
1853
1868
@@ -2000,7 +2015,13 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
2000
2015
clusterDoBeforeSleep (CLUSTER_TODO_SAVE_CONFIG |
2001
2016
CLUSTER_TODO_UPDATE_STATE |
2002
2017
CLUSTER_TODO_FSYNC_CONFIG );
2003
- } else if (myself -> slaveof && myself -> slaveof -> slaveof ) {
2018
+ } else if (myself -> slaveof && myself -> slaveof -> slaveof &&
2019
+ /* In some rare cases when CLUSTER FAILOVER TAKEOVER is used, it
2020
+ * can happen that myself is a replica of a replica of myself. If
2021
+ * this happens, we do nothing to avoid a crash and wait for the
2022
+ * admin to repair the cluster. */
2023
+ myself -> slaveof -> slaveof != myself )
2024
+ {
2004
2025
/* Safeguard against sub-replicas. A replica's master can turn itself
2005
2026
* into a replica if its last slot is removed. If no other node takes
2006
2027
* over the slot, there is nothing else to trigger replica migration. */
@@ -2337,7 +2358,7 @@ int clusterProcessPacket(clusterLink *link) {
2337
2358
* The nodename passed in is NULL, so a random node name is generated and assigned to node->name. */
2338
2359
node = createClusterNode (NULL ,CLUSTER_NODE_HANDSHAKE );
2339
2360
/* The sender's port information is already known to us (it is in the buffer), so fill it in. */
2340
- nodeIp2String (node -> ip ,link ,hdr -> myip );
2361
+ serverAssert ( nodeIp2String (node -> ip ,link ,hdr -> myip ) == C_OK );
2341
2362
node -> port = ntohs (hdr -> port );
2342
2363
node -> pport = ntohs (hdr -> pport );
2343
2364
node -> cport = ntohs (hdr -> cport );
@@ -7074,7 +7095,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
7074
7095
* slot migration, the channel will be served from the source
7075
7096
* node until the migration completes with CLUSTER SETSLOT <slot>
7076
7097
* NODE <node-id>. */
7077
- int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY ;
7098
+ int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE ;
7078
7099
if ((migrating_slot || importing_slot ) && !is_pubsubshard )
7079
7100
{
7080
7101
if (lookupKeyReadWithFlags (& server .db [0 ], thiskey , flags ) == NULL ) missing_keys ++ ;
@@ -7088,6 +7109,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
7088
7109
* without redirections or errors in all the cases. */
7089
7110
if (n == NULL ) return myself ;
7090
7111
7112
+ uint64_t cmd_flags = getCommandFlags (c );
7091
7113
/* Cluster is globally down but we got keys? We only serve the request
7092
7114
* if it is a read command and when allow_reads_when_down is enabled. */
7093
7115
if (server .cluster -> state != CLUSTER_OK ) {
@@ -7101,7 +7123,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
7101
7123
* cluster is down. */
7102
7124
if (error_code ) * error_code = CLUSTER_REDIR_DOWN_STATE ;
7103
7125
return NULL ;
7104
- } else if (cmd -> flags & CMD_WRITE ) {
7126
+ } else if (cmd_flags & CMD_WRITE ) {
7105
7127
/* The cluster is configured to allow read only commands */
7106
7128
if (error_code ) * error_code = CLUSTER_REDIR_DOWN_RO_STATE ;
7107
7129
return NULL ;
@@ -7139,7 +7161,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
7139
7161
* involves multiple keys and we don't have them all, the only option is
7140
7162
* to send a TRYAGAIN error. */
7141
7163
if (importing_slot &&
7142
- (c -> flags & CLIENT_ASKING || cmd -> flags & CMD_ASKING ))
7164
+ (c -> flags & CLIENT_ASKING || cmd_flags & CMD_ASKING ))
7143
7165
{
7144
7166
if (multiple_keys && missing_keys ) {
7145
7167
if (error_code ) * error_code = CLUSTER_REDIR_UNSTABLE ;
@@ -7152,7 +7174,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
7152
7174
/* Handle the read-only client case reading from a slave: if this
7153
7175
* node is a slave and the request is about a hash slot our master
7154
7176
* is serving, we can reply without redirection. */
7155
- int is_write_command = (c -> cmd -> flags & CMD_WRITE ) ||
7177
+ int is_write_command = (cmd_flags & CMD_WRITE ) ||
7156
7178
(c -> cmd -> proc == execCommand && (c -> mstate .cmd_flags & CMD_WRITE ));
7157
7179
if (((c -> flags & CLIENT_READONLY ) || is_pubsubshard ) &&
7158
7180
!is_write_command &&
0 commit comments