Skip to content

Commit 7eff2f2

Browse files
authored
Fix two separate but related problems with watch retry handling (#338)
2 parents 3230791 + 99de3d3 commit 7eff2f2

File tree

5 files changed

+171
-14
lines changed

5 files changed

+171
-14
lines changed

lib/redis_client/cluster.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ def multi(watch: nil)
9898
return transaction.execute
9999
end
100100

101-
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
101+
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking|
102102
transaction = ::RedisClient::Cluster::Transaction.new(
103-
@router, @command_builder, node: c, slot: slot
103+
@router, @command_builder, node: c, slot: slot, asking: asking
104104
)
105105
yield transaction
106106
transaction.execute

lib/redis_client/cluster/optimistic_locking.rb

+36-6
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,56 @@ class Cluster
88
class OptimisticLocking
99
def initialize(router)
1010
@router = router
11+
@asking = false
1112
end
1213

1314
def watch(keys)
1415
slot = find_slot(keys)
1516
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
1617

18+
# We have not yet selected a node for this transaction, initially, which means we can handle
19+
# redirections freely initially (i.e. for the first WATCH call)
1720
node = @router.find_primary_node_by_slot(slot)
18-
@router.handle_redirection(node, retry_count: 1) do |nd|
21+
handle_redirection(node, retry_count: 1) do |nd|
1922
nd.with do |c|
20-
c.call('WATCH', *keys)
21-
yield(c, slot)
22-
rescue StandardError
23-
c.call('UNWATCH')
24-
raise
23+
c.ensure_connected_cluster_scoped(retryable: false) do
24+
c.call('ASKING') if @asking
25+
c.call('WATCH', *keys)
26+
begin
27+
yield(c, slot, @asking)
28+
rescue ::RedisClient::ConnectionError
29+
# No need to unwatch on a connection error.
30+
raise
31+
rescue StandardError
32+
c.call('UNWATCH')
33+
raise
34+
end
35+
end
2536
end
2637
end
2738
end
2839

2940
private
3041

42+
def handle_redirection(node, retry_count: 1, &blk)
43+
@router.handle_redirection(node, retry_count: retry_count) do |nd|
44+
handle_asking_once(nd, &blk)
45+
end
46+
end
47+
48+
def handle_asking_once(node)
49+
yield node
50+
rescue ::RedisClient::CommandError => e
51+
raise unless ErrorIdentification.client_owns_error?(e, node)
52+
raise unless e.message.start_with?('ASK')
53+
54+
node = @router.assign_asking_node(e.message)
55+
@asking = true
56+
yield node
57+
ensure
58+
@asking = false
59+
end
60+
3161
def find_slot(keys)
3262
return if keys.empty?
3363
return if keys.any? { |k| k.nil? || k.empty? }

lib/redis_client/cluster/transaction.rb

+10-6
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class Transaction
99
ConsistencyError = Class.new(::RedisClient::Error)
1010
MAX_REDIRECTION = 2
1111

12-
def initialize(router, command_builder, node: nil, slot: nil)
12+
def initialize(router, command_builder, node: nil, slot: nil, asking: false)
1313
@router = router
1414
@command_builder = command_builder
1515
@retryable = true
@@ -18,6 +18,7 @@ def initialize(router, command_builder, node: nil, slot: nil)
1818
@node = node
1919
prepare_tx unless @node.nil?
2020
@watching_slot = slot
21+
@asking = asking
2122
end
2223

2324
def call(*command, **kwargs, &block)
@@ -93,7 +94,11 @@ def prepare_tx
9394

9495
def settle
9596
@pipeline.call('EXEC')
96-
send_transaction(@node, redirect: MAX_REDIRECTION)
97+
# If we needed ASKING on the watch, we need ASKING on the multi as well.
98+
@node.call('ASKING') if @asking
99+
# Don't handle redirections at this level if we're in a watch (the watcher handles redirections
100+
# at the whole-transaction level.)
101+
send_transaction(@node, redirect: !!@watching_slot ? 0 : MAX_REDIRECTION)
97102
end
98103

99104
def send_transaction(client, redirect:)
@@ -110,7 +115,8 @@ def send_pipeline(client, redirect:)
110115
client.middlewares.call_pipelined(commands, client.config) do
111116
connection.call_pipelined(commands, nil)
112117
rescue ::RedisClient::CommandError => e
113-
return handle_command_error!(commands, e, redirect: redirect) unless redirect.zero?
118+
ensure_the_same_slot!(commands)
119+
return handle_command_error!(e, redirect: redirect) unless redirect.zero?
114120

115121
raise
116122
end
@@ -139,15 +145,13 @@ def coerce_results!(results, offset: 1)
139145
results
140146
end
141147

142-
def handle_command_error!(commands, err, redirect:) # rubocop:disable Metrics/AbcSize
148+
def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
143149
if err.message.start_with?('CROSSSLOT')
144150
raise ConsistencyError, "#{err.message}: #{err.command}"
145151
elsif err.message.start_with?('MOVED')
146-
ensure_the_same_slot!(commands)
147152
node = @router.assign_redirection_node(err.message)
148153
send_transaction(node, redirect: redirect - 1)
149154
elsif err.message.start_with?('ASK')
150-
ensure_the_same_slot!(commands)
151155
node = @router.assign_asking_node(err.message)
152156
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
153157
else

test/redis_client/test_cluster.rb

+94
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,100 @@ def test_transaction_with_meaningless_watch
295295
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
296296
end
297297

298+
def test_transaction_does_not_pointlessly_unwatch_on_success
299+
@client.call('MSET', '{key}1', '0', '{key}2', '0')
300+
301+
@captured_commands.clear
302+
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
303+
tx.call('SET', '{key}1', '1')
304+
tx.call('SET', '{key}2', '2')
305+
end
306+
assert_equal(%w[WATCH MULTI SET SET EXEC], @captured_commands.to_a.map(&:command).map(&:first))
307+
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
308+
end
309+
310+
def test_transaction_unwatches_on_error
311+
test_error = Class.new(StandardError)
312+
313+
@captured_commands.clear
314+
assert_raises(test_error) do
315+
@client.multi(watch: %w[{key}1 {key}2]) do
316+
raise test_error, 'error!'
317+
end
318+
end
319+
320+
assert_equal(%w[WATCH UNWATCH], @captured_commands.to_a.map(&:command).map(&:first))
321+
end
322+
323+
def test_transaction_does_not_unwatch_on_connection_error
324+
@captured_commands.clear
325+
assert_raises(RedisClient::ConnectionError) do
326+
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
327+
tx.call('SET', '{key}1', 'x')
328+
tx.call('QUIT')
329+
end
330+
end
331+
command_list = @captured_commands.to_a.map(&:command).map(&:first)
332+
assert_includes(command_list, 'WATCH')
333+
refute_includes(command_list, 'UNWATCH')
334+
end
335+
336+
def test_transaction_does_not_retry_without_rewatching
337+
client2 = new_test_client
338+
339+
@client.call('SET', 'key', 'original_value')
340+
341+
assert_raises(RedisClient::ConnectionError) do
342+
@client.multi(watch: %w[key]) do |tx|
343+
# Simulate all the connections closing behind the router's back
344+
# Sending QUIT to redis makes the server side close the connection (and the client
345+
# side thus get a RedisClient::ConnectionError)
346+
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
347+
node.clients.each do |conn|
348+
conn.with(&:close)
349+
end
350+
351+
# Now the second client sets the value, which should make this watch invalid
352+
client2.call('SET', 'key', 'client2_value')
353+
354+
tx.call('SET', 'key', '@client_value')
355+
# Committing this transaction will fail, not silently reconnect (without the watch!)
356+
end
357+
end
358+
359+
# The transaction did not commit.
360+
assert_equal('client2_value', @client.call('GET', 'key'))
361+
end
362+
363+
def test_transaction_with_watch_retries_block
364+
client2 = new_test_client
365+
call_count = 0
366+
367+
@client.call('SET', 'key', 'original_value')
368+
369+
@client.multi(watch: %w[key]) do |tx|
370+
if call_count == 0
371+
# Simulate all the connections closing behind the router's back
372+
# Sending QUIT to redis makes the server side close the connection (and the client
373+
# side thus get a RedisClient::ConnectionError)
374+
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
375+
node.clients.each do |conn|
376+
conn.with(&:close)
377+
end
378+
379+
# Now the second client sets the value, which should make this watch invalid
380+
client2.call('SET', 'key', 'client2_value')
381+
end
382+
call_count += 1
383+
384+
tx.call('SET', 'key', "@client_value_#{call_count}")
385+
end
386+
387+
# The transaction did commit (but it was the second time)
388+
assert_equal('@client_value_2', @client.call('GET', 'key'))
389+
assert_equal(2, call_count)
390+
end
391+
298392
def test_transaction_with_error
299393
@client.call('SET', 'key1', 'x')
300394

test/test_against_cluster_state.rb

+29
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,35 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch
103103
assert_equal(1, call_cnt)
104104
end
105105

106+
def test_the_state_of_cluster_resharding_with_reexecuted_watch
107+
client2 = new_test_client
108+
call_cnt = 0
109+
110+
@client.call('SET', 'watch_key', 'original_value')
111+
@client.multi(watch: %w[watch_key]) do |tx|
112+
# Use client2 to change the value of watch_key, which would cause this transaction to fail
113+
if call_cnt == 0
114+
client2.call('SET', 'watch_key', 'client2_value')
115+
116+
# Now perform (and _finish_) a reshard, which should make this transaction receive a MOVED
117+
# redirection when it goes to commit. That should result in the entire block being retried
118+
slot = ::RedisClient::Cluster::KeySlotConverter.convert('watch_key')
119+
src, dest = @controller.select_resharding_target(slot)
120+
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
121+
@controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
122+
end
123+
call_cnt += 1
124+
125+
tx.call('SET', 'watch_key', "@client_value_#{call_cnt}")
126+
end
127+
# It should have retried the entire transaction block.
128+
assert_equal(2, call_cnt)
129+
# The second call succeeded
130+
assert_equal('@client_value_2', @client.call('GET', 'watch_key'))
131+
ensure
132+
client2&.close
133+
end
134+
106135
def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
107136
# This test is excercising a very delicate race condition; i think the use of @client to set
108137
# the keys in do_resharding_test is actually causing the race condition not to happen, so this

0 commit comments

Comments
 (0)