Skip to content

Commit f133e2b

Browse files
authored
fix: ensure recoverability from cluster down for pubsub and transaction (#381)
1 parent b0fbffa commit f133e2b

File tree

2 files changed

+39
-21
lines changed

2 files changed

+39
-21
lines changed

lib/redis_client/cluster/pub_sub.rb

+33-20
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,13 @@ def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/Cycloma
8989

9090
case event = @queue.pop(true)
9191
when ::RedisClient::CommandError
92-
if event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
93-
@router.renew_cluster_state
94-
break start_over
95-
end
92+
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
9693

97-
raise event
94+
@router.renew_cluster_state
95+
break start_over
96+
when ::RedisClient::ConnectionError
97+
@router.renew_cluster_state
98+
break start_over
9899
when StandardError then raise event
99100
when Array then break event
100101
end
@@ -114,25 +115,20 @@ def _call(command)
114115
end
115116
end
116117

117-
def call_to_single_state(command, retry_count: 1)
118+
def call_to_single_state(command)
118119
node_key = @router.find_node_key(command)
119-
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
120-
@state_dict[node_key].call(command)
121-
rescue ::RedisClient::ConnectionError
122-
@state_dict[node_key].close
123-
@state_dict.delete(node_key)
124-
@router.renew_cluster_state
125-
retry_count -= 1
126-
retry_count >= 0 ? retry : raise
120+
121+
handle_connection_error(node_key) do
122+
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
123+
@state_dict[node_key].call(command)
124+
end
127125
end
128126

129127
def call_to_all_states(command)
130128
@state_dict.each do |node_key, state|
131-
state.call(command)
132-
rescue ::RedisClient::ConnectionError
133-
@state_dict[node_key].close
134-
@state_dict.delete(node_key)
135-
@router.renew_cluster_state
129+
handle_connection_error(node_key, ignore: true) do
130+
state.call(command)
131+
end
136132
end
137133
end
138134

@@ -152,10 +148,27 @@ def calc_max_duration(timeout)
152148
timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000
153149
end
154150

151+
def handle_connection_error(node_key, ignore: false)
152+
yield
153+
rescue ::RedisClient::ConnectionError
154+
@state_dict[node_key].close
155+
@state_dict.delete(node_key)
156+
@router.renew_cluster_state
157+
raise unless ignore
158+
end
159+
155160
def start_over
156161
@state_dict.each_value(&:close)
157162
@state_dict.clear
158-
@commands.each { |command| _call(command) }
163+
@commands.each do |command|
164+
loop do
165+
_call(command)
166+
break
167+
rescue ::RedisClient::ConnectionError
168+
sleep 1.0
169+
end
170+
end
171+
159172
nil
160173
end
161174
end

lib/redis_client/cluster/router.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,12 @@ def assign_node(command)
175175
def find_node_key_by_key(key, seed: nil, primary: false)
176176
if key && !key.empty?
177177
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
178-
primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot)
178+
node_key = primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot)
179+
if node_key.nil?
180+
renew_cluster_state
181+
raise ::RedisClient::Cluster::NodeMightBeDown
182+
end
183+
node_key
179184
else
180185
primary ? @node.any_primary_node_key(seed: seed) : @node.any_replica_node_key(seed: seed)
181186
end

0 commit comments

Comments
 (0)