
Commit 99de3d3

Author: KJ Tsanaktsidis (committed)
Don't attempt redirection on the actual MULTI for WATCH...MULTI
If we have a WATCH, then the MULTI _must_ exec on exactly that node; it should not be allowed for that to be redirected to a different node, because then the MULTI isn't on the same connection as the WATCH anymore! The first part of this patch fixes this by disabling redirection handling in transaction.rb if we are in a watch. I also added a test, test_the_state_of_cluster_resharding_with_reexecuted_watch, for this.

However, the second part of this patch is a lot trickier... This change causes a different test, test_the_state_of_cluster_resharding_with_transaction_and_watch, to break. That test asserts that:

- if we watch something where the slot is in the middle of migrating between nodes,
- and all the keys we wanted to watch are on the new node,
- then the client correctly retries the transaction with ASKING against the new node.

Disabling redirection handling obviously makes this stop working, and it _is_ possible to handle this case correctly. We need to record whether or not we had to issue an ASKING on the WATCH for the transaction, and if so, pre-emptively issue an ASKING on the MULTI too. That's because the slot is not yet actually assigned to the node we're connected to (it's IMPORTING).

It may well not be worth it, and I'm also OK with just failing WATCH/MULTI on slots which are currently migrating. That would imply:

- keeping test_the_state_of_cluster_resharding_with_reexecuted_watch
- deleting test_the_state_of_cluster_resharding_with_transaction_and_watch
1 parent 9546682 commit 99de3d3
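
For context, here is a minimal sketch of the WATCH...MULTI pattern this commit is concerned with, using the public multi(watch:) API. It assumes client was built with RedisClient.cluster(nodes: ...).new_client; the key and value are placeholders.

    # Sketch only: client is assumed to be a RedisClient::Cluster instance,
    # e.g. RedisClient.cluster(nodes: %w[redis://127.0.0.1:6379]).new_client.
    client.call('SET', 'watch_key', 'original_value')

    # WATCH pins the transaction to the connection that owns watch_key's slot.
    # The MULTI/EXEC queued below must run on that same connection; if the EXEC
    # were redirected to another node, the WATCH guard would be silently lost.
    client.multi(watch: %w[watch_key]) do |tx|
      tx.call('SET', 'watch_key', 'new_value')
    end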

File tree (4 files changed, +64 -10 lines):

- lib/redis_client/cluster.rb
- lib/redis_client/cluster/optimistic_locking.rb
- lib/redis_client/cluster/transaction.rb
- test/test_against_cluster_state.rb

lib/redis_client/cluster.rb (+2 -2)

@@ -98,9 +98,9 @@ def multi(watch: nil)
         return transaction.execute
       end
 
-      ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
+      ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking|
        transaction = ::RedisClient::Cluster::Transaction.new(
-          @router, @command_builder, node: c, slot: slot
+          @router, @command_builder, node: c, slot: slot, asking: asking
        )
        yield transaction
        transaction.execute

lib/redis_client/cluster/optimistic_locking.rb (+23 -2)

@@ -8,6 +8,7 @@ class Cluster
     class OptimisticLocking
       def initialize(router)
         @router = router
+        @asking = false
       end
 
       def watch(keys)
@@ -17,12 +18,13 @@ def watch(keys)
         # We have not yet selected a node for this transaction, initially, which means we can handle
         # redirections freely initially (i.e. for the first WATCH call)
         node = @router.find_primary_node_by_slot(slot)
-        @router.handle_redirection(node, retry_count: 1) do |nd|
+        handle_redirection(node, retry_count: 1) do |nd|
          nd.with do |c|
            c.ensure_connected_cluster_scoped(retryable: false) do
+             c.call('ASKING') if @asking
              c.call('WATCH', *keys)
              begin
-               yield(c, slot)
+               yield(c, slot, @asking)
              rescue ::RedisClient::ConnectionError
                # No need to unwatch on a connection error.
                raise
@@ -37,6 +39,25 @@ def watch(keys)
 
       private
 
+      def handle_redirection(node, retry_count: 1, &blk)
+        @router.handle_redirection(node, retry_count: retry_count) do |nd|
+          handle_asking_once(nd, &blk)
+        end
+      end
+
+      def handle_asking_once(node)
+        yield node
+      rescue ::RedisClient::CommandError => e
+        raise unless ErrorIdentification.client_owns_error?(e, node)
+        raise unless e.message.start_with?('ASK')
+
+        node = @router.assign_asking_node(e.message)
+        @asking = true
+        yield node
+      ensure
+        @asking = false
+      end
+
       def find_slot(keys)
         return if keys.empty?
         return if keys.any? { |k| k.nil? || k.empty? }
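
For reference, the ASK redirection that handle_asking_once reacts to is the standard Redis Cluster redirection error, whose message has the form ASK <slot> <host:port>; assign_asking_node extracts the target node from it. A tiny illustration (the slot number and address are placeholders):

    # Illustration only: redis-client surfaces the redirection as a CommandError
    # whose message starts with "ASK", e.g. "ASK 12539 127.0.0.1:7001".
    # handle_asking_once matches on that prefix and hands the message to
    # @router.assign_asking_node, which extracts the target host:port.
    err = ::RedisClient::CommandError.new('ASK 12539 127.0.0.1:7001')
    err.message.start_with?('ASK') # => true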

lib/redis_client/cluster/transaction.rb (+10 -6)

@@ -9,7 +9,7 @@ class Transaction
      ConsistencyError = Class.new(::RedisClient::Error)
      MAX_REDIRECTION = 2
 
-     def initialize(router, command_builder, node: nil, slot: nil)
+     def initialize(router, command_builder, node: nil, slot: nil, asking: false)
        @router = router
        @command_builder = command_builder
        @retryable = true
@@ -18,6 +18,7 @@ def initialize(router, command_builder, node: nil, slot: nil)
        @node = node
        prepare_tx unless @node.nil?
        @watching_slot = slot
+       @asking = asking
      end
 
      def call(*command, **kwargs, &block)
@@ -93,7 +94,11 @@ def prepare_tx
 
      def settle
        @pipeline.call('EXEC')
-       send_transaction(@node, redirect: MAX_REDIRECTION)
+       # If we needed ASKING on the watch, we need ASKING on the multi as well.
+       @node.call('ASKING') if @asking
+       # Don't handle redirections at this level if we're in a watch (the watcher handles redirections
+       # at the whole-transaction level.)
+       send_transaction(@node, redirect: !!@watching_slot ? 0 : MAX_REDIRECTION)
      end
 
      def send_transaction(client, redirect:)
@@ -110,7 +115,8 @@ def send_pipeline(client, redirect:)
        client.middlewares.call_pipelined(commands, client.config) do
          connection.call_pipelined(commands, nil)
        rescue ::RedisClient::CommandError => e
-         return handle_command_error!(commands, e, redirect: redirect) unless redirect.zero?
+         ensure_the_same_slot!(commands)
+         return handle_command_error!(e, redirect: redirect) unless redirect.zero?
 
          raise
        end
@@ -139,15 +145,13 @@ def coerce_results!(results, offset: 1)
        results
      end
 
-     def handle_command_error!(commands, err, redirect:) # rubocop:disable Metrics/AbcSize
+     def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
        if err.message.start_with?('CROSSSLOT')
          raise ConsistencyError, "#{err.message}: #{err.command}"
        elsif err.message.start_with?('MOVED')
-         ensure_the_same_slot!(commands)
          node = @router.assign_redirection_node(err.message)
          send_transaction(node, redirect: redirect - 1)
        elsif err.message.start_with?('ASK')
-         ensure_the_same_slot!(commands)
          node = @router.assign_asking_node(err.message)
          try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
        else
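
To make the second part concrete: when @asking is set, the commands that end up on the importing node's connection look roughly like the sequence below (an illustration only; conn stands for the checked-out connection, and the real code sends MULTI...EXEC through the pipelined send_transaction path rather than one call at a time).

    # Illustration only: approximate command sequence on the node that is
    # IMPORTING the slot, when the WATCH already required an ASK redirection.
    conn.call('ASKING')                          # allow commands for the importing slot
    conn.call('MULTI')
    conn.call('SET', 'watch_key', 'new_value')   # whatever was queued on the transaction
    conn.call('EXEC')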

test/test_against_cluster_state.rb (+29 -0)

@@ -103,6 +103,35 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch
     assert_equal(1, call_cnt)
   end
 
+  def test_the_state_of_cluster_resharding_with_reexecuted_watch
+    client2 = new_test_client
+    call_cnt = 0
+
+    @client.call('SET', 'watch_key', 'original_value')
+    @client.multi(watch: %w[watch_key]) do |tx|
+      # Use client2 to change the value of watch_key, which would cause this transaction to fail
+      if call_cnt == 0
+        client2.call('SET', 'watch_key', 'client2_value')
+
+        # Now perform (and _finish_) a reshard, which should make this transaction receive a MOVED
+        # redirection when it goes to commit. That should result in the entire block being retried
+        slot = ::RedisClient::Cluster::KeySlotConverter.convert('watch_key')
+        src, dest = @controller.select_resharding_target(slot)
+        @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
+        @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
+      end
+      call_cnt += 1
+
+      tx.call('SET', 'watch_key', "@client_value_#{call_cnt}")
+    end
+    # It should have retried the entire transaction block.
+    assert_equal(2, call_cnt)
+    # The second call succeeded
+    assert_equal('@client_value_2', @client.call('GET', 'watch_key'))
+  ensure
+    client2&.close
+  end
+
   def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
     # This test is excercising a very delicate race condition; i think the use of @client to set
     # the keys in do_resharding_test is actually causing the race condition not to happen, so this
