Skip to content

Initial rework of sentinels client. #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion .github/workflows/test-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,41 @@ jobs:
name: coverage-${{matrix.os}}-${{matrix.ruby}}
path: .covered.db

test-sentinel:
name: ${{matrix.ruby}} on ${{matrix.os}} (Sentinel)
runs-on: ${{matrix.os}}-latest

strategy:
matrix:
os:
- ubuntu

ruby:
- "3.3"

steps:
- uses: actions/checkout@v4

- name: Install Docker Compose
run: |
sudo apt-get update
sudo apt-get install -y docker-compose

- name: Run tests
timeout-minutes: 10
env:
RUBY_VERSION: ${{matrix.ruby}}
run: docker-compose -f sentinel/docker-compose.yaml up --exit-code-from tests

- uses: actions/upload-artifact@v3
with:
name: coverage-${{matrix.os}}-${{matrix.ruby}}
path: .covered.db

validate:
needs: test
needs:
- test
- test-sentinel
runs-on: ubuntu-latest

steps:
Expand Down
41 changes: 41 additions & 0 deletions .github/workflows/test-sentinel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Test Sentinel

on: [push, pull_request]

permissions:
contents: read

env:
CONSOLE_OUTPUT: XTerm

jobs:
test:
name: ${{matrix.ruby}} on ${{matrix.os}}
runs-on: ${{matrix.os}}-latest
continue-on-error: ${{matrix.experimental}}

strategy:
matrix:
os:
- ubuntu

ruby:
- "3.1"
- "3.2"
- "3.3"

experimental: [false]

steps:
- uses: actions/checkout@v4

- name: Install Docker Compose
run: |
sudo apt-get update
sudo apt-get install -y docker-compose

- name: Run tests
timeout-minutes: 10
env:
RUBY_VERSION: ${{matrix.ruby}}
run: docker-compose -f sentinel/docker-compose.yaml up --exit-code-from tests
2 changes: 1 addition & 1 deletion lib/async/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

require_relative 'redis/version'
require_relative 'redis/client'
require_relative 'redis/sentinels'
require_relative 'redis/sentinel_client'
118 changes: 61 additions & 57 deletions lib/async/redis/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,65 @@ module Redis
class Client
include ::Protocol::Redis::Methods

module Methods
def subscribe(*channels)
context = Context::Subscribe.new(@pool, channels)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

def transaction(&block)
context = Context::Transaction.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

alias multi transaction

def pipeline(&block)
context = Context::Pipeline.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

# Deprecated.
alias nested pipeline

def call(*arguments)
@pool.acquire do |connection|
connection.write_request(arguments)

connection.flush

return connection.read_response
end
end

def close
@pool.close
end
end

include Methods

def initialize(endpoint = Endpoint.local, protocol: endpoint.protocol, **options)
@endpoint = endpoint
@protocol = protocol
Expand All @@ -38,8 +97,8 @@ def initialize(endpoint = Endpoint.local, protocol: endpoint.protocol, **options

# @return [client] if no block provided.
# @yield [client, task] yield the client in an async task.
def self.open(*arguments, &block)
client = self.new(*arguments)
def self.open(*arguments, **options, &block)
client = self.new(*arguments, **options)

return client unless block_given?

Expand All @@ -52,61 +111,6 @@ def self.open(*arguments, &block)
end.wait
end

def close
@pool.close
end

def subscribe(*channels)
context = Context::Subscribe.new(@pool, channels)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

def transaction(&block)
context = Context::Transaction.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

alias multi transaction

def pipeline(&block)
context = Context::Pipeline.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

# Deprecated.
alias nested pipeline

def call(*arguments)
@pool.acquire do |connection|
connection.write_request(arguments)

connection.flush

return connection.read_response
end
end

protected

def connect(**options)
Expand Down
6 changes: 5 additions & 1 deletion lib/async/redis/endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ class Endpoint < ::IO::Endpoint::Generic

def self.local(**options)
self.new(LOCALHOST, **options)
end
end

def self.remote(host, port = 6379, **options)
self.new(URI.parse("redis://#{host}:#{port}"), **options)
end

SCHEMES = {
'redis' => URI::Generic,
Expand Down
153 changes: 153 additions & 0 deletions lib/async/redis/sentinel_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2020, by David Ortiz.
# Copyright, 2023-2024, by Samuel Williams.

require_relative 'client'
require 'io/stream'

module Async
module Redis
class SentinelClient
DEFAULT_MASTER_NAME = 'mymaster'

include ::Protocol::Redis::Methods
include Client::Methods

# Create a new instance of the SentinelClient.
#
# @property endpoints [Array(Endpoint)] The list of sentinel endpoints.
# @property master_name [String] The name of the master instance, defaults to 'mymaster'.
# @property role [Symbol] The role of the instance that you want to connect to, either `:master` or `:slave`.
# @property protocol [Protocol] The protocol to use when connecting to the actual Redis server, defaults to {Protocol::RESP2}.
def initialize(endpoints, master_name: DEFAULT_MASTER_NAME, role: :master, protocol: Protocol::RESP2, **options)
@endpoints = endpoints
@master_name = master_name
@role = role
@protocol = protocol

# A cache of sentinel connections.
@sentinels = {}

@pool = connect(**options)
end

# @attribute [String] The name of the master instance.
attr :master_name

# @attribute [Symbol] The role of the instance that you want to connect to.
attr :role

def resolve_address(role = @role)
case role
when :master
resolve_master
when :slave
resolve_slave
else
raise ArgumentError, "Unknown instance role #{role}"
end => address

Console.debug(self, "Resolved #{@role} address: #{address}")

address or raise RuntimeError, "Unable to fetch #{role} via Sentinel."
end

def close
super

@sentinels.each do |_, client|
client.close
end
end

def failover(name = @master_name)
sentinels do |client|
return client.call('SENTINEL', 'FAILOVER', name)
end
end

def masters
sentinels do |client|
return client.call('SENTINEL', 'MASTERS').map{|fields| fields.each_slice(2).to_h}
end
end

def master(name = @master_name)
sentinels do |client|
return client.call('SENTINEL', 'MASTER', name).each_slice(2).to_h
end
end

def resolve_master
sentinels do |client|
begin
address = client.call('SENTINEL', 'GET-MASTER-ADDR-BY-NAME', @master_name)
rescue Errno::ECONNREFUSED
next
end

return Endpoint.remote(address[0], address[1]) if address
end

return nil
end

def resolve_slave
sentinels do |client|
begin
reply = client.call('SENTINEL', 'SLAVES', @master_name)
rescue Errno::ECONNREFUSED
next
end

slaves = available_slaves(reply)
next if slaves.empty?

slave = select_slave(slaves)
return Endpoint.remote(slave['ip'], slave['port'])
end

return nil
end

protected

# Override the parent method. The only difference is that this one needs to resolve the master/slave address.
def connect(**options)
Async::Pool::Controller.wrap(**options) do
endpoint = resolve_address
peer = endpoint.connect
stream = ::IO::Stream(peer)

@protocol.client(stream)
end
end

def sentinels
@endpoints.map do |endpoint|
@sentinels[endpoint] ||= Client.new(endpoint)

yield @sentinels[endpoint]
end
end

def available_slaves(reply)
# The reply is an array with the format: [field1, value1, field2,
# value2, etc.].
# When a slave is marked as down by the sentinel, the "flags" field
# (comma-separated array) contains the "s_down" value.
slaves = reply.map{|fields| fields.each_slice(2).to_h}

slaves.reject do |slave|
slave['flags'].split(',').include?('s_down')
end
end

def select_slave(available_slaves)
available_slaves.sample
end
end
end
end
Loading
Loading