@@ -94,7 +94,7 @@ def failover
94
94
replica_info = rows . find { |row | row . primary_id == primary_info . id }
95
95
96
96
wait_replication_delay ( @clients , replica_size : @replica_size , timeout : @timeout )
97
- replica_info . client . call ( 'CLUSTER' , 'FAILOVER' , 'TAKEOVER' )
97
+ replica_info . client . call_once ( 'CLUSTER' , 'FAILOVER' , 'TAKEOVER' )
98
98
wait_failover (
99
99
@clients ,
100
100
primary_node_key : primary_info . node_key ,
@@ -117,24 +117,24 @@ def start_resharding(slot:, src_node_key:, dest_node_key:)
117
117
dest_host , dest_port = dest_info . node_key . split ( ':' )
118
118
119
119
# @see https://redis.io/commands/cluster-setslot/#redis-cluster-live-resharding-explained
120
- dest_client . call ( 'CLUSTER' , 'SETSLOT' , slot , 'IMPORTING' , src_node_id )
121
- src_client . call ( 'CLUSTER' , 'SETSLOT' , slot , 'MIGRATING' , dest_node_id )
120
+ dest_client . call_once ( 'CLUSTER' , 'SETSLOT' , slot , 'IMPORTING' , src_node_id )
121
+ src_client . call_once ( 'CLUSTER' , 'SETSLOT' , slot , 'MIGRATING' , dest_node_id )
122
122
123
123
db_idx = '0'
124
124
timeout_msec = @timeout . to_i * 1000
125
125
126
- number_of_keys = src_client . call ( 'CLUSTER' , 'COUNTKEYSINSLOT' , slot )
127
- keys = src_client . call ( 'CLUSTER' , 'GETKEYSINSLOT' , slot , number_of_keys )
126
+ number_of_keys = src_client . call_once ( 'CLUSTER' , 'COUNTKEYSINSLOT' , slot )
127
+ keys = src_client . call_once ( 'CLUSTER' , 'GETKEYSINSLOT' , slot , number_of_keys )
128
128
print_debug ( "#{ src_client . config . host } :#{ src_client . config . port } => #{ dest_client . config . host } :#{ dest_client . config . port } ... #{ keys } " )
129
129
return if keys . empty?
130
130
131
131
begin
132
- src_client . call ( 'MIGRATE' , dest_host , dest_port , '' , db_idx , timeout_msec , 'KEYS' , *keys )
132
+ src_client . call_once ( 'MIGRATE' , dest_host , dest_port , '' , db_idx , timeout_msec , 'KEYS' , *keys )
133
133
rescue ::RedisClient ::CommandError => e
134
134
raise unless e . message . start_with? ( 'IOERR' )
135
135
136
136
# retry once
137
- src_client . call ( 'MIGRATE' , dest_host , dest_port , '' , db_idx , timeout_msec , 'REPLACE' , 'KEYS' , *keys )
137
+ src_client . call_once ( 'MIGRATE' , dest_host , dest_port , '' , db_idx , timeout_msec , 'REPLACE' , 'KEYS' , *keys )
138
138
end
139
139
140
140
wait_replication_delay ( @clients , replica_size : @replica_size , timeout : @timeout )
@@ -151,7 +151,7 @@ def finish_resharding(slot:, src_node_key:, dest_node_key:)
151
151
rest = rows . reject { |r | r . replica? || r . client . equal? ( src ) || r . client . equal? ( dest ) } . map ( &:client )
152
152
153
153
( [ dest , src ] + rest ) . each do |cli |
154
- cli . call ( 'CLUSTER' , 'SETSLOT' , slot , 'NODE' , id )
154
+ cli . call_once ( 'CLUSTER' , 'SETSLOT' , slot , 'NODE' , id )
155
155
print_debug ( "#{ cli . config . host } :#{ cli . config . port } ... CLUSTER SETSLOT #{ slot } NODE #{ id } " )
156
156
rescue ::RedisClient ::CommandError => e
157
157
raise unless e . message . start_with? ( 'ERR Please use SETSLOT only with masters.' )
@@ -174,12 +174,12 @@ def scale_out(primary_url:, replica_url:)
174
174
@shard_size += 1
175
175
@number_of_replicas = @replica_size * @shard_size
176
176
177
- primary . call ( 'CLUSTER' , 'MEET' , target_host , target_port )
178
- replica . call ( 'CLUSTER' , 'MEET' , target_host , target_port )
177
+ primary . call_once ( 'CLUSTER' , 'MEET' , target_host , target_port )
178
+ replica . call_once ( 'CLUSTER' , 'MEET' , target_host , target_port )
179
179
wait_meeting ( @clients , max_attempts : @max_attempts )
180
180
181
- primary_id = primary . call ( 'CLUSTER' , 'MYID' )
182
- replica . call ( 'CLUSTER' , 'REPLICATE' , primary_id )
181
+ primary_id = primary . call_once ( 'CLUSTER' , 'MYID' )
182
+ replica . call_once ( 'CLUSTER' , 'REPLICATE' , primary_id )
183
183
save_config ( @clients )
184
184
wait_for_cluster_to_be_ready ( skip_clients : [ primary , replica ] )
185
185
@@ -213,16 +213,16 @@ def scale_in
213
213
threads = @clients . map do |cli |
214
214
Thread . new ( cli ) do |c |
215
215
c . pipelined do |pi |
216
- pi . call ( 'CLUSTER' , 'FORGET' , replica_info . id )
217
- pi . call ( 'CLUSTER' , 'FORGET' , primary_info . id )
216
+ pi . call_once ( 'CLUSTER' , 'FORGET' , replica_info . id )
217
+ pi . call_once ( 'CLUSTER' , 'FORGET' , primary_info . id )
218
218
end
219
219
rescue ::RedisClient ::Error
220
220
# ignore
221
221
end
222
222
end
223
223
threads . each ( &:join )
224
- replica . call ( 'CLUSTER' , 'RESET' , 'SOFT' )
225
- primary . call ( 'CLUSTER' , 'RESET' , 'SOFT' )
224
+ replica . call_once ( 'CLUSTER' , 'RESET' , 'SOFT' )
225
+ primary . call_once ( 'CLUSTER' , 'RESET' , 'SOFT' )
226
226
@clients . reject! { |c | c . equal? ( primary ) || c . equal? ( replica ) }
227
227
@shard_size -= 1
228
228
@number_of_replicas = @replica_size * @shard_size
@@ -266,7 +266,7 @@ def close
266
266
267
267
def flush_all_data ( clients )
268
268
clients . each do |c |
269
- c . call ( 'FLUSHALL' )
269
+ c . call_once ( 'FLUSHALL' )
270
270
print_debug ( "#{ c . config . host } :#{ c . config . port } ... FLUSHALL" )
271
271
rescue ::RedisClient ::CommandError , ::RedisClient ::ReadOnlyError
272
272
# READONLY You can't write against a read only replica.
@@ -277,7 +277,7 @@ def flush_all_data(clients)
277
277
278
278
def reset_cluster ( clients )
279
279
clients . each do |c |
280
- c . call ( 'CLUSTER' , 'RESET' , 'HARD' )
280
+ c . call_once ( 'CLUSTER' , 'RESET' , 'HARD' )
281
281
print_debug ( "#{ c . config . host } :#{ c . config . port } ... CLUSTER RESET HARD" )
282
282
rescue ::RedisClient ::ConnectionError => e
283
283
print_debug ( "#{ c . config . host } :#{ c . config . port } ... CLUSTER RESET HARD: #{ e . class } : #{ e . message } " )
@@ -294,15 +294,15 @@ def assign_slots(clients, shard_size:)
294
294
slot_idx = 0
295
295
primaries . zip ( slot_sizes ) . each do |c , s |
296
296
slot_range = slot_idx ..slot_idx + s - 1
297
- c . call ( 'CLUSTER' , 'ADDSLOTS' , *slot_range . to_a )
297
+ c . call_once ( 'CLUSTER' , 'ADDSLOTS' , *slot_range . to_a )
298
298
slot_idx += s
299
299
print_debug ( "#{ c . config . host } :#{ c . config . port } ... CLUSTER ADDSLOTS #{ slot_range . to_a } " )
300
300
end
301
301
end
302
302
303
303
def save_config_epoch ( clients )
304
304
clients . each_with_index do |c , i |
305
- c . call ( 'CLUSTER' , 'SET-CONFIG-EPOCH' , i + 1 )
305
+ c . call_once ( 'CLUSTER' , 'SET-CONFIG-EPOCH' , i + 1 )
306
306
print_debug ( "#{ c . config . host } :#{ c . config . port } ... CLUSTER SET-CONFIG-EPOCH #{ i + 1 } " )
307
307
rescue ::RedisClient ::CommandError
308
308
# ERR Node config epoch is already non-zero
@@ -315,7 +315,7 @@ def meet_each_other(clients)
315
315
rows = parse_cluster_nodes ( rows )
316
316
target_host , target_port = rows . first . node_key . split ( ':' )
317
317
clients . drop ( 1 ) . each do |c |
318
- c . call ( 'CLUSTER' , 'MEET' , target_host , target_port )
318
+ c . call_once ( 'CLUSTER' , 'MEET' , target_host , target_port )
319
319
print_debug ( "#{ c . config . host } :#{ c . config . port } ... CLUSTER MEET #{ target_host } :#{ target_port } " )
320
320
end
321
321
end
@@ -335,19 +335,19 @@ def replicate(clients, shard_size:, replica_size:)
335
335
replicas = take_replicas ( clients , shard_size : shard_size )
336
336
337
337
replicas . each_slice ( replica_size ) . each_with_index do |subset , i |
338
- primary_id = primaries [ i ] . call ( 'CLUSTER' , 'MYID' )
338
+ primary_id = primaries [ i ] . call_once ( 'CLUSTER' , 'MYID' )
339
339
340
340
loop do
341
341
begin
342
342
subset . each do |replica |
343
- replica . call ( 'CLUSTER' , 'REPLICATE' , primary_id )
343
+ replica . call_once ( 'CLUSTER' , 'REPLICATE' , primary_id )
344
344
print_debug ( "#{ replica . config . host } :#{ replica . config . port } ... CLUSTER REPLICATE #{ primaries [ i ] . config . host } :#{ primaries [ i ] . config . port } " )
345
345
end
346
346
rescue ::RedisClient ::CommandError => e
347
347
print_debug ( e . message )
348
348
# ERR Unknown node [node-id]
349
349
sleep SLEEP_SEC
350
- primary_id = primaries [ i ] . call ( 'CLUSTER' , 'MYID' )
350
+ primary_id = primaries [ i ] . call_once ( 'CLUSTER' , 'MYID' )
351
351
next
352
352
end
353
353
@@ -358,7 +358,7 @@ def replicate(clients, shard_size:, replica_size:)
358
358
359
359
def save_config ( clients )
360
360
clients . each do |c |
361
- c . call ( 'CLUSTER' , 'SAVECONFIG' )
361
+ c . call_once ( 'CLUSTER' , 'SAVECONFIG' )
362
362
print_debug ( "#{ c . config . host } :#{ c . config . port } ... CLUSTER SAVECONFIG" )
363
363
end
364
364
end
@@ -412,7 +412,7 @@ def wait_cluster_recovering(clients, max_attempts:, skip_clients: [])
412
412
key = 0
413
413
wait_for_state ( clients , max_attempts : max_attempts ) do |client |
414
414
print_debug ( "#{ client . config . host } :#{ client . config . port } ... GET #{ key } " )
415
- client . call ( 'GET' , key ) if primary_client? ( client ) && !skip_clients . include? ( client )
415
+ client . call_once ( 'GET' , key ) if primary_client? ( client ) && !skip_clients . include? ( client )
416
416
true
417
417
rescue ::RedisClient ::CommandError => e
418
418
if e . message . start_with? ( 'CLUSTERDOWN' )
@@ -443,11 +443,11 @@ def wait_for_state(clients, max_attempts:)
443
443
end
444
444
445
445
def hashify_cluster_info ( client )
446
- client . call ( 'CLUSTER' , 'INFO' ) . split ( "\r \n " ) . to_h { |v | v . split ( ':' ) }
446
+ client . call_once ( 'CLUSTER' , 'INFO' ) . split ( "\r \n " ) . to_h { |v | v . split ( ':' ) }
447
447
end
448
448
449
449
def fetch_cluster_nodes ( client )
450
- client . call ( 'CLUSTER' , 'NODES' ) . split ( "\n " ) . map ( &:split )
450
+ client . call_once ( 'CLUSTER' , 'NODES' ) . split ( "\n " ) . map ( &:split )
451
451
end
452
452
453
453
def associate_with_clients_and_nodes ( clients )
@@ -502,11 +502,11 @@ def take_replicas(clients, shard_size:)
502
502
end
503
503
504
504
def primary_client? ( client )
505
- client . call ( 'ROLE' ) . first == 'master'
505
+ client . call_once ( 'ROLE' ) . first == 'master'
506
506
end
507
507
508
508
def replica_client? ( client )
509
- client . call ( 'ROLE' ) . first == 'slave'
509
+ client . call_once ( 'ROLE' ) . first == 'slave'
510
510
end
511
511
512
512
def print_debug ( msg )
0 commit comments