Commit 4eba87e

Assign a new node after calling update_cluster_info!

If we catch a connection error and refresh the cluster topology, we need to re-calculate which node the router should send the command to; the node we were using might no longer be a valid member of the cluster at all.

1 parent b272f19 commit 4eba87e

4 files changed, +98 −10
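In outline, the change makes the router's connection-error retry path re-resolve its target node after refreshing the topology, instead of retrying against the possibly-dead node it started with. A minimal sketch of that shape, where `pick_node_for` and `refresh_topology!` are hypothetical stand-ins rather than this library's API:

```ruby
ConnectionError = Class.new(StandardError) # stand-in for ::RedisClient::ConnectionError

# Hypothetical sketch of the retry shape this commit gives the router.
def call_with_reroute(command, retries: 3)
  node = pick_node_for(command) # route by the command's key slot (hypothetical helper)
  begin
    node.call(command)
  rescue ConnectionError
    raise if retries <= 0

    refresh_topology!             # stands in for update_cluster_info!
    node = pick_node_for(command) # re-route: the old node may no longer exist
    retries -= 1
    retry
  end
end
```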

lib/redis_client/cluster/optimistic_locking.rb (+15 −6)
```diff
@@ -15,10 +15,7 @@ def watch(keys)
         slot = find_slot(keys)
         raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
 
-        # We have not yet selected a node for this transaction, initially, which means we can handle
-        # redirections freely initially (i.e. for the first WATCH call)
-        node = @router.find_primary_node_by_slot(slot)
-        handle_redirection(node, retry_count: 1) do |nd|
+        handle_redirection(slot, retry_count: 1) do |nd|
           nd.with do |c|
             c.ensure_connected_cluster_scoped(retryable: false) do
               c.call('ASKING') if @asking
@@ -39,10 +36,22 @@ def watch(keys)
 
       private
 
-      def handle_redirection(node, retry_count: 1, &blk)
-        @router.handle_redirection(node, retry_count: retry_count) do |nd|
+      def handle_redirection(slot, retry_count: 1, &blk)
+        # We have not yet selected a node for this transaction, which means we can
+        # handle redirections freely to begin with (i.e. for the first WATCH call)
+        node = @router.find_primary_node_by_slot(slot)
+        times_block_executed = 0
+        @router.handle_redirection(node, nil, retry_count: retry_count) do |nd|
+          times_block_executed += 1
           handle_asking_once(nd, &blk)
         end
+      rescue ::RedisClient::ConnectionError
+        # Deduct the number of retries that happened _inside_ router#handle_redirection from our remaining
+        # _external_ retries. Always deduct at least one in case handle_redirection raises without trying the block.
+        retry_count -= [times_block_executed, 1].max
+        raise if retry_count < 0
+
+        retry
       end
 
       def handle_asking_once(node)
```
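The rescue clause above has one subtlety worth spelling out: `@router.handle_redirection` retries internally, so the outer WATCH retry budget has to be charged for those inner attempts. A self-contained sketch of that bookkeeping pattern, using hypothetical names rather than the library's API:

```ruby
ConnectionError = Class.new(StandardError) # stand-in for ::RedisClient::ConnectionError

# An inner helper that retries the block itself, in the way that
# router#handle_redirection does (hypothetical, simplified).
def with_inner_retries(retry_count: 1)
  yield
rescue ConnectionError
  raise if (retry_count -= 1).negative?

  retry
end

# An outer loop that charges its own budget for every attempt the inner helper
# made, and for at least one in case the inner helper raised before ever
# running the block.
def with_outer_budget(budget: 1, &blk)
  attempts = 0
  with_inner_retries do
    attempts += 1
    blk.call
  end
rescue ConnectionError
  budget -= [attempts, 1].max
  raise if budget.negative?

  retry # re-runs the method body, resetting the attempts counter
end

# With a budget of 3 and an inner helper that makes 2 attempts per pass, an
# always-failing block runs 4 times in total before the error propagates:
with_outer_budget(budget: 3) { raise ConnectionError }
```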

lib/redis_client/cluster/router.rb (+16 −4)
```diff
@@ -79,7 +79,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize
 
       # @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
       def try_send(node, method, command, args, retry_count: 3, &block)
-        handle_redirection(node, retry_count: retry_count) do |on_node|
+        handle_redirection(node, command, retry_count: retry_count) do |on_node|
           if args.empty?
             # prevent memory allocation for variable-length args
             on_node.public_send(method, command, &block)
@@ -90,12 +90,12 @@ def try_send(node, method, command, args, retry_count: 3, &block)
       end
 
       def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
-        handle_redirection(node, retry_count: retry_count) do |on_node|
+        handle_redirection(node, nil, retry_count: retry_count) do |on_node|
           on_node.public_send(method, *args, **kwargs, &block)
         end
       end
 
-      def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
+      def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
         yield node
       rescue ::RedisClient::CircuitBreaker::OpenCircuitError
         raise
@@ -124,7 +124,19 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
 
         update_cluster_info!
 
-        raise if retry_count <= 0
+        # If command.nil?, we have no way of knowing which node this command should go to
+        # under the new topology. It might, for example, be part of a transaction which
+        # needs to be retried at a higher level.
+        raise if retry_count <= 0 || command.nil?
+
+        # Find the node to use for this command - if this fails for some reason, though,
+        # re-raise the original connection error.
+        node = begin
+          find_node(find_node_key(command), retry_count: 0)
+        rescue StandardError
+          nil
+        end
+        raise unless node
 
         retry_count -= 1
         retry
```
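When the router does have a `command`, the `find_node_key`/`find_node` pair re-derives the key's owner under the refreshed topology. For reference, the underlying key-to-slot mapping is Redis Cluster's published rule: CRC16 of the key (or of its `{hash tag}` body, when present and non-empty) modulo 16384. A standalone sketch of that rule, not this library's internals:

```ruby
# CRC16-CCITT (XMODEM), the checksum Redis Cluster uses for key slots.
def crc16(key)
  key.each_byte.reduce(0) do |crc, byte|
    crc ^= byte << 8
    8.times { crc = ((crc << 1) ^ ((crc & 0x8000).zero? ? 0 : 0x1021)) & 0xFFFF }
    crc
  end
end

def key_slot(key)
  open_idx = key.index('{')
  if open_idx
    close_idx = key.index('}', open_idx + 1)
    # Hash only the tag body when it is non-empty, so related keys can be
    # forced into the same slot.
    key = key[(open_idx + 1)...close_idx] if close_idx && close_idx > open_idx + 1
  end
  crc16(key) % 16_384
end

key_slot('{user42}.a') == key_slot('{user42}.b') # => true
```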

test/cluster_controller.rb (+2 −0)
```diff
@@ -52,6 +52,8 @@ def initialize(node_addrs,
     @debug = ENV.fetch('DEBUG', '0')
   end
 
+  attr_reader :clients
+
   def wait_for_cluster_to_be_ready
     print_debug('wait for nodes to be recognized...')
     wait_meeting(@clients, max_attempts: @max_attempts)
```

test/test_against_cluster_broken.rb (+65 −0)
```diff
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 require 'testing_helper'
+require 'securerandom'
 
 class TestAgainstClusterBroken < TestingWrapper
   WAIT_SEC = 3
@@ -34,8 +35,55 @@ def test_a_primary_is_down
     do_test_a_node_is_down(sacrifice, number_of_keys: 10)
   end
 
+  def test_reloading_on_connection_error
+    sacrifice = @controller.select_sacrifice_of_primary
+    # Find a key which lives on the sacrifice node
+    test_key = generate_key_for_node(sacrifice)
+    @client.call('SET', test_key, 'foobar1')
+
+    # Shut the node down.
+    kill_a_node_and_wait_for_failover(sacrifice)
+
+    # When we next touch the key, the client will attempt to connect to the broken
+    # node, and thus trigger a reload of the cluster topology.
+    assert_equal 'OK', @client.call('SET', test_key, 'foobar2')
+  end
+
+  def test_transaction_retry_on_connection_error
+    sacrifice = @controller.select_sacrifice_of_primary
+    # Find a key which lives on the sacrifice node
+    test_key = generate_key_for_node(sacrifice)
+    @client.call('SET', test_key, 'foobar1')
+
+    call_count = 0
+    # Begin a transaction, but shut the node down after the WATCH is issued
+    res = @client.multi(watch: [test_key]) do |tx|
+      kill_a_node_and_wait_for_failover(sacrifice) if call_count == 0
+      call_count += 1
+      tx.call('SET', test_key, 'foobar2')
+    end
+
+    # The transaction should have retried once and successfully completed
+    # the second time.
+    assert_equal ['OK'], res
+    assert_equal 'foobar2', @client.call('GET', test_key)
+    assert_equal 2, call_count
+  end
+
   private
 
+  def generate_key_for_node(conn)
+    # Figure out a slot on the sacrifice node, and a key in that slot.
+    conn_id = conn.call('CLUSTER', 'MYID')
+    conn_slots = conn.call('CLUSTER', 'SLOTS')
+                     .select { |res| res[2][2] == conn_id }
+                     .flat_map { |res| (res[0]..res[1]).to_a }
+    loop do
+      test_key = SecureRandom.hex
+      return test_key if conn_slots.include?(conn.call('CLUSTER', 'KEYSLOT', test_key))
+    end
+  end
+
   def wait_for_replication
     client_side_timeout = TEST_TIMEOUT_SEC + 1.0
     server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@@ -78,6 +126,23 @@ def kill_a_node(sacrifice, kill_attempts:)
     assert_raises(::RedisClient::ConnectionError) { sacrifice.call('PING') }
   end
 
+  def kill_a_node_and_wait_for_failover(sacrifice)
+    other_client = @controller.clients.reject { _1 == sacrifice }.first
+    sacrifice_id = sacrifice.call('CLUSTER', 'MYID')
+    kill_a_node(sacrifice, kill_attempts: 10)
+    failover_checks = 0
+    loop do
+      raise 'Timed out waiting for failover in kill_a_node_and_wait_for_failover' if failover_checks > 30
+
+      # Wait for the sacrifice node to no longer be a primary according to CLUSTER SLOTS.
+      cluster_slots = other_client.call('CLUSTER', 'SLOTS')
+      break unless cluster_slots.any? { _1[2][2] == sacrifice_id }
+
+      sleep 1
+      failover_checks += 1
+    end
+  end
+
   def wait_for_cluster_to_be_ready(wait_attempts:)
     loop do
       break if wait_attempts <= 0 || @client.call('PING') == 'PONG'
```
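The `res[2][2]` / `_1[2][2]` indexing in these helpers unpacks the CLUSTER SLOTS reply: each element describes one slot range, with the range's master at index 2 followed by any replicas, and each node entry shaped as `[host, port, node_id, ...]`. Illustrative values (hosts, ports, and ids are made up):

```ruby
# One element of a CLUSTER SLOTS reply (values are made up):
slot_range = [
  0,                               # first slot in the range
  5460,                            # last slot in the range
  ['127.0.0.1', 6379, '07c3f1'],   # master: [host, port, node_id, ...]
  ['127.0.0.1', 6380, 'e7d1aa']    # zero or more replicas, same shape
]
slot_range[2][2] # => the master's node id, as compared against CLUSTER MYID above
```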
