Skip to content

Commit 4f578b8

Browse files
authored
Add middleware to ensure Router only handles errors for its clients (#310)
2 parents 09484ea + 65cef5a commit 4f578b8

File tree

4 files changed

+76
-4
lines changed

4 files changed

+76
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# frozen_string_literal: true
2+
3+
class RedisClient
4+
class Cluster
5+
module ErrorIdentification
6+
def self.client_owns_error?(err, client)
7+
err.is_a?(TaggedError) && err.from?(client)
8+
end
9+
10+
module TaggedError
11+
attr_accessor :config_instance
12+
13+
def from?(client)
14+
client.config.equal?(config_instance)
15+
end
16+
end
17+
18+
module Middleware
19+
def connect(config)
20+
super
21+
rescue RedisClient::Error => e
22+
identify_error(e, config)
23+
raise
24+
end
25+
26+
def call(_command, config)
27+
super
28+
rescue RedisClient::Error => e
29+
identify_error(e, config)
30+
raise
31+
end
32+
alias call_pipelined call
33+
34+
private
35+
36+
def identify_error(err, config)
37+
err.singleton_class.include(TaggedError)
38+
err.config_instance = config
39+
end
40+
end
41+
end
42+
end
43+
end

lib/redis_client/cluster/node.rb

+9-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
require 'redis_client'
44
require 'redis_client/config'
5+
require 'redis_client/cluster/error_identification'
56
require 'redis_client/cluster/errors'
67
require 'redis_client/cluster/node/primary_only'
78
require 'redis_client/cluster/node/random_replica'
@@ -78,9 +79,11 @@ def []=(index, element)
7879
end
7980

8081
class Config < ::RedisClient::Config
81-
def initialize(scale_read: false, **kwargs)
82+
def initialize(scale_read: false, middlewares: nil, **kwargs)
8283
@scale_read = scale_read
83-
super(**kwargs)
84+
middlewares ||= []
85+
middlewares.unshift ErrorIdentification::Middleware
86+
super(middlewares: middlewares, **kwargs)
8487
end
8588

8689
private
@@ -214,6 +217,10 @@ def reload!
214217
end
215218
end
216219

220+
def owns_error?(err)
221+
any? { |c| ErrorIdentification.client_owns_error?(err, c) }
222+
end
223+
217224
private
218225

219226
def make_topology_class(with_replica, replica_affinity)

lib/redis_client/cluster/router.rb

+6-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
6565
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)
6666

6767
update_cluster_info! if e.errors.values.any? do |err|
68-
err.message.start_with?('CLUSTERDOWN Hash slot not served')
68+
@node.owns_error?(err) && err.message.start_with?('CLUSTERDOWN Hash slot not served')
6969
end
7070

7171
raise
@@ -94,6 +94,8 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
9494
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
9595
raise
9696
rescue ::RedisClient::CommandError => e
97+
raise unless ErrorIdentification.client_owns_error?(e, node)
98+
9799
if e.message.start_with?('MOVED')
98100
node = assign_redirection_node(e.message)
99101
retry_count -= 1
@@ -111,7 +113,9 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
111113
retry if retry_count >= 0
112114
end
113115
raise
114-
rescue ::RedisClient::ConnectionError
116+
rescue ::RedisClient::ConnectionError => e
117+
raise unless ErrorIdentification.client_owns_error?(e, node)
118+
115119
update_cluster_info!
116120

117121
raise if retry_count <= 0

test/redis_client/test_cluster.rb

+18
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,24 @@ def test_circuit_breakers
485485
cli&.close
486486
end
487487

488+
def test_only_reshards_own_errors
489+
@client.call_v(%w[SADD testkey testvalue1])
490+
@client.call_v(%w[SADD testkey testvalue2])
491+
slot = ::RedisClient::Cluster::KeySlotConverter.convert('testkey')
492+
router = @client.instance_variable_get(:@router)
493+
correct_primary_key = router.find_node_key_by_key('testkey', primary: true)
494+
broken_primary_key = (router.node_keys - [correct_primary_key]).first
495+
assert_raises(RedisClient::CommandError) do
496+
@client.sscan('testkey', retry_count: 0) do
497+
raise RedisClient::CommandError, "MOVED #{slot} #{broken_primary_key}"
498+
end
499+
end
500+
501+
# The exception should not have causes @client to update its shard mappings, because it didn't
502+
# come from a RedisClient instance that @client knows about.
503+
assert_equal correct_primary_key, router.find_node_key_by_key('testkey', primary: true)
504+
end
505+
488506
private
489507

490508
def wait_for_replication

0 commit comments

Comments
 (0)