Skip to content

Commit 0703ce5

Browse files
KJTsanaktsidisKJ Tsanaktsidis
authored and
KJ Tsanaktsidis
committed
Assign a new node after calling update_cluster_info!
If we catch a connection error and refresh the cluster topology, we need to re-calculate what node to send the command to in the router; the node we're using might not even be a valid node any longer.
1 parent 5a5f84c commit 0703ce5

File tree

4 files changed

+96
-9
lines changed

4 files changed

+96
-9
lines changed

lib/redis_client/cluster/optimistic_locking.rb

+15-6
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@ def watch(keys) # rubocop:disable Metrics/AbcSize
1515
slot = find_slot(keys)
1616
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
1717

18-
# We have not yet selected a node for this transaction, initially, which means we can handle
19-
# redirections freely initially (i.e. for the first WATCH call)
20-
node = @router.find_primary_node_by_slot(slot)
21-
handle_redirection(node, retry_count: 1) do |nd|
18+
handle_redirection(slot, retry_count: 1) do |nd|
2219
nd.with do |c|
2320
c.ensure_connected_cluster_scoped(retryable: false) do
2421
c.call('ASKING') if @asking
@@ -45,10 +42,22 @@ def watch(keys) # rubocop:disable Metrics/AbcSize
4542

4643
private
4744

48-
def handle_redirection(node, retry_count: 1, &blk)
49-
@router.handle_redirection(node, retry_count: retry_count) do |nd|
45+
# Runs the given block against the primary node that owns `slot`, following
# MOVED/ASK redirections and retrying after connection errors.
#
# On a connection error we re-enter the whole method body via `retry`, which
# re-selects the node with find_primary_node_by_slot — after a topology
# refresh the previously chosen node may no longer be valid.
#
# @param slot [Integer] hash slot the WATCHed keys belong to
# @param retry_count [Integer] number of additional attempts allowed after a
#   ::RedisClient::ConnectionError
# @raise [::RedisClient::ConnectionError] once the retry budget is exhausted
def handle_redirection(slot, retry_count: 1, &blk)
  # We have not yet selected a node for this transaction, initially, which means we can handle
  # redirections freely initially (i.e. for the first WATCH call)
  node = @router.find_primary_node_by_slot(slot)
  times_block_executed = 0
  @router.handle_redirection(node, nil, retry_count: retry_count) do |nd|
    times_block_executed += 1
    handle_asking_once(nd, &blk)
  end
rescue ::RedisClient::ConnectionError
  # Deduct the number of retries that happened _inside_ router#handle_redirection from our remaining
  # _external_ retries. Always deduct at least one in case handle_redirection raises without trying the block.
  # BUG FIX: this must be `.max`, not `.min` — with `.min` a failure that occurs
  # before the block ever runs deducts nothing, retrying forever.
  retry_count -= [times_block_executed, 1].max
  raise if retry_count < 0

  retry
end
5362

5463
def handle_asking_once(node)

lib/redis_client/cluster/router.rb

+14-3
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
9090

9191
# @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
9292
def try_send(node, method, command, args, retry_count: 3, &block)
93-
handle_redirection(node, retry_count: retry_count) do |on_node|
93+
handle_redirection(node, command, retry_count: retry_count) do |on_node|
9494
if args.empty?
9595
# prevent memory allocation for variable-length args
9696
on_node.public_send(method, command, &block)
@@ -101,12 +101,12 @@ def try_send(node, method, command, args, retry_count: 3, &block)
101101
end
102102

103103
def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
104-
handle_redirection(node, retry_count: retry_count) do |on_node|
104+
handle_redirection(node, nil, retry_count: retry_count) do |on_node|
105105
on_node.public_send(method, *args, **kwargs, &block)
106106
end
107107
end
108108

109-
def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
109+
def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
110110
yield node
111111
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
112112
raise
@@ -134,6 +134,17 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
134134

135135
retry_count -= 1
136136
renew_cluster_state
137+
138+
if retry_count >= 0
139+
# Find the node to use for this command - if this fails for some reason, though, re-use
140+
# the old node.
141+
begin
142+
node = find_node(find_node_key(command)) if command
143+
rescue StandardError # rubocop:disable Lint/SuppressedException
144+
end
145+
retry
146+
end
147+
137148
retry if retry_count >= 0
138149
raise
139150
end

test/cluster_controller.rb

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ def initialize(node_addrs,
5858
@debug = ENV.fetch('DEBUG', '0')
5959
end
6060

61+
attr_reader :clients
62+
6163
def wait_for_cluster_to_be_ready(skip_clients: [])
6264
print_debug('wait for nodes to be recognized...')
6365
wait_meeting(@clients, max_attempts: @max_attempts)

test/test_against_cluster_broken.rb

+65
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'logger'
44
require 'json'
55
require 'testing_helper'
6+
require 'securerandom'
67

78
class TestAgainstClusterBroken < TestingWrapper
89
WAIT_SEC = 0.1
@@ -54,6 +55,41 @@ def test_client_patience
5455
do_assertions(offset: 3)
5556
end
5657

58+
# A connection error against a killed primary should make the client reload
# the cluster topology and transparently route the command to whichever node
# took over the slot — the second SET must succeed without caller-visible errors.
def test_reloading_on_connection_error
  sacrifice = @controller.select_sacrifice_of_primary
  # Find a key which lives on the sacrifice node
  test_key = generate_key_for_node(sacrifice)
  @clients[0].call('SET', test_key, 'foobar1')

  # Shut the node down.
  kill_a_node_and_wait_for_failover(sacrifice)

  # When we next write the key, the client will attempt to connect to the
  # broken node, and thus trigger a reload of the cluster topology.
  assert_equal 'OK', @clients[0].call('SET', test_key, 'foobar2')
end
71+
72+
# Killing the watched key's primary while a WATCH/MULTI transaction is open
# should surface as a connection error and cause the whole transaction block
# (including the WATCH) to be retried against the newly promoted primary.
# call_count doubles as a re-entry counter to prove the block ran twice.
def test_transaction_retry_on_connection_error
  sacrifice = @controller.select_sacrifice_of_primary
  # Find a key which lives on the sacrifice node
  test_key = generate_key_for_node(sacrifice)
  @clients[0].call('SET', test_key, 'foobar1')

  call_count = 0
  # Begin a transaction, but shut the node down after the WATCH is issued
  res = @clients[0].multi(watch: [test_key]) do |tx|
    kill_a_node_and_wait_for_failover(sacrifice) if call_count == 0
    call_count += 1
    tx.call('SET', test_key, 'foobar2')
  end

  # The transaction should have retried once and successfully completed
  # the second time.
  assert_equal ['OK'], res
  assert_equal 'foobar2', @clients[0].call('GET', test_key)
  assert_equal 2, call_count
end
92+
5793
private
5894

5995
def prepare_test_data
@@ -129,6 +165,18 @@ def do_assertions(offset:)
129165
end
130166
end
131167

168+
# Returns a random key whose hash slot is currently owned by the node behind
# `conn`, so commands on that key are guaranteed to be routed to it.
# Loops generating random keys until CLUSTER KEYSLOT lands in an owned slot;
# assumes conn is a primary that serves at least one slot (otherwise this
# would never terminate — NOTE(review): confirm callers guarantee that).
def generate_key_for_node(conn)
  # Figure out a slot on the sacrifice node, and a key in that slot.
  conn_id = conn.call('CLUSTER', 'MYID')
  conn_slots = conn.call('CLUSTER', 'SLOTS')
                   .select { |res| res[2][2] == conn_id }
                   .flat_map { |res| (res[0]..res[1]).to_a }
  loop do
    test_key = SecureRandom.hex
    return test_key if conn_slots.include?(conn.call('CLUSTER', 'KEYSLOT', test_key))
  end
end
179+
132180
def wait_for_replication(client)
133181
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
134182
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@@ -211,6 +259,23 @@ def retryable(attempts: MAX_ATTEMPTS, wait_sec: WAIT_SEC)
211259
end
212260
end
213261

262+
# Kills the given primary and blocks until the rest of the cluster agrees
# (per CLUSTER SLOTS from a surviving node) that it no longer owns any slots,
# i.e. a replica has been promoted. Polls once per second and raises after
# ~30 checks if failover never completes.
def kill_a_node_and_wait_for_failover(sacrifice)
  # Any client other than the one we are about to kill can observe the topology.
  other_client = @controller.clients.reject { _1 == sacrifice }.first
  sacrifice_id = sacrifice.call('CLUSTER', 'MYID')
  kill_a_node(sacrifice)
  failover_checks = 0
  loop do
    raise 'Timed out waiting for failover in kill_a_node_and_wait_for_failover' if failover_checks > 30

    # Wait for the sacrifice node to not be a primary according to CLUSTER SLOTS.
    cluster_slots = other_client.call('CLUSTER', 'SLOTS')
    break unless cluster_slots.any? { _1[2][2] == sacrifice_id }

    sleep 1
    failover_checks += 1
  end
end
278+
214279
def build_client(
215280
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
216281
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],

0 commit comments

Comments
 (0)