diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 02281105..dd397e17 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'redis_client/cluster/concurrent_worker' +require 'redis_client/cluster/pinned_connection_delegator' require 'redis_client/cluster/pipeline' require 'redis_client/cluster/pub_sub' require 'redis_client/cluster/router' @@ -93,7 +94,7 @@ def multi(watch: nil, &block) ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block) end - def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) + def with(key: nil, hashtag: nil, write: true, retry_count: 0) key = process_with_arguments(key, hashtag) node_key = @router.find_node_key_by_key(key, primary: write) @@ -101,7 +102,13 @@ def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) # Calling #with checks out the underlying connection if this is a pooled connection # Calling it through #try_delegate ensures we handle any redirections and retry the entire # transaction if so. - @router.try_delegate(node, :with, retry_count: retry_count, &block) + @router.try_delegate(node, :with, retry_count: retry_count) do |raw_client| + if @config.client_side_slot_validation + yield PinnedConnectionDelegator.new(raw_client, locked_key_slot: key, cluster_commands: @router.cluster_commands) + else + yield raw_client + end + end end def pubsub diff --git a/lib/redis_client/cluster/pinned_connection_delegator.rb b/lib/redis_client/cluster/pinned_connection_delegator.rb new file mode 100644 index 00000000..3c547482 --- /dev/null +++ b/lib/redis_client/cluster/pinned_connection_delegator.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +require 'delegate' +class RedisClient + class Cluster + class PinnedConnectionDelegator < SimpleDelegator + METHOD_WRAPPERS = { + # These methods return a Pipeline or Multi to the yielded block which itself + # requires wrapping. + %i[pipelined multi] => lambda do |*args, **kwargs, &block| + super(*args, **kwargs, &wrap_yielded_object(block)) + end, + # These are various commanding methods, each of which has a different signature + # which we need to understand to extract the commands properly + %i[call call_once] => lambda do |*command, **kwargs, &block| + validate_slot! command + super(*command, **kwargs, &block) + end, + %i[call_v call_once_v] => lambda do |command, &block| + validate_slot! command + super command, &block + end, + %i[blocking_call] => lambda do |timeout, *command, **kwargs, &block| + validate_slot! command + super timeout, *command, **kwargs, &block + end, + %i[blocking_call_v] => lambda do |timeout, command, &block| + validate_slot! command + super timeout, command, &block + end + }.freeze + + def initialize(connection, locked_key_slot:, cluster_commands:) + @locked_key_slot = locked_key_slot + @cluster_commands = cluster_commands + super(connection) + + # Set up the slot validation for the set of methods which this object + # implements. Note that we might be wrapping a RedisClient, RedisClient::Multi, + # or RedisClient::Pipeline object, and they do not all respond to the same set of + # methods. Hence, we need to dynamically detect what we're wrapping and define + # the correct methods. + METHOD_WRAPPERS.each do |methods, impl| + methods.each do |method| + define_singleton_method(method, &impl) if respond_to?(method) + end + end + end + + private + + def validate_slot!(command) + keys = @cluster_commands.extract_all_keys(command) + key_slots = keys.map { |k| ::RedisClient::Cluster::KeySlotConverter.convert(k) } + locked_slot = ::RedisClient::Cluster::KeySlotConverter.convert(@locked_key_slot) + + return if key_slots.all? { |slot| slot == locked_slot } + + key_slot_pairs = keys.zip(key_slots).map { |key, slot| "#{key} => #{slot}" }.join(', ') + raise ::RedisClient::Cluster::Transaction::ConsistencyError, <<~MESSAGE + Connection is pinned to slot #{locked_slot} (via key #{@locked_key_slot}). \ + However, command #{command.inspect} has keys hashing to slots #{key_slot_pairs}. \ + Transactions in redis cluster must only refer to keys hashing to the same slot. + MESSAGE + end + + # This method ensures that calls to #pipelined, #multi, etc wrap yielded redis-callable objects + # back in the pinning delegator + def wrap_yielded_object(orig_block) + return nil if orig_block.nil? + + proc do |clientish| + orig_block.call( + self.class.new(clientish, locked_key_slot: @locked_key_slot, cluster_commands: @cluster_commands) + ) + end + end + end + end +end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 7fa37d15..4472a6de 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -188,6 +188,10 @@ def command_exists?(name) @command.exists?(name) end + def cluster_commands + @command + end + def assign_redirection_node(err_msg) _, slot, node_key = err_msg.split slot = slot.to_i diff --git a/lib/redis_client/cluster_config.rb b/lib/redis_client/cluster_config.rb index 78455b49..b5fe0c6c 100644 --- a/lib/redis_client/cluster_config.rb +++ b/lib/redis_client/cluster_config.rb @@ -24,9 +24,9 @@ class ClusterConfig InvalidClientConfigError = Class.new(::RedisClient::Error) attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout, - :connect_with_original_config, :startup_nodes + :connect_with_original_config, :startup_nodes, :client_side_slot_validation - def initialize( + def initialize( # rubocop:disable Metrics/ParameterLists nodes: DEFAULT_NODES, replica: false, replica_affinity: :random, @@ -36,6 +36,7 @@ def initialize( client_implementation: ::RedisClient::Cluster, # for redis gem slow_command_timeout: SLOW_COMMAND_TIMEOUT, command_builder: ::RedisClient::CommandBuilder, + client_side_slot_validation: true, **client_config ) @@ -51,6 +52,7 @@ def initialize( @connect_with_original_config = connect_with_original_config @client_implementation = client_implementation @slow_command_timeout = slow_command_timeout + @client_side_slot_validation = client_side_slot_validation end def inspect diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 433a9af6..b771c997 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -533,8 +533,6 @@ def test_pinning_two_keys end def test_pinning_cross_slot - skip 'This is not implemented yet!' - assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.with(hashtag: 'slot1') do |conn| conn.call('GET', '{slot2}') @@ -542,6 +540,23 @@ def test_pinning_cross_slot end end + def test_pinning_cross_slot_without_validation + client = new_test_client(client_side_slot_validation: false) + assert_raises(::RedisClient::CommandError) do + client.with(hashtag: 'slot1') do |conn| + conn.call('GET', '{slot2}') + end + end + end + + def test_pinning_blocking_call + got = @client.with(key: 'key1') do |conn| + conn.blocking_call(1, 'SET', 'key1', 'hello') + conn.blocking_call_v(1, %w[GET key1]) + end + assert_equal('hello', got) + end + def test_pinning_hashtag_with_braces got = @client.with(hashtag: '{slot}') do |conn| conn.call('SET', '{slot}key1', 'v1') @@ -565,6 +580,16 @@ def test_pinning_pipeline assert_equal(3, got) end + def test_pinning_pipeline_cross_slot + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.with(hashtag: 'slot1') do |conn| + conn.pipelined do |pipe| + pipe.call('GET', '{slot2}foo') + end + end + end + end + def test_pinning_pipeline_with_error assert_raises(RedisClient::CommandError) do @client.with(hashtag: 'slot') do |conn| @@ -591,6 +616,30 @@ def test_pinning_transaction assert_equal(%w[OK OK], got) end + def test_pinning_transaction_cross_slot + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.with(hashtag: 'slot1') do |conn| + conn.multi do |txn| + txn.call('GET', '{slot2}foo') + end + end + end + end + + def test_pinning_multi_does_not_respond_to_blocking + @client.with(hashtag: 'slot1') do |conn| + assert_respond_to conn, :blocking_call_v + conn.multi do |txn| + refute_respond_to txn.__getobj__, :blocking_call_v + refute_respond_to txn, :blocking_call_v + # i.e. does not raise Transaction::ConsistencyError + assert_raises(NoMethodError) do + txn.blocking_call_v('SET', '{slot}key1', 'value1') + end + end + end + end + def test_pinning_transaction_watch_arg @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') @captured_commands.clear @@ -669,6 +718,19 @@ def test_pinning_transaction_can_unwatch_manually assert_equal('OK', got) end + def test_pinning_transaction_can_coerce_with_block + @client.call('MSET', '{slot}key1', 1, '{slot}key2', 2) + got = @client.with(hashtag: 'slot') do |conn| + conn.multi do |txn| + # This tests a RedisClient behaviour where blocks can be used to modify the return results + # of multi/pipeliend blocks + txn.call('GET', '{slot}key1') { |res| res.to_i + 5 } + txn.call_v(['GET', '{slot}key2']) { |res| res.to_i + 2 } + end + end + assert_equal([6, 4], got) + end + def test_pinning_timeouts_update_topology # Create a new test client with a lower timeout for this test so it's fast. captured_commands = CommandCaptureMiddleware::CommandBuffer.new