Skip to content

Commit 4ee1116

Browse files
authored
perf: lessen reload frequency to mitigate load of servers (#377)
1 parent a1ca9b3 commit 4ee1116

9 files changed

+140
-68
lines changed

lib/redis_client/cluster/node.rb

+17-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class Node
2323
ROLE_FLAGS = %w[master slave].freeze
2424
EMPTY_ARRAY = [].freeze
2525
EMPTY_HASH = {}.freeze
26+
STATE_REFRESH_INTERVAL = (3..10).freeze
2627

2728
private_constant :USE_CHAR_ARRAY_SLOT, :SLOT_SIZE, :MIN_SLOT, :MAX_SLOT,
2829
:DEAD_FLAGS, :ROLE_FLAGS, :EMPTY_ARRAY, :EMPTY_HASH
@@ -103,6 +104,8 @@ def initialize(concurrent_worker, config:, pool: nil, **kwargs)
103104
@config = config
104105
@mutex = Mutex.new
105106
@last_reloaded_at = nil
107+
@reload_times = 0
108+
@random = Random.new
106109
end
107110

108111
def inspect
@@ -419,15 +422,27 @@ def with_reload_lock
419422
# performed the reload.
420423
# Probably in the future we should add a circuit breaker to #reload itself, and stop trying if the cluster is
421424
# obviously not working.
422-
wait_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
425+
wait_start = obtain_current_time
423426
@mutex.synchronize do
424427
return if @last_reloaded_at && @last_reloaded_at > wait_start
425428

429+
if @last_reloaded_at && @reload_times > 1
430+
# Naively reduce server load: skip reloading within a random 3-10s window, rather than sleeping with exponential backoff.
431+
now = obtain_current_time
432+
elapsed = @last_reloaded_at + @random.rand(STATE_REFRESH_INTERVAL) * 1_000_000
433+
return if now < elapsed
434+
end
435+
426436
r = yield
427-
@last_reloaded_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
437+
@last_reloaded_at = obtain_current_time
438+
@reload_times += 1
428439
r
429440
end
430441
end
442+
443+
def obtain_current_time
444+
Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
445+
end
431446
end
432447
end
433448
end

lib/redis_client/cluster/router.rb

+10-15
Original file line numberDiff line numberDiff line change
@@ -107,32 +107,29 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
107107
rescue ::RedisClient::CommandError => e
108108
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)
109109

110+
retry_count -= 1
110111
if e.message.start_with?('MOVED')
111112
node = assign_redirection_node(e.message)
112-
retry_count -= 1
113113
retry if retry_count >= 0
114114
elsif e.message.start_with?('ASK')
115115
node = assign_asking_node(e.message)
116-
retry_count -= 1
117116
if retry_count >= 0
118117
node.call('ASKING')
119118
retry
120119
end
121120
elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
122121
@node.reload!
123-
retry_count -= 1
124122
retry if retry_count >= 0
125123
end
124+
126125
raise
127126
rescue ::RedisClient::ConnectionError => e
128127
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)
129128

130-
@node.reload!
131-
132-
raise if retry_count <= 0
133-
134129
retry_count -= 1
135-
retry
130+
@node.reload!
131+
retry if retry_count >= 0
132+
raise
136133
end
137134

138135
def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
@@ -200,13 +197,13 @@ def find_slot_by_key(key)
200197
::RedisClient::Cluster::KeySlotConverter.convert(key)
201198
end
202199

203-
def find_node(node_key, retry_count: 3)
200+
def find_node(node_key, retry_count: 1)
204201
@node.find_by(node_key)
205202
rescue ::RedisClient::Cluster::Node::ReloadNeeded
206203
raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0
207204

208-
@node.reload!
209205
retry_count -= 1
206+
@node.reload!
210207
retry
211208
end
212209

@@ -236,17 +233,15 @@ def close
236233

237234
private
238235

239-
def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize
236+
def send_wait_command(method, command, args, retry_count: 1, &block) # rubocop:disable Metrics/AbcSize
240237
@node.call_primaries(method, command, args).select { |r| r.is_a?(Integer) }.sum.then(&TSF.call(block))
241238
rescue ::RedisClient::Cluster::ErrorCollection => e
242239
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)
243240
raise if retry_count <= 0
244-
raise if e.errors.values.none? do |err|
245-
err.message.include?('WAIT cannot be used with replica instances')
246-
end
241+
raise if e.errors.values.none? { |err| err.message.include?('WAIT cannot be used with replica instances') }
247242

248-
@node.reload!
249243
retry_count -= 1
244+
@node.reload!
250245
retry
251246
end
252247

test/middlewares/redirection_count.rb renamed to test/middlewares/redirect_count.rb

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# frozen_string_literal: true
22

33
module Middlewares
4-
module RedirectionCount
4+
module RedirectCount
55
class Counter
6-
Result = Struct.new('RedirectionCountResult', :moved, :ask, keyword_init: true)
6+
Result = Struct.new('RedirectCountResult', :moved, :ask, keyword_init: true)
77

88
def initialize
99
@moved = 0
@@ -39,9 +39,9 @@ def call(cmd, cfg)
3939
super
4040
rescue ::RedisClient::CommandError => e
4141
if e.message.start_with?('MOVED')
42-
cfg.custom.fetch(:redirection_count).moved
42+
cfg.custom.fetch(:redirect_count).moved
4343
elsif e.message.start_with?('ASK')
44-
cfg.custom.fetch(:redirection_count).ask
44+
cfg.custom.fetch(:redirect_count).ask
4545
end
4646

4747
raise
@@ -51,9 +51,9 @@ def call_pipelined(cmd, cfg)
5151
super
5252
rescue ::RedisClient::CommandError => e
5353
if e.message.start_with?('MOVED')
54-
cfg.custom.fetch(:redirection_count).moved
54+
cfg.custom.fetch(:redirect_count).moved
5555
elsif e.message.start_with?('ASK')
56-
cfg.custom.fetch(:redirection_count).ask
56+
cfg.custom.fetch(:redirect_count).ask
5757
end
5858

5959
raise

test/middlewares/redirection_emulation.rb renamed to test/middlewares/redirect_fake.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
# frozen_string_literal: true
22

33
module Middlewares
4-
module RedirectionEmulation
4+
module RedirectFake
55
Setting = Struct.new(
6-
'RedirectionEmulationMiddlewareSetting',
6+
'RedirectFakeSetting',
77
:slot, :to, :command, keyword_init: true
88
)
99

1010
def call(cmd, cfg)
11-
s = cfg.custom.fetch(:redirect)
11+
s = cfg.custom.fetch(:redirect_fake)
1212
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command
1313

1414
super
1515
end
1616

1717
def call_pipelined(cmd, cfg)
18-
s = cfg.custom.fetch(:redirect)
18+
s = cfg.custom.fetch(:redirect_fake)
1919
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command
2020

2121
super

test/redis_client/test_cluster.rb

+15-15
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@ class TestCluster
77
module Mixin
88
def setup
99
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
10-
@redirection_count = ::Middlewares::RedirectionCount::Counter.new
10+
@redirect_count = ::Middlewares::RedirectCount::Counter.new
1111
@client = new_test_client
1212
@client.call('FLUSHDB')
1313
wait_for_replication
1414
@captured_commands.clear
15-
@redirection_count.clear
15+
@redirect_count.clear
1616
end
1717

1818
def teardown
1919
@client&.call('FLUSHDB')
2020
wait_for_replication
2121
@client&.close
22-
flunk(@redirection_count.get) unless @redirection_count.zero?
22+
flunk(@redirect_count.get) unless @redirect_count.zero?
2323
end
2424

2525
def test_config
@@ -850,10 +850,10 @@ def test_only_reshards_own_errors
850850
client2 = new_test_client(
851851
middlewares: [
852852
::RedisClient::Cluster::ErrorIdentification::Middleware,
853-
::Middlewares::RedirectionEmulation
853+
::Middlewares::RedirectFake
854854
],
855855
custom: {
856-
redirect: ::Middlewares::RedirectionEmulation::Setting.new(
856+
redirect_fake: ::Middlewares::RedirectFake::Setting.new(
857857
slot: slot, to: broken_primary_key, command: %w[SET testkey client2]
858858
)
859859
}
@@ -925,8 +925,8 @@ class PrimaryOnly < TestingWrapper
925925
include Mixin
926926

927927
def new_test_client(
928-
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
929-
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
928+
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
929+
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
930930
**opts
931931
)
932932
config = ::RedisClient::ClusterConfig.new(
@@ -946,8 +946,8 @@ class ScaleReadRandom < TestingWrapper
946946
include Mixin
947947

948948
def new_test_client(
949-
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
950-
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
949+
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
950+
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
951951
**opts
952952
)
953953
config = ::RedisClient::ClusterConfig.new(
@@ -969,8 +969,8 @@ class ScaleReadRandomWithPrimary < TestingWrapper
969969
include Mixin
970970

971971
def new_test_client(
972-
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
973-
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
972+
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
973+
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
974974
**opts
975975
)
976976
config = ::RedisClient::ClusterConfig.new(
@@ -992,8 +992,8 @@ class ScaleReadLatency < TestingWrapper
992992
include Mixin
993993

994994
def new_test_client(
995-
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
996-
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
995+
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
996+
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
997997
**opts
998998
)
999999
config = ::RedisClient::ClusterConfig.new(
@@ -1015,8 +1015,8 @@ class Pooled < TestingWrapper
10151015
include Mixin
10161016

10171017
def new_test_client(
1018-
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
1019-
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
1018+
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
1019+
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
10201020
**opts
10211021
)
10221022
config = ::RedisClient::ClusterConfig.new(

test/test_against_cluster_broken.rb

+5-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ class TestAgainstClusterBroken < TestingWrapper
77

88
def setup
99
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
10+
@redirect_count = ::Middlewares::RedirectCount::Counter.new
1011
@client = ::RedisClient.cluster(
1112
nodes: TEST_NODE_URIS,
1213
replica: true,
1314
fixed_hostname: TEST_FIXED_HOSTNAME,
14-
custom: { captured_commands: @captured_commands },
15-
middlewares: [::Middlewares::CommandCapture],
15+
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
16+
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
1617
**TEST_GENERIC_OPTIONS
1718
).new_client
1819
@client.call('echo', 'init')
@@ -22,11 +23,13 @@ def setup
2223
**TEST_GENERIC_OPTIONS.merge(timeout: 30.0)
2324
)
2425
@captured_commands.clear
26+
@redirect_count.clear
2527
end
2628

2729
def teardown
2830
@client&.close
2931
@controller&.close
32+
print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = "
3033
end
3134

3235
def test_a_replica_is_down

test/test_against_cluster_scale.rb

+5-2
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,24 @@ def self.test_order
1111

1212
def setup
1313
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
14+
@redirect_count = ::Middlewares::RedirectCount::Counter.new
1415
@client = ::RedisClient.cluster(
1516
nodes: TEST_NODE_URIS,
1617
replica: true,
1718
fixed_hostname: TEST_FIXED_HOSTNAME,
18-
custom: { captured_commands: @captured_commands },
19-
middlewares: [::Middlewares::CommandCapture],
19+
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
20+
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
2021
**TEST_GENERIC_OPTIONS
2122
).new_client
2223
@client.call('echo', 'init')
2324
@captured_commands.clear
25+
@redirect_count.clear
2426
end
2527

2628
def teardown
2729
@client&.close
2830
@controller&.close
31+
print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = "
2932
end
3033

3134
def test_01_scale_out

0 commit comments

Comments
 (0)