Skip to content

Validate keys when a raw connection is checked out with #with #317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require 'redis_client/cluster/concurrent_worker'
require 'redis_client/cluster/pinned_connection_delegator'
require 'redis_client/cluster/pipeline'
require 'redis_client/cluster/pub_sub'
require 'redis_client/cluster/router'
Expand Down Expand Up @@ -93,15 +94,21 @@ def multi(watch: nil, &block)
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
end

def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block)
def with(key: nil, hashtag: nil, write: true, retry_count: 0)
key = process_with_arguments(key, hashtag)

node_key = @router.find_node_key_by_key(key, primary: write)
node = @router.find_node(node_key)
# Calling #with checks out the underlying connection if this is a pooled connection
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
# transaction if so.
@router.try_delegate(node, :with, retry_count: retry_count, &block)
@router.try_delegate(node, :with, retry_count: retry_count) do |raw_client|
if @config.client_side_slot_validation
yield PinnedConnectionDelegator.new(raw_client, locked_key_slot: key, cluster_commands: @router.cluster_commands)
else
yield raw_client
end
end
end

def pubsub
Expand Down
80 changes: 80 additions & 0 deletions lib/redis_client/cluster/pinned_connection_delegator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# frozen_string_literal: true

require 'delegate'
class RedisClient
class Cluster
class PinnedConnectionDelegator < SimpleDelegator
METHOD_WRAPPERS = {
# These methods return a Pipeline or Multi to the yielded block which itself
# requires wrapping.
%i[pipelined multi] => lambda do |*args, **kwargs, &block|
super(*args, **kwargs, &wrap_yielded_object(block))
end,
# These are various commanding methods, each of which has a different signature
# which we need to understand to extract the commands properly
%i[call call_once] => lambda do |*command, **kwargs, &block|
validate_slot! command
super(*command, **kwargs, &block)
end,
%i[call_v call_once_v] => lambda do |command, &block|
validate_slot! command
super command, &block
end,
%i[blocking_call] => lambda do |timeout, *command, **kwargs, &block|
validate_slot! command
super timeout, *command, **kwargs, &block
end,
%i[blocking_call_v] => lambda do |timeout, command, &block|
validate_slot! command
super timeout, command, &block
end
}.freeze

def initialize(connection, locked_key_slot:, cluster_commands:)
@locked_key_slot = locked_key_slot
@cluster_commands = cluster_commands
super(connection)

# Set up the slot validation for the set of methods which this object
# implements. Note that we might be wrapping a RedisClient, RedisClient::Multi,
# or RedisClient::Pipeline object, and they do not all respond to the same set of
# methods. Hence, we need to dynamically detect what we're wrapping and define
# the correct methods.
METHOD_WRAPPERS.each do |methods, impl|
methods.each do |method|
define_singleton_method(method, &impl) if respond_to?(method)
end
end
end

private

def validate_slot!(command)
keys = @cluster_commands.extract_all_keys(command)
key_slots = keys.map { |k| ::RedisClient::Cluster::KeySlotConverter.convert(k) }
locked_slot = ::RedisClient::Cluster::KeySlotConverter.convert(@locked_key_slot)

return if key_slots.all? { |slot| slot == locked_slot }

key_slot_pairs = keys.zip(key_slots).map { |key, slot| "#{key} => #{slot}" }.join(', ')
raise ::RedisClient::Cluster::Transaction::ConsistencyError, <<~MESSAGE
Connection is pinned to slot #{locked_slot} (via key #{@locked_key_slot}). \
However, command #{command.inspect} has keys hashing to slots #{key_slot_pairs}. \
Transactions in redis cluster must only refer to keys hashing to the same slot.
MESSAGE
end

# This method ensures that calls to #pipelined, #multi, etc wrap yielded redis-callable objects
# back in the pinning delegator
def wrap_yielded_object(orig_block)
return nil if orig_block.nil?

proc do |clientish|
orig_block.call(
self.class.new(clientish, locked_key_slot: @locked_key_slot, cluster_commands: @cluster_commands)
)
end
end
end
end
end
4 changes: 4 additions & 0 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ def command_exists?(name)
@command.exists?(name)
end

def cluster_commands
@command
end

def assign_redirection_node(err_msg)
_, slot, node_key = err_msg.split
slot = slot.to_i
Expand Down
6 changes: 4 additions & 2 deletions lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class ClusterConfig
InvalidClientConfigError = Class.new(::RedisClient::Error)

attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout,
:connect_with_original_config, :startup_nodes
:connect_with_original_config, :startup_nodes, :client_side_slot_validation

def initialize(
def initialize( # rubocop:disable Metrics/ParameterLists
nodes: DEFAULT_NODES,
replica: false,
replica_affinity: :random,
Expand All @@ -36,6 +36,7 @@ def initialize(
client_implementation: ::RedisClient::Cluster, # for redis gem
slow_command_timeout: SLOW_COMMAND_TIMEOUT,
command_builder: ::RedisClient::CommandBuilder,
client_side_slot_validation: true,
**client_config
)

Expand All @@ -51,6 +52,7 @@ def initialize(
@connect_with_original_config = connect_with_original_config
@client_implementation = client_implementation
@slow_command_timeout = slow_command_timeout
@client_side_slot_validation = client_side_slot_validation
end

def inspect
Expand Down
66 changes: 64 additions & 2 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,30 @@ def test_pinning_two_keys
end

def test_pinning_cross_slot
skip 'This is not implemented yet!'

assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.with(hashtag: 'slot1') do |conn|
conn.call('GET', '{slot2}')
end
end
end

def test_pinning_cross_slot_without_validation
client = new_test_client(client_side_slot_validation: false)
assert_raises(::RedisClient::CommandError) do
client.with(hashtag: 'slot1') do |conn|
conn.call('GET', '{slot2}')
end
end
end

def test_pinning_blocking_call
got = @client.with(key: 'key1') do |conn|
conn.blocking_call(1, 'SET', 'key1', 'hello')
conn.blocking_call_v(1, %w[GET key1])
end
assert_equal('hello', got)
end

def test_pinning_hashtag_with_braces
got = @client.with(hashtag: '{slot}') do |conn|
conn.call('SET', '{slot}key1', 'v1')
Expand All @@ -565,6 +580,16 @@ def test_pinning_pipeline
assert_equal(3, got)
end

def test_pinning_pipeline_cross_slot
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.with(hashtag: 'slot1') do |conn|
conn.pipelined do |pipe|
pipe.call('GET', '{slot2}foo')
end
end
end
end

def test_pinning_pipeline_with_error
assert_raises(RedisClient::CommandError) do
@client.with(hashtag: 'slot') do |conn|
Expand All @@ -591,6 +616,30 @@ def test_pinning_transaction
assert_equal(%w[OK OK], got)
end

def test_pinning_transaction_cross_slot
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.with(hashtag: 'slot1') do |conn|
conn.multi do |txn|
txn.call('GET', '{slot2}foo')
end
end
end
end

def test_pinning_multi_does_not_respond_to_blocking
@client.with(hashtag: 'slot1') do |conn|
assert_respond_to conn, :blocking_call_v
conn.multi do |txn|
refute_respond_to txn.__getobj__, :blocking_call_v
refute_respond_to txn, :blocking_call_v
# i.e. does not raise Transaction::ConsistencyError
assert_raises(NoMethodError) do
txn.blocking_call_v('SET', '{slot}key1', 'value1')
end
end
end
end

def test_pinning_transaction_watch_arg
@client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2')
@captured_commands.clear
Expand Down Expand Up @@ -669,6 +718,19 @@ def test_pinning_transaction_can_unwatch_manually
assert_equal('OK', got)
end

def test_pinning_transaction_can_coerce_with_block
@client.call('MSET', '{slot}key1', 1, '{slot}key2', 2)
got = @client.with(hashtag: 'slot') do |conn|
conn.multi do |txn|
# This tests a RedisClient behaviour where blocks can be used to modify the return results
# of multi/pipeliend blocks
txn.call('GET', '{slot}key1') { |res| res.to_i + 5 }
txn.call_v(['GET', '{slot}key2']) { |res| res.to_i + 2 }
end
end
assert_equal([6, 4], got)
end

def test_pinning_timeouts_update_topology
# Create a new test client with a lower timeout for this test so it's fast.
captured_commands = CommandCaptureMiddleware::CommandBuffer.new
Expand Down