Skip to content

Commit 11d5ff5

Browse files
committed
合并上游代码
2 parents 8935d54 + befcf62 commit 11d5ff5

File tree

5 files changed

+82
-26
lines changed

5 files changed

+82
-26
lines changed

src/cluster.c

+28-12
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,8 @@ clusterNode *createClusterNode(char *nodename, int flags) {
535535
node->port = 0;
536536
node->fail_reports = listCreate();
537537
node->voted_time = 0;
538+
node->repl_offset_time = 0;
539+
node->repl_offset = 0;
538540
listSetFreeMethod(node->fail_reports,zfree);
539541

540542
return node;
@@ -2325,25 +2327,24 @@ void clusterBroadcastMessage(void *buf, size_t len) {
23252327
// 构建信息
23262328
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
23272329
int totlen = 0;
2328-
clusterNode *master;
2330+
uint64_t offset;
2331+
clusterNode *master, *myself = server.cluster->myself;
23292332

23302333
/* If this node is a master, we send its slots bitmap and configEpoch.
23312334
*
23322335
* 如果这是一个主节点,那么发送该节点的槽 bitmap 和配置纪元。
23332336
*
23342337
* If this node is a slave we send the master's information instead (the
23352338
* node is flagged as slave so the receiver knows that it is NOT really
2336-
* in charge for this slots.
2337-
*
2339+
* in charge of these slots).
23382340
* 如果这是一个从节点,
23392341
* 那么发送这个节点的主节点的槽 bitmap 和配置纪元。
23402342
*
23412343
* 因为接收信息的节点通过标识可以知道这个节点是一个从节点,
23422344
* 所以接收信息的节点不会将从节点错认作是主节点。
23432345
*/
2344-
master = (server.cluster->myself->flags & REDIS_NODE_SLAVE &&
2345-
server.cluster->myself->slaveof) ?
2346-
server.cluster->myself->slaveof : server.cluster->myself;
2346+
master = (myself->flags & REDIS_NODE_SLAVE && myself->slaveof) ?
2347+
myself->slaveof : myself;
23472348

23482349
// 清零信息头
23492350
memset(hdr,0,sizeof(*hdr));
@@ -2352,24 +2353,23 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
23522353
hdr->type = htons(type);
23532354

23542355
// 设置信息发送者
2355-
memcpy(hdr->sender,server.cluster->myself->name,REDIS_CLUSTER_NAMELEN);
2356+
memcpy(hdr->sender,myself->name,REDIS_CLUSTER_NAMELEN);
23562357

23572358
// 设置当前节点负责的槽
23582359
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
23592360

23602361
// 清零 slaveof 域
23612362
memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
2363+
23622364
// 如果节点是从节点的话,那么设置 slaveof 域
2363-
if (server.cluster->myself->slaveof != NULL) {
2364-
memcpy(hdr->slaveof,server.cluster->myself->slaveof->name,
2365-
REDIS_CLUSTER_NAMELEN);
2366-
}
2365+
if (myself->slaveof != NULL)
2366+
memcpy(hdr->slaveof,myself->slaveof->name, REDIS_CLUSTER_NAMELEN);
23672367

23682368
// 设置端口号
23692369
hdr->port = htons(server.port);
23702370

23712371
// 设置标识
2372-
hdr->flags = htons(server.cluster->myself->flags);
2372+
hdr->flags = htons(myself->flags);
23732373

23742374
// 设置状态
23752375
hdr->state = server.cluster->state;
@@ -2380,6 +2380,22 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
23802380
// 设置主节点当前配置纪元
23812381
hdr->configEpoch = htonu64(master->configEpoch);
23822382

2383+
/* Set the replication offset. */
2384+
// 设置复制偏移量
2385+
if (myself->flags & REDIS_NODE_SLAVE) {
2386+
if (server.master)
2387+
offset = server.master->reploff;
2388+
else if (server.cached_master)
2389+
offset = server.cached_master->reploff;
2390+
else
2391+
offset = 0;
2392+
} else {
2393+
offset = server.master_repl_offset;
2394+
}
2395+
hdr->offset = htonu64(offset);
2396+
2397+
/* Compute the message length for certain messages. For other messages
2398+
* this is up to the caller. */
23832399
// 计算信息的长度
23842400
if (type == CLUSTERMSG_TYPE_FAIL) {
23852401
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

src/cluster.h

+18-7
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,22 @@ struct clusterNode {
143143
struct clusterNode *slaveof; /* pointer to the master node */
144144

145145
// 最后一次发送 PING 命令的时间
146-
mstime_t ping_sent; /* Unix time we sent latest ping */
146+
mstime_t ping_sent; /* Unix time we sent latest ping */
147147

148148
// 最后一次接收 PONG 回复的时间戳
149-
mstime_t pong_received; /* Unix time we received the pong */
149+
mstime_t pong_received; /* Unix time we received the pong */
150150

151151
// 最后一次被设置为 FAIL 状态的时间
152-
mstime_t fail_time; /* Unix time when FAIL flag was set */
152+
mstime_t fail_time; /* Unix time when FAIL flag was set */
153153

154154
// 最后一次给某个从节点投票的时间
155-
mstime_t voted_time; /* Last time we voted for a slave of this master */
155+
mstime_t voted_time; /* Last time we voted for a slave of this master */
156+
157+
// 最后一次从这个节点接收到复制偏移量的时间
158+
mstime_t repl_offset_time; /* Unix time we received offset for this node */
159+
160+
// 这个节点的复制偏移量
161+
long long repl_offset; /* Last known repl offset for this node. */
156162

157163
// 节点的 IP 地址
158164
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */
@@ -391,8 +397,13 @@ typedef struct {
391397

392398
// 如果消息发送者是一个主节点,那么这里记录的是消息发送者的配置纪元
393399
// 如果消息发送者是一个从节点,那么这里记录的是消息发送者正在复制的主节点的配置纪元
394-
uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch
395-
advertised by its master if it is a slave. */
400+
uint64_t configEpoch; /* The config epoch if it's a master, or the last
401+
epoch advertised by its master if it is a
402+
slave. */
403+
404+
// 节点的复制偏移量
405+
uint64_t offset; /* Master replication offset if node is a master or
406+
processed replication offset if node is a slave. */
396407

397408
// 消息发送者的名字(ID)
398409
char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
@@ -425,7 +436,7 @@ typedef struct {
425436

426437
#define CLUSTERMSG_MIN_LEN (sizeof(clusterMsg)-sizeof(union clusterMsgData))
427438

428-
/* ----------------------- API exported outside cluster.c ------------------------- */
439+
/* ---------------------- API exported outside cluster.c -------------------- */
429440
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
430441

431442
#endif /* __REDIS_CLUSTER_H */

src/networking.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ void freeClient(redisClient *c) {
890890
}
891891

892892
/* Log link disconnection with slave */
893-
if (c->flags & REDIS_SLAVE) {
893+
if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) {
894894
char ip[REDIS_IP_STR_LEN];
895895

896896
if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {

src/redis-trib.rb

+34-5
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,34 @@ def delnode_cluster_cmd(argv,opt)
912912
node.r.shutdown
913913
end
914914

915+
def set_timeout_cluster_cmd(argv,opt)
916+
timeout = argv[1].to_i
917+
if timeout < 100
918+
puts "Setting a node timeout of less than 100 milliseconds is a bad idea."
919+
exit 1
920+
end
921+
922+
# Load cluster information
923+
load_cluster_info_from_node(argv[0])
924+
ok_count = 0
925+
err_count = 0
926+
927+
# Send CONFIG SET cluster-node-timeout, then CONFIG REWRITE, to every node in the cluster
928+
xputs ">>> Reconfiguring node timeout in every cluster node..."
929+
@nodes.each{|n|
930+
begin
931+
n.r.config("set","cluster-node-timeout",timeout)
932+
n.r.config("rewrite")
933+
ok_count += 1
934+
xputs "*** New timeout set for #{n}"
935+
rescue => e
936+
puts "ERR setting node-timeot for #{n}: #{e}"
937+
err_count += 1
938+
end
939+
}
940+
xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR."
941+
end
942+
915943
def help_cluster_cmd(argv,opt)
916944
show_help
917945
exit 0
@@ -952,8 +980,9 @@ def parse_options(cmd)
952980
"check" => ["check_cluster_cmd", 2, "host:port"],
953981
"fix" => ["fix_cluster_cmd", 2, "host:port"],
954982
"reshard" => ["reshard_cluster_cmd", 2, "host:port"],
955-
"addnode" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
956-
"delnode" => ["delnode_cluster_cmd", 3, "host:port node_id"],
983+
"add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
984+
"del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"],
985+
"set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
957986
"help" => ["help_cluster_cmd", 1, "(show this help)"]
958987
}
959988

@@ -966,14 +995,14 @@ def show_help
966995
puts "Usage: redis-trib <command> <options> <arguments ...>\n\n"
967996
COMMANDS.each{|k,v|
968997
o = ""
969-
puts " #{k.ljust(10)} #{v[2]}"
998+
puts " #{k.ljust(15)} #{v[2]}"
970999
if ALLOWED_OPTIONS[k]
9711000
ALLOWED_OPTIONS[k].each{|optname,has_arg|
972-
puts " --#{optname}" + (has_arg ? " <arg>" : "")
1001+
puts " --#{optname}" + (has_arg ? " <arg>" : "")
9731002
}
9741003
end
9751004
}
976-
puts "\nFor check, fix, reshard, delnode, you can specify host:port of any working node.\n"
1005+
puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n"
9771006
end
9781007

9791008
# Sanity check

src/redis.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -2557,7 +2557,7 @@ int processCommand(redisClient *c) {
25572557
if (server.stop_writes_on_bgsave_err &&
25582558
server.saveparamslen > 0
25592559
&& server.lastbgsave_status == REDIS_ERR &&
2560-
server.masterhost != NULL &&
2560+
server.masterhost == NULL &&
25612561
(c->cmd->flags & REDIS_CMD_WRITE ||
25622562
c->cmd->proc == pingCommand))
25632563
{

0 commit comments

Comments
 (0)