test: fix cluster broken case #390

Merged
merged 10 commits on Sep 29, 2024
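
The workflow change below raises the job timeout from 10 to 15 minutes and reorders several matrix entries. The change to test/cluster_controller.rb replaces RedisClient#call with RedisClient#call_once for every cluster-administration command: in redis-client, call can be retried automatically when reconnect_attempts is configured, while call_once sends a command at most once and raises any connection error to the caller. A minimal sketch of that distinction follows; the require path, host, port, and reconnect_attempts values are illustrative assumptions, not this repository's test configuration.

require 'redis_client'

# Illustrative settings only; not taken from this repository's test setup.
config = RedisClient.config(host: '127.0.0.1', port: 6379, reconnect_attempts: 3)
client = config.new_client

# `call` may reconnect and re-issue the command after a connection error,
# so a non-idempotent command could end up being sent more than once.
client.call('PING')

# `call_once` never retries: if the connection drops mid-command,
# the error is raised to the caller instead of the command being re-sent.
client.call_once('CLUSTER', 'INFO')
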
10 changes: 5 additions & 5 deletions .github/workflows/test.yaml
@@ -20,15 +20,14 @@ defaults:
jobs:
main:
name: Main
- timeout-minutes: 10
+ timeout-minutes: 15
runs-on: ubuntu-latest
strategy:
fail-fast: false
max-parallel: 10
matrix:
include:
- {redis: '7.2', ruby: '3.3'}
- - {task: test_cluster_broken, restart: 'no', startup: '6'}
- {redis: '7.2', ruby: '3.3', compose: compose.ssl.yaml}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis'}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
@@ -38,14 +37,15 @@ jobs:
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- - {ruby: 'jruby'}
- - {ruby: 'truffleruby'}
- - {task: test_cluster_down}
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
- {redis: '7.0', ruby: '3.1'}
- {redis: '6.2', ruby: '3.0'}
- {redis: '5.0', ruby: '2.7'}
+ - {task: test_cluster_down}
+ - {task: test_cluster_broken, restart: 'no', startup: '6'}
+ - {ruby: 'jruby'}
+ - {ruby: 'truffleruby'}
- {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'Pipeline', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'Transaction', compose: compose.scale.yaml, startup: '8'}
60 changes: 30 additions & 30 deletions test/cluster_controller.rb
@@ -94,7 +94,7 @@ def failover
replica_info = rows.find { |row| row.primary_id == primary_info.id }

wait_replication_delay(@clients, replica_size: @replica_size, timeout: @timeout)
- replica_info.client.call('CLUSTER', 'FAILOVER', 'TAKEOVER')
+ replica_info.client.call_once('CLUSTER', 'FAILOVER', 'TAKEOVER')
wait_failover(
@clients,
primary_node_key: primary_info.node_key,
@@ -117,24 +117,24 @@ def start_resharding(slot:, src_node_key:, dest_node_key:)
dest_host, dest_port = dest_info.node_key.split(':')

# @see https://redis.io/commands/cluster-setslot/#redis-cluster-live-resharding-explained
- dest_client.call('CLUSTER', 'SETSLOT', slot, 'IMPORTING', src_node_id)
- src_client.call('CLUSTER', 'SETSLOT', slot, 'MIGRATING', dest_node_id)
+ dest_client.call_once('CLUSTER', 'SETSLOT', slot, 'IMPORTING', src_node_id)
+ src_client.call_once('CLUSTER', 'SETSLOT', slot, 'MIGRATING', dest_node_id)

db_idx = '0'
timeout_msec = @timeout.to_i * 1000

- number_of_keys = src_client.call('CLUSTER', 'COUNTKEYSINSLOT', slot)
- keys = src_client.call('CLUSTER', 'GETKEYSINSLOT', slot, number_of_keys)
+ number_of_keys = src_client.call_once('CLUSTER', 'COUNTKEYSINSLOT', slot)
+ keys = src_client.call_once('CLUSTER', 'GETKEYSINSLOT', slot, number_of_keys)
print_debug("#{src_client.config.host}:#{src_client.config.port} => #{dest_client.config.host}:#{dest_client.config.port} ... #{keys}")
return if keys.empty?

begin
- src_client.call('MIGRATE', dest_host, dest_port, '', db_idx, timeout_msec, 'KEYS', *keys)
+ src_client.call_once('MIGRATE', dest_host, dest_port, '', db_idx, timeout_msec, 'KEYS', *keys)
rescue ::RedisClient::CommandError => e
raise unless e.message.start_with?('IOERR')

# retry once
- src_client.call('MIGRATE', dest_host, dest_port, '', db_idx, timeout_msec, 'REPLACE', 'KEYS', *keys)
+ src_client.call_once('MIGRATE', dest_host, dest_port, '', db_idx, timeout_msec, 'REPLACE', 'KEYS', *keys)
end

wait_replication_delay(@clients, replica_size: @replica_size, timeout: @timeout)
@@ -151,7 +151,7 @@ def finish_resharding(slot:, src_node_key:, dest_node_key:)
rest = rows.reject { |r| r.replica? || r.client.equal?(src) || r.client.equal?(dest) }.map(&:client)

([dest, src] + rest).each do |cli|
- cli.call('CLUSTER', 'SETSLOT', slot, 'NODE', id)
+ cli.call_once('CLUSTER', 'SETSLOT', slot, 'NODE', id)
print_debug("#{cli.config.host}:#{cli.config.port} ... CLUSTER SETSLOT #{slot} NODE #{id}")
rescue ::RedisClient::CommandError => e
raise unless e.message.start_with?('ERR Please use SETSLOT only with masters.')
@@ -174,12 +174,12 @@ def scale_out(primary_url:, replica_url:)
@shard_size += 1
@number_of_replicas = @replica_size * @shard_size

- primary.call('CLUSTER', 'MEET', target_host, target_port)
- replica.call('CLUSTER', 'MEET', target_host, target_port)
+ primary.call_once('CLUSTER', 'MEET', target_host, target_port)
+ replica.call_once('CLUSTER', 'MEET', target_host, target_port)
wait_meeting(@clients, max_attempts: @max_attempts)

- primary_id = primary.call('CLUSTER', 'MYID')
- replica.call('CLUSTER', 'REPLICATE', primary_id)
+ primary_id = primary.call_once('CLUSTER', 'MYID')
+ replica.call_once('CLUSTER', 'REPLICATE', primary_id)
save_config(@clients)
wait_for_cluster_to_be_ready(skip_clients: [primary, replica])

@@ -213,16 +213,16 @@ def scale_in
threads = @clients.map do |cli|
Thread.new(cli) do |c|
c.pipelined do |pi|
- pi.call('CLUSTER', 'FORGET', replica_info.id)
- pi.call('CLUSTER', 'FORGET', primary_info.id)
+ pi.call_once('CLUSTER', 'FORGET', replica_info.id)
+ pi.call_once('CLUSTER', 'FORGET', primary_info.id)
end
rescue ::RedisClient::Error
# ignore
end
end
threads.each(&:join)
- replica.call('CLUSTER', 'RESET', 'SOFT')
- primary.call('CLUSTER', 'RESET', 'SOFT')
+ replica.call_once('CLUSTER', 'RESET', 'SOFT')
+ primary.call_once('CLUSTER', 'RESET', 'SOFT')
@clients.reject! { |c| c.equal?(primary) || c.equal?(replica) }
@shard_size -= 1
@number_of_replicas = @replica_size * @shard_size
@@ -266,7 +266,7 @@ def close

def flush_all_data(clients)
clients.each do |c|
- c.call('FLUSHALL')
+ c.call_once('FLUSHALL')
print_debug("#{c.config.host}:#{c.config.port} ... FLUSHALL")
rescue ::RedisClient::CommandError, ::RedisClient::ReadOnlyError
# READONLY You can't write against a read only replica.
@@ -277,7 +277,7 @@ def flush_all_data(clients)

def reset_cluster(clients)
clients.each do |c|
- c.call('CLUSTER', 'RESET', 'HARD')
+ c.call_once('CLUSTER', 'RESET', 'HARD')
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER RESET HARD")
rescue ::RedisClient::ConnectionError => e
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER RESET HARD: #{e.class}: #{e.message}")
@@ -294,15 +294,15 @@ def assign_slots(clients, shard_size:)
slot_idx = 0
primaries.zip(slot_sizes).each do |c, s|
slot_range = slot_idx..slot_idx + s - 1
- c.call('CLUSTER', 'ADDSLOTS', *slot_range.to_a)
+ c.call_once('CLUSTER', 'ADDSLOTS', *slot_range.to_a)
slot_idx += s
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER ADDSLOTS #{slot_range.to_a}")
end
end

def save_config_epoch(clients)
clients.each_with_index do |c, i|
- c.call('CLUSTER', 'SET-CONFIG-EPOCH', i + 1)
+ c.call_once('CLUSTER', 'SET-CONFIG-EPOCH', i + 1)
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER SET-CONFIG-EPOCH #{i + 1}")
rescue ::RedisClient::CommandError
# ERR Node config epoch is already non-zero
@@ -315,7 +315,7 @@ def meet_each_other(clients)
rows = parse_cluster_nodes(rows)
target_host, target_port = rows.first.node_key.split(':')
clients.drop(1).each do |c|
- c.call('CLUSTER', 'MEET', target_host, target_port)
+ c.call_once('CLUSTER', 'MEET', target_host, target_port)
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER MEET #{target_host}:#{target_port}")
end
end
@@ -335,19 +335,19 @@ def replicate(clients, shard_size:, replica_size:)
replicas = take_replicas(clients, shard_size: shard_size)

replicas.each_slice(replica_size).each_with_index do |subset, i|
- primary_id = primaries[i].call('CLUSTER', 'MYID')
+ primary_id = primaries[i].call_once('CLUSTER', 'MYID')

loop do
begin
subset.each do |replica|
- replica.call('CLUSTER', 'REPLICATE', primary_id)
+ replica.call_once('CLUSTER', 'REPLICATE', primary_id)
print_debug("#{replica.config.host}:#{replica.config.port} ... CLUSTER REPLICATE #{primaries[i].config.host}:#{primaries[i].config.port}")
end
rescue ::RedisClient::CommandError => e
print_debug(e.message)
# ERR Unknown node [node-id]
sleep SLEEP_SEC
- primary_id = primaries[i].call('CLUSTER', 'MYID')
+ primary_id = primaries[i].call_once('CLUSTER', 'MYID')
next
end

@@ -358,7 +358,7 @@ def save_config(clients)

def save_config(clients)
clients.each do |c|
- c.call('CLUSTER', 'SAVECONFIG')
+ c.call_once('CLUSTER', 'SAVECONFIG')
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER SAVECONFIG")
end
end
@@ -412,7 +412,7 @@ def wait_cluster_recovering(clients, max_attempts:, skip_clients: [])
key = 0
wait_for_state(clients, max_attempts: max_attempts) do |client|
print_debug("#{client.config.host}:#{client.config.port} ... GET #{key}")
- client.call('GET', key) if primary_client?(client) && !skip_clients.include?(client)
+ client.call_once('GET', key) if primary_client?(client) && !skip_clients.include?(client)
true
rescue ::RedisClient::CommandError => e
if e.message.start_with?('CLUSTERDOWN')
@@ -443,11 +443,11 @@ def wait_for_state(clients, max_attempts:)
end

def hashify_cluster_info(client)
- client.call('CLUSTER', 'INFO').split("\r\n").to_h { |v| v.split(':') }
+ client.call_once('CLUSTER', 'INFO').split("\r\n").to_h { |v| v.split(':') }
end

def fetch_cluster_nodes(client)
- client.call('CLUSTER', 'NODES').split("\n").map(&:split)
+ client.call_once('CLUSTER', 'NODES').split("\n").map(&:split)
end

def associate_with_clients_and_nodes(clients)
@@ -502,11 +502,11 @@ def take_replicas(clients, shard_size:)
end

def primary_client?(client)
- client.call('ROLE').first == 'master'
+ client.call_once('ROLE').first == 'master'
end

def replica_client?(client)
- client.call('ROLE').first == 'slave'
+ client.call_once('ROLE').first == 'slave'
end

def print_debug(msg)