Commit 65cef5a

Author: KJ Tsanaktsidis
Add middleware to ensure Router only handles errors for its clients
If you nest usages of RedisClient (for example, because you're scanning from one cluster whilst inserting into a different cluster), it's possible for errors raised by one cluster's RedisClient instances to be handled by the wrong cluster instead. This is bad, because it can mean processing bogus topology "updates" when e.g. MOVED or ASK responses get caught by the wrong client.

To fix this, we can add a middleware to our RedisClient instances (through the documented middleware interface) and tag errors at the source based on which config object they came from. The router can then use that tag to identify errors that it should and should not handle.
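The tagging idea described above can be sketched in isolation. This is a minimal illustration, not the code from this commit: `DemoTagged`, `tag_error`, and `owns_error?` are hypothetical names, and plain `Object` instances stand in for config objects.

```ruby
# Hypothetical stand-in for a tag module: records which object an error came from.
module DemoTagged
  attr_accessor :origin

  # Identity comparison, so two equal-looking configs are still distinct owners.
  def from?(owner)
    owner.equal?(origin)
  end
end

# Attach the tag to a single error instance via its singleton class.
def tag_error(err, origin)
  err.singleton_class.include(DemoTagged)
  err.origin = origin
  err
end

# An untagged error is never considered owned by anyone.
def owns_error?(err, owner)
  err.is_a?(DemoTagged) && err.from?(owner)
end

config_a = Object.new
config_b = Object.new

tagged = tag_error(RuntimeError.new('MOVED 1234 10.0.0.1:6379'), config_a)
untagged = RuntimeError.new('MOVED 1234 10.0.0.1:6379')

puts owns_error?(tagged, config_a)
puts owns_error?(tagged, config_b)
puts owns_error?(untagged, config_a)
```

Tagging the instance rather than the error class leaves the exception hierarchy untouched, so existing `rescue` clauses keep matching as before.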
1 parent 09484ea commit 65cef5a

4 files changed: +76 -4 lines changed

@@ -0,0 +1,43 @@
+# frozen_string_literal: true
+
+class RedisClient
+  class Cluster
+    module ErrorIdentification
+      def self.client_owns_error?(err, client)
+        err.is_a?(TaggedError) && err.from?(client)
+      end
+
+      module TaggedError
+        attr_accessor :config_instance
+
+        def from?(client)
+          client.config.equal?(config_instance)
+        end
+      end
+
+      module Middleware
+        def connect(config)
+          super
+        rescue RedisClient::Error => e
+          identify_error(e, config)
+          raise
+        end
+
+        def call(_command, config)
+          super
+        rescue RedisClient::Error => e
+          identify_error(e, config)
+          raise
+        end
+        alias call_pipelined call
+
+        private
+
+        def identify_error(err, config)
+          err.singleton_class.include(TaggedError)
+          err.config_instance = config
+        end
+      end
+    end
+  end
+end
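To illustrate how a middleware of this shape can wrap a call with `super`, tag the error, and re-raise it, here is a self-contained sketch. It does not use the redis-client gem: `DemoBaseStack`, `DemoError`, `DemoTagged`, and `DemoTaggingMiddleware` are hypothetical stand-ins, and the way the stack class is assembled is an assumption made for the example.

```ruby
class DemoError < StandardError; end

# Hypothetical tag module, analogous in spirit to TaggedError above.
module DemoTagged
  attr_accessor :config_instance
end

# Hypothetical innermost layer: just runs the block with the command.
class DemoBaseStack
  def call(command, _config)
    yield command
  end
end

# Wraps the next layer; the bare `super` forwards the arguments and the block.
module DemoTaggingMiddleware
  def call(_command, config)
    super
  rescue DemoError => e
    e.singleton_class.include(DemoTagged)
    e.config_instance = config
    raise # re-raise the same, now tagged, exception
  end
end

# Assumption for this sketch: middlewares are modules included into a subclass.
stack_class = Class.new(DemoBaseStack)
stack_class.include(DemoTaggingMiddleware)

config = Object.new
caught = nil
begin
  stack_class.new.call(%w[GET key], config) do |_cmd|
    raise DemoError, 'MOVED 42 10.0.0.2:6379'
  end
rescue DemoError => e
  caught = e
end

puts caught.config_instance.equal?(config)
```

Because the module re-raises with a bare `raise`, callers still see the original exception object and backtrace; only the tag is added.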

lib/redis_client/cluster/node.rb

+9 -2
@@ -2,6 +2,7 @@
 
 require 'redis_client'
 require 'redis_client/config'
+require 'redis_client/cluster/error_identification'
 require 'redis_client/cluster/errors'
 require 'redis_client/cluster/node/primary_only'
 require 'redis_client/cluster/node/random_replica'
@@ -78,9 +79,11 @@ def []=(index, element)
       end
 
       class Config < ::RedisClient::Config
-        def initialize(scale_read: false, **kwargs)
+        def initialize(scale_read: false, middlewares: nil, **kwargs)
           @scale_read = scale_read
-          super(**kwargs)
+          middlewares ||= []
+          middlewares.unshift ErrorIdentification::Middleware
+          super(middlewares: middlewares, **kwargs)
         end
 
         private
@@ -214,6 +217,10 @@ def reload!
         end
       end
 
+      def owns_error?(err)
+        any? { |c| ErrorIdentification.client_owns_error?(err, c) }
+      end
+
       private
 
       def make_topology_class(with_replica, replica_affinity)

lib/redis_client/cluster/router.rb

+6 -2
@@ -65,7 +65,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
         raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)
 
         update_cluster_info! if e.errors.values.any? do |err|
-          err.message.start_with?('CLUSTERDOWN Hash slot not served')
+          @node.owns_error?(err) && err.message.start_with?('CLUSTERDOWN Hash slot not served')
         end
 
         raise
@@ -94,6 +94,8 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
       rescue ::RedisClient::CircuitBreaker::OpenCircuitError
         raise
       rescue ::RedisClient::CommandError => e
+        raise unless ErrorIdentification.client_owns_error?(e, node)
+
         if e.message.start_with?('MOVED')
           node = assign_redirection_node(e.message)
           retry_count -= 1
@@ -111,7 +113,9 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
           retry if retry_count >= 0
         end
         raise
-      rescue ::RedisClient::ConnectionError
+      rescue ::RedisClient::ConnectionError => e
+        raise unless ErrorIdentification.client_owns_error?(e, node)
+
         update_cluster_info!
 
         raise if retry_count <= 0
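The effect of the `raise unless ... client_owns_error?` guards can be shown with two nested stand-in routers. This is a simplified sketch under stated assumptions: `DemoRouter`, `DemoCommandError`, and `fail_with` are hypothetical, ownership is stored directly on the error rather than via a config object, and the retry logic of the real router is omitted.

```ruby
# Hypothetical error that remembers which router's client produced it.
class DemoCommandError < StandardError
  attr_accessor :owner
end

class DemoRouter
  attr_reader :topology_updates

  def initialize
    @topology_updates = 0
  end

  # Only react to redirections that originated from this router's own clients.
  def handle_redirection
    yield
  rescue DemoCommandError => e
    raise unless e.owner.equal?(self) # foreign error: pass it through untouched

    @topology_updates += 1 if e.message.start_with?('MOVED')
    raise
  end

  # Simulates one of this router's clients receiving an error reply.
  def fail_with(message)
    err = DemoCommandError.new(message)
    err.owner = self
    raise err
  end
end

outer = DemoRouter.new
inner = DemoRouter.new

begin
  # The inner cluster is used inside a block run by the outer cluster.
  outer.handle_redirection do
    inner.handle_redirection { inner.fail_with('MOVED 42 10.0.0.2:6379') }
  end
rescue DemoCommandError
  # The error still propagates to the caller after both routers have seen it.
end

puts inner.topology_updates
puts outer.topology_updates
```

Without the ownership check, the outer router would also count the inner cluster's `MOVED` reply as a topology change of its own.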

test/redis_client/test_cluster.rb

+18
@@ -485,6 +485,24 @@ def test_circuit_breakers
         cli&.close
       end
 
+      def test_only_reshards_own_errors
+        @client.call_v(%w[SADD testkey testvalue1])
+        @client.call_v(%w[SADD testkey testvalue2])
+        slot = ::RedisClient::Cluster::KeySlotConverter.convert('testkey')
+        router = @client.instance_variable_get(:@router)
+        correct_primary_key = router.find_node_key_by_key('testkey', primary: true)
+        broken_primary_key = (router.node_keys - [correct_primary_key]).first
+        assert_raises(RedisClient::CommandError) do
+          @client.sscan('testkey', retry_count: 0) do
+            raise RedisClient::CommandError, "MOVED #{slot} #{broken_primary_key}"
+          end
+        end
+
+        # The exception should not have caused @client to update its shard mappings, because it didn't
+        # come from a RedisClient instance that @client knows about.
+        assert_equal correct_primary_key, router.find_node_key_by_key('testkey', primary: true)
+      end
+
       private
 
       def wait_for_replication
