Skip to content

Fix two separate but related problems with watch retry handling #338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ def multi(watch: nil)
return transaction.execute
end

::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking|
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: c, slot: slot
@router, @command_builder, node: c, slot: slot, asking: asking
)
yield transaction
transaction.execute
Expand Down
42 changes: 36 additions & 6 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,56 @@ class Cluster
class OptimisticLocking
def initialize(router)
@router = router
@asking = false
end

def watch(keys)
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

# We have not yet selected a node for this transaction, initially, which means we can handle
# redirections freely initially (i.e. for the first WATCH call)
node = @router.find_primary_node_by_slot(slot)
@router.handle_redirection(node, retry_count: 1) do |nd|
handle_redirection(node, retry_count: 1) do |nd|
nd.with do |c|
c.call('WATCH', *keys)
yield(c, slot)
rescue StandardError
c.call('UNWATCH')
raise
c.ensure_connected_cluster_scoped(retryable: false) do
c.call('ASKING') if @asking
c.call('WATCH', *keys)
begin
yield(c, slot, @asking)
rescue ::RedisClient::ConnectionError
# No need to unwatch on a connection error.
raise
rescue StandardError
c.call('UNWATCH')
raise
end
end
end
end
end

private

def handle_redirection(node, retry_count: 1, &blk)
@router.handle_redirection(node, retry_count: retry_count) do |nd|
handle_asking_once(nd, &blk)
end
end

def handle_asking_once(node)
yield node
rescue ::RedisClient::CommandError => e
raise unless ErrorIdentification.client_owns_error?(e, node)
raise unless e.message.start_with?('ASK')

node = @router.assign_asking_node(e.message)
@asking = true
yield node
ensure
@asking = false
end

def find_slot(keys)
return if keys.empty?
return if keys.any? { |k| k.nil? || k.empty? }
Expand Down
16 changes: 10 additions & 6 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Transaction
ConsistencyError = Class.new(::RedisClient::Error)
MAX_REDIRECTION = 2

def initialize(router, command_builder, node: nil, slot: nil)
def initialize(router, command_builder, node: nil, slot: nil, asking: false)
@router = router
@command_builder = command_builder
@retryable = true
Expand All @@ -18,6 +18,7 @@ def initialize(router, command_builder, node: nil, slot: nil)
@node = node
prepare_tx unless @node.nil?
@watching_slot = slot
@asking = asking
end

def call(*command, **kwargs, &block)
Expand Down Expand Up @@ -93,7 +94,11 @@ def prepare_tx

def settle
@pipeline.call('EXEC')
send_transaction(@node, redirect: MAX_REDIRECTION)
# If we needed ASKING on the watch, we need ASKING on the multi as well.
@node.call('ASKING') if @asking
# Don't handle redirections at this level if we're in a watch (the watcher handles redirections
# at the whole-transaction level.)
send_transaction(@node, redirect: !!@watching_slot ? 0 : MAX_REDIRECTION)
end

def send_transaction(client, redirect:)
Expand All @@ -110,7 +115,8 @@ def send_pipeline(client, redirect:)
client.middlewares.call_pipelined(commands, client.config) do
connection.call_pipelined(commands, nil)
rescue ::RedisClient::CommandError => e
return handle_command_error!(commands, e, redirect: redirect) unless redirect.zero?
ensure_the_same_slot!(commands)
return handle_command_error!(e, redirect: redirect) unless redirect.zero?

raise
end
Expand Down Expand Up @@ -139,15 +145,13 @@ def coerce_results!(results, offset: 1)
results
end

def handle_command_error!(commands, err, redirect:) # rubocop:disable Metrics/AbcSize
def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
elsif err.message.start_with?('MOVED')
ensure_the_same_slot!(commands)
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: redirect - 1)
elsif err.message.start_with?('ASK')
ensure_the_same_slot!(commands)
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
else
Expand Down
94 changes: 94 additions & 0 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,100 @@ def test_transaction_with_meaningless_watch
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_does_not_pointlessly_unwatch_on_success
@client.call('MSET', '{key}1', '0', '{key}2', '0')

@captured_commands.clear
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
end
assert_equal(%w[WATCH MULTI SET SET EXEC], @captured_commands.to_a.map(&:command).map(&:first))
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_unwatches_on_error
test_error = Class.new(StandardError)

@captured_commands.clear
assert_raises(test_error) do
@client.multi(watch: %w[{key}1 {key}2]) do
raise test_error, 'error!'
end
end

assert_equal(%w[WATCH UNWATCH], @captured_commands.to_a.map(&:command).map(&:first))
end

def test_transaction_does_not_unwatch_on_connection_error
@captured_commands.clear
assert_raises(RedisClient::ConnectionError) do
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
tx.call('SET', '{key}1', 'x')
tx.call('QUIT')
end
end
command_list = @captured_commands.to_a.map(&:command).map(&:first)
assert_includes(command_list, 'WATCH')
refute_includes(command_list, 'UNWATCH')
end

def test_transaction_does_not_retry_without_rewatching
client2 = new_test_client

@client.call('SET', 'key', 'original_value')

assert_raises(RedisClient::ConnectionError) do
@client.multi(watch: %w[key]) do |tx|
# Simulate all the connections closing behind the router's back
# Sending QUIT to redis makes the server side close the connection (and the client
# side thus get a RedisClient::ConnectionError)
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
node.clients.each do |conn|
conn.with(&:close)
end

# Now the second client sets the value, which should make this watch invalid
client2.call('SET', 'key', 'client2_value')

tx.call('SET', 'key', '@client_value')
# Committing this transaction will fail, not silently reconnect (without the watch!)
end
end

# The transaction did not commit.
assert_equal('client2_value', @client.call('GET', 'key'))
end

def test_transaction_with_watch_retries_block
client2 = new_test_client
call_count = 0

@client.call('SET', 'key', 'original_value')

@client.multi(watch: %w[key]) do |tx|
if call_count == 0
# Simulate all the connections closing behind the router's back
# Sending QUIT to redis makes the server side close the connection (and the client
# side thus get a RedisClient::ConnectionError)
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
node.clients.each do |conn|
conn.with(&:close)
end

# Now the second client sets the value, which should make this watch invalid
client2.call('SET', 'key', 'client2_value')
end
call_count += 1

tx.call('SET', 'key', "@client_value_#{call_count}")
end

# The transaction did commit (but it was the second time)
assert_equal('@client_value_2', @client.call('GET', 'key'))
assert_equal(2, call_count)
end

def test_transaction_with_error
@client.call('SET', 'key1', 'x')

Expand Down
29 changes: 29 additions & 0 deletions test/test_against_cluster_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,35 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch
assert_equal(1, call_cnt)
end

def test_the_state_of_cluster_resharding_with_reexecuted_watch
client2 = new_test_client
call_cnt = 0

@client.call('SET', 'watch_key', 'original_value')
@client.multi(watch: %w[watch_key]) do |tx|
# Use client2 to change the value of watch_key, which would cause this transaction to fail
if call_cnt == 0
client2.call('SET', 'watch_key', 'client2_value')

# Now perform (and _finish_) a reshard, which should make this transaction receive a MOVED
# redirection when it goes to commit. That should result in the entire block being retried
slot = ::RedisClient::Cluster::KeySlotConverter.convert('watch_key')
src, dest = @controller.select_resharding_target(slot)
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
@controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
end
call_cnt += 1

tx.call('SET', 'watch_key', "@client_value_#{call_cnt}")
end
# It should have retried the entire transaction block.
assert_equal(2, call_cnt)
# The second call succeeded
assert_equal('@client_value_2', @client.call('GET', 'watch_key'))
ensure
client2&.close
end

def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
# This test is excercising a very delicate race condition; i think the use of @client to set
# the keys in do_resharding_test is actually causing the race condition not to happen, so this
Expand Down