Skip to content

Validate key slots used with RedisCluster::Client#with #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,19 @@ def multi(watch: nil, &block)
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
end

def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block)
def with(key: nil, hashtag: nil, write: true, retry_count: 0)
key = process_with_arguments(key, hashtag)

node_key = @router.find_node_key_by_key(key, primary: write)
node = @router.find_node(node_key)
# Calling #with checks out the underlying connection if this is a pooled connection
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
# transaction if so.
@router.try_delegate(node, :with, retry_count: retry_count, &block)
@router.try_delegate(node, :with, retry_count: retry_count) do |conn|
conn.locked_to_key_slot(key) do
yield conn
end
end
end

def pubsub
Expand Down
70 changes: 35 additions & 35 deletions lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,33 @@ class Command
keyword_init: true
)

class << self
def load(nodes, slow_command_timeout: -1)
cmd = errors = nil

nodes&.each do |node|
regular_timeout = node.read_timeout
node.read_timeout = slow_command_timeout > 0.0 ? slow_command_timeout : regular_timeout
reply = node.call('COMMAND')
node.read_timeout = regular_timeout
commands = parse_command_reply(reply)
cmd = ::RedisClient::Cluster::Command.new(commands)
break
rescue ::RedisClient::Error => e
errors ||= []
errors << e
end

return cmd unless cmd.nil?
# The command table starts out empty; it is populated later by #load!
# (called under the lock in Node#reload!) once a node's COMMAND reply
# is available.
def initialize
@commands = EMPTY_HASH
end

raise ::RedisClient::Cluster::InitialSetupError, errors
# n.b. this is not thread safe; it's called under a lock in Node#reload! though.
def load!(nodes, slow_command_timeout: -1)
commands = errors = nil

nodes&.each do |node|
regular_timeout = node.read_timeout
node.read_timeout = slow_command_timeout > 0.0 ? slow_command_timeout : regular_timeout
reply = node.call('COMMAND')
node.read_timeout = regular_timeout
commands = parse_command_reply(reply)
break
rescue ::RedisClient::Error => e
errors ||= []
errors << e
end

private

def parse_command_reply(rows)
rows&.each_with_object({}) do |row, acc|
next if row[0].nil?
raise ::RedisClient::Cluster::InitialSetupError, errors unless commands

acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new(
first_key_position: row[3],
last_key_position: row[4],
key_step: row[5],
write?: row[2].include?('write'),
readonly?: row[2].include?('readonly')
)
end.freeze || EMPTY_HASH
end
@commands = commands
end

def initialize(commands)
@commands = commands || EMPTY_HASH
# Whether the command table has been populated by a successful #load!.
def loaded?
  !@commands.empty?
end

def extract_first_key(command)
Expand Down Expand Up @@ -153,6 +139,20 @@ def determine_optional_key_position(command, option_name) # rubocop:disable Metr
idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase)
idx.nil? ? 0 : idx + 1
end

# Builds a frozen { command_name => Detail } mapping from a raw COMMAND reply.
# Rows with a missing name are skipped; a nil reply yields the shared EMPTY_HASH.
def parse_command_reply(rows)
  return EMPTY_HASH if rows.nil?

  details = {}
  rows.each do |row|
    name = row[0]
    next if name.nil?

    flags = row[2]
    details[name.downcase] = ::RedisClient::Cluster::Command::Detail.new(
      first_key_position: row[3],
      last_key_position: row[4],
      key_step: row[5],
      write?: flags.include?('write'),
      readonly?: flags.include?('readonly')
    )
  end
  details.freeze
end
end
end
end
31 changes: 27 additions & 4 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require 'redis_client/cluster/node/random_replica'
require 'redis_client/cluster/node/random_replica_or_primary'
require 'redis_client/cluster/node/latency_replica'
require 'redis_client/cluster/pinning'

class RedisClient
class Cluster
Expand Down Expand Up @@ -78,14 +79,25 @@ def []=(index, element)
end
end

class SingleNodeRedisClient < ::RedisClient
include Pinning::ClientMixin
Copy link
Member

@supercaracal supercaracal Feb 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the following complexity as you said is derived from this mixin and middleware pattern.

  • Config instance holds the command information.
  • Our cluster client uses our own client implementation.
  • Pinning module holds the state of the locking.

I think it would be better to use a simple wrapper object for the slot validation.

class RedisClient
  class Cluster
    class PinningClient
      def initialize(client, key, command, command_builder)
        @client = client
        @slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
        @command = command
        @command_builder = command_builder
      end

      def call(*args, **kwargs, &block)
        command = @command_builder.generate(args, kwargs)
        call_v(command, &block)
      end

      def call_v(command, &block)
        validate_slot!(command)
        @client.call_v(command, &block)
      end

      # etc...

      private

      def validate_slot!(command)
        # raise an error if command is invalid
      end
    end
  end
end
class RedisClient
  class Cluster
    def with(key: nil, hashtag: nil, write: true, retry_count: 0)
      key = process_with_arguments(key, hashtag)
      node_key = @router.find_node_key_by_key(key, primary: write)
      node = @router.find_node(node_key)
      @router.try_delegate(node, :with, retry_count: retry_count) do |conn|
        yield PinningClient.new(conn, key, @command_builder, @router.command)
      end
    end
  end
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this, but it's a little awkward because of the need to re-wrap the client if pipelined or multi is called. It ends up looking like this: https://github.com/redis-rb/redis-cluster-client/pull/298/files#diff-426e610bbc89f748ae5f9d43790b07d10fed426fa431d6bea29b67f649a7eb72R1 which i remember we were not very happy with.

Most of the complexity here is coming from the fact that we have to change the lifecycle of the RedisClient::Cluster::Command object so that it's available for the pinning middleware. If we made our custom RedisClient subclass support some kind of temporary "validation procs" itself, then the RedisCluster::Client instance can construct the validation proc only when #with is called and @commands is already known.

I think that will be a lot simpler - let me try that first.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can omit #pipelined and #multi in the #with method. No user would use it in such an odd way. We can never cover every use case completely. Our users should be responsible for how they use the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely need #multi at least - the whole point is to enable single shard transactions!

I managed to write a much better implementation of the delegator in #298 today though - expect to see another PR tomorrow!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the #multi method is held by the cluster client.

@cluster_client = build_cluster_client

@cluster_client.with(...) do |raw_or_pinning_cli|
  raw_or_pinning_cli.call('CMD')

  @cluster_client.multi(...) do |raw_or_pinning_cli|
    # do something
  end
end

Anyway, I think we shouldn't do overengineering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, my plan was to delete the #multi implementation in RedisClient::Cluster (or, at least, re-implement it in terms of RedisClient::Cluster#with).

My understanding was that the original idea of adding this kind of “pinning” was to avoid needing to implement transaction support in RedisClient::Cluster at all - instead users can issue transactions directly against the correct node selected by #with

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering about your decision. In Redis cluster, I think users should keep their use of the transaction feature simple, because of the CAP theorem. I've thought the #with method is just an optional way to be able to use the transaction feature in some complicated ways in cluster mode. I've thought it's not the main interface for the transaction feature. It's a bit different from the redis-client. Please give me time to think about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember, the original problem I was solving was that RedisClient::Cluster#multi currently calls the transaction block twice - once itself to figure out what node to send the transaction to, and then RedisClient#multi calls it again to actually build up the transaction to execute. This interface is also different from what's in RedisClient!

From my pretty extensive survey of the options here over the last few months, there's a pretty fundamental tradeoff to make here:

  • If you want to implement transaction support inside redis-cluster-client in terms of RedisClient#multi, you must know the node ahead of time. You can't lazily call RedisClient#multi once you see the first command in the transaction, because you've already yielded control back to the caller, and can't give the caller the real RedisClient::Multi object without yielding again (which was the original bug I was trying to solve!).
  • If you absolutely want RedisClient::Cluster#multi to work without knowing the node ahead of time (and thus, make it fully compatible), the only choice is to implement a kind of 'lazy transaction' system like I did in Support watch & unwatch properly #295. That is, you wait for the first command in the transaction, then issue MULTI on the appropriate node (and, thus, the redis-cluster-client must also take responsibility for calling EXEC/DISCARD/UNWATCH when appropriate too). This will work when issuing client.call('MULTI') ... client.call('EXEC'), which is valid too with RedisClient.

My understanding is that we settled on the #with interface as the best way to implement "knowing the node ahead of time", because you can also make the same code still work with RedisClient (since RedisClient#with does nothing - like I wrote in the docs here:

Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its
arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with`
call will pin the connection to a slot when using clustering, or be a no-op when not.
).

If we want to make RedisClient::Cluster#multi keep working how it is now, for backwards compatibility, we can keep its current implementation. However going forward I think we should be recommending people to use RedisClient::Cluster#with since it doesn't have the calls-block-twice behaviour.

For compatibility with RedisClient, an approach like #295 would be better - it would let us behave exactly like a drop in replacement for RedisClient.

end

class Config < ::RedisClient::Config
def initialize(scale_read: false, middlewares: nil, **kwargs)
def initialize(cluster_commands:, scale_read: false, middlewares: nil, **kwargs)
@scale_read = scale_read
@cluster_commands = cluster_commands
middlewares ||= []
middlewares.unshift Pinning::ClientMiddleware
middlewares.unshift ErrorIdentification::Middleware
super(middlewares: middlewares, **kwargs)
super(
middlewares: middlewares,
client_implementation: SingleNodeRedisClient,
**kwargs)
end

attr_reader :cluster_commands
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between cluster_commands and commands?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I misread it. Forget about that.


private

def build_connection_prelude
Expand All @@ -106,11 +118,15 @@ def initialize(
@slots = build_slot_node_mappings(EMPTY_ARRAY)
@replications = build_replication_mappings(EMPTY_ARRAY)
klass = make_topology_class(config.use_replica?, config.replica_affinity)
@topology = klass.new(pool, @concurrent_worker, **kwargs)
@command = ::RedisClient::Cluster::Command.new
@base_connection_configuration = { **kwargs, cluster_commands: @command }
@topology = klass.new(pool, @concurrent_worker, **@base_connection_configuration)
@config = config
@mutex = Mutex.new
end

attr_reader :command

def inspect
"#<#{self.class.name} #{node_keys.join(', ')}>"
end
Expand Down Expand Up @@ -212,6 +228,13 @@ def reload!
end
@slots = build_slot_node_mappings(@node_info)
@replications = build_replication_mappings(@node_info)

# Call COMMAND to find out the commands available on this cluster. We only call this once
# the first time the client is constructed; if you perform a rolling update to a new version
# of Redis, for example, applications won't know about the new commands available until they
# construct new client objects (or, more likely, are restarted).
@command.load!(startup_clients, slow_command_timeout: @config.slow_command_timeout) unless @command.loaded?

@topology.process_topology_update!(@replications, @node_configs)
end
end
Expand Down Expand Up @@ -404,7 +427,7 @@ def with_startup_clients(count) # rubocop:disable Metrics/AbcSize
# Memoize the startup clients, so we maintain RedisClient's internal circuit breaker configuration
# if it's set.
@startup_clients ||= @config.startup_nodes.values.sample(count).map do |node_config|
::RedisClient::Cluster::Node::Config.new(**node_config).new_client
::RedisClient::Cluster::Node::Config.new(**@base_connection_configuration.merge(node_config)).new_client
end
yield @startup_clients
ensure
Expand Down
59 changes: 59 additions & 0 deletions lib/redis_client/cluster/pinning.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# frozen_string_literal: true

class RedisClient
  class Cluster
    module Pinning
      # Mixed into the cluster's client implementation so that a checked-out
      # connection can be temporarily locked to a single key slot.
      module ClientMixin
        attr_reader :locked_key_slot

        # This gets called when handing out connections in Cluster#with to lock the returned
        # connections to a given slot. Not re-entrant: nesting raises ArgumentError.
        # NOTE(review): despite the parameter name, the value stored here is the *key*
        # (or hashtag) the connection is pinned to; ClientMiddleware converts it to a
        # slot number with KeySlotConverter when validating commands.
        def locked_to_key_slot(key_slot)
          raise ArgumentError, 'recursive slot locking is not allowed' if @locked_key_slot

          begin
            @locked_key_slot = key_slot
            yield
          ensure
            # Always release the lock, even if the block raises.
            @locked_key_slot = nil
          end
        end
      end

      # This middleware is what actually enforces the slot locking above.
      module ClientMiddleware
        def initialize(client)
          @client = client
          super
        end

        # Raises ConsistencyError if +command+ touches any key hashing to a slot
        # other than the one the connection is currently pinned to. A no-op when
        # the connection is not pinned, or when the cluster's command table has
        # not been loaded yet (we cannot extract keys without it).
        def assert_slot_valid!(command, config) # rubocop:disable Metrics/AbcSize
          return unless @client.locked_key_slot
          return unless config.cluster_commands.loaded?

          keys = config.cluster_commands.extract_all_keys(command)
          key_slots = keys.map { |k| ::RedisClient::Cluster::KeySlotConverter.convert(k) }
          locked_slot = ::RedisClient::Cluster::KeySlotConverter.convert(@client.locked_key_slot)
          return if key_slots.all? { |slot| slot == locked_slot }

          key_slot_pairs = keys.zip(key_slots).map { |key, slot| "#{key} => #{slot}" }.join(', ')
          raise ::RedisClient::Cluster::Transaction::ConsistencyError, <<~MESSAGE
            Connection is pinned to slot #{locked_slot} (via key #{@client.locked_key_slot}). \
            However, command #{command.inspect} has keys hashing to slots #{key_slot_pairs}. \
            Transactions in redis cluster must only refer to keys hashing to the same slot.
          MESSAGE
        end

        def call(command, config)
          assert_slot_valid!(command, config)
          super
        end

        # RedisClient's middleware API passes an *array* of commands to
        # #call_pipelined, so each command must be validated individually
        # rather than treating the whole array as a single command.
        def call_pipelined(commands, config)
          commands.each { |command| assert_slot_valid!(command, config) }
          super
        end
      end
    end
  end
end
13 changes: 8 additions & 5 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
@client_kwargs = kwargs
@node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs)
update_cluster_info!
@command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
@command_builder = @config.command_builder
end

Expand Down Expand Up @@ -163,12 +162,12 @@ def find_node_key_by_key(key, seed: nil, primary: false)
end

def find_node_key(command, seed: nil)
key = @command.extract_first_key(command)
find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command))
key = cluster_commands.extract_first_key(command)
find_node_key_by_key(key, seed: seed, primary: cluster_commands.should_send_to_primary?(command))
end

def find_primary_node_key(command)
key = @command.extract_first_key(command)
key = cluster_commands.extract_first_key(command)
return nil unless key&.size&.> 0

find_node_key_by_key(key, primary: true)
Expand All @@ -184,8 +183,12 @@ def find_node(node_key, retry_count: 3)
retry
end

# Exposes the Command table owned by Node, so routing decisions (and
# Cluster#with slot validation) consult the command metadata loaded from
# the cluster instead of a Router-local copy.
def cluster_commands
@node.command
end

def command_exists?(name)
@command.exists?(name)
cluster_commands.exists?(name)
end

def assign_redirection_node(err_msg)
Expand Down
10 changes: 5 additions & 5 deletions test/redis_client/cluster/node/test_latency_replica.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,36 @@ class TestLatencyReplica < TestingWrapper

def test_clients_with_redis_client
got = @test_node.clients
got.each { |client| assert_instance_of(::RedisClient, client) }
got.each { |client| assert_kind_of(::RedisClient, client) }
assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort)
end

def test_clients_with_pooled_redis_client
test_node = make_node(pool: { timeout: 3, size: 2 })
got = test_node.clients
got.each { |client| assert_instance_of(::RedisClient::Pooled, client) }
got.each { |client| assert_kind_of(::RedisClient::Pooled, client) }
assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort)
end

def test_primary_clients
got = @test_node.primary_clients
got.each do |client|
assert_instance_of(::RedisClient, client)
assert_kind_of(::RedisClient, client)
assert_equal('master', client.call('ROLE').first)
end
end

def test_replica_clients
got = @test_node.replica_clients
got.each do |client|
assert_instance_of(::RedisClient, client)
assert_kind_of(::RedisClient, client)
assert_equal('slave', client.call('ROLE').first)
end
end

def test_clients_for_scanning
got = @test_node.clients_for_scanning
got.each { |client| assert_instance_of(::RedisClient, client) }
got.each { |client| assert_kind_of(::RedisClient, client) }
assert_equal(TEST_SHARD_SIZE, got.size)
end

Expand Down
10 changes: 5 additions & 5 deletions test/redis_client/cluster/node/test_primary_only.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestPrimaryOnly < TestingWrapper
def test_clients_with_redis_client
got = @test_node.clients
got.each do |client|
assert_instance_of(::RedisClient, client)
assert_kind_of(::RedisClient, client)
assert_equal('master', client.call('ROLE').first)
end
end
Expand All @@ -22,31 +22,31 @@ def test_clients_with_pooled_redis_client
test_node = make_node(pool: { timeout: 3, size: 2 })
got = test_node.clients
got.each do |client|
assert_instance_of(::RedisClient::Pooled, client)
assert_kind_of(::RedisClient::Pooled, client)
assert_equal('master', client.call('ROLE').first)
end
end

def test_primary_clients
got = @test_node.primary_clients
got.each do |client|
assert_instance_of(::RedisClient, client)
assert_kind_of(::RedisClient, client)
assert_equal('master', client.call('ROLE').first)
end
end

def test_replica_clients
got = @test_node.replica_clients
got.each do |client|
assert_instance_of(::RedisClient, client)
assert_kind_of(::RedisClient, client)
assert_equal('master', client.call('ROLE').first)
end
end

def test_clients_for_scanning
got = @test_node.clients_for_scanning
got.each do |client|
assert_instance_of(::RedisClient, client)
assert_kind_of(::RedisClient, client)
assert_equal('master', client.call('ROLE').first)
end
end
Expand Down
Loading