Skip to content

Add parsing of multiple URLs for cluster mode in redis #1923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"math"
"net"
"net/url"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -131,6 +133,143 @@ func (opt *ClusterOptions) init() {
}
}

// ParseClusterURLs parses an array of URLs into ClusterOptions that can be used to connect to Redis.
// The strings in the array must be in the form:
// redis://<user>:<password>@<host>:<port>
// or
// rediss://<user>:<password>@<host>:<port>
// All strings in the array must use the same scheme, username, and password.
//
// Most Option fields can be set using query parameters, with the following restrictions:
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// names will be treated as unknown parameters
// - unknown parameter names will result in an error
// - query parameters must be the same for all urls. If they differ, the URL in the array will define it.
// Examples:
// [
// redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&max_retries=2,
// redis://user:password@localhost:6790?dial_timeout=3&read_timeout=6s&max_retries=2,
// redis://user:password@localhost:6791?dial_timeout=3&read_timeout=6s&max_retries=5,
// ]
// is equivalent to:
// &ClusterOptions{
// Addr: ["localhost:6789", "localhost:6790", "localhost:6791"]
// DialTimeout: 3 * time.Second, // no time unit = seconds
// ReadTimeout: 6 * time.Second,
// MaxRetries: 5, // last one in the array is used
// }
func ParseClusterURLs(redisURLs []string) (*ClusterOptions, error) {
o := &ClusterOptions{}
previousScheme := ""

// loop through all the URLs and retrieve the addresses as well as the
// cluster options
for _, redisURL := range redisURLs {
u, err := url.Parse(redisURL)
if err != nil {
return nil, err
}

h, p, err := net.SplitHostPort(u.Host)
if err != nil {
h = u.Host
}
if h == "" {
h = "localhost"
}
if p == "" {
p = "6379"
}
o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))

// all URLS must use the same scheme
if previousScheme != "" && u.Scheme != previousScheme {
return nil, fmt.Errorf("redis: mismatch schemes: %s and %s", previousScheme, u.Scheme)
}
previousScheme = u.Scheme

// setup username, password, and other configurations
o, err = setupClusterConn(u, h, o)
if err != nil {
return nil, err
}
}
return o, nil
}

// setupClusterConn gets the username and password from the URL and the query parameters
func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptions, error) {
// retrieve the configuration from the query parameters
o, err := setupClusterQueryParams(u, o)
if err != nil {
return nil, err
}

switch u.Scheme {
case "rediss":
o.TLSConfig = &tls.Config{ServerName: host}
fallthrough
case "redis":
// get the username & password - they must be consistent across urls
u, p := getUserPassword(u)

if o.Username != "" && o.Username != u {
return nil, fmt.Errorf("redis: mismatch usernames: %s and %s", o.Username, u)
}
if o.Password != "" && o.Password != p {
return nil, fmt.Errorf("redis: mismatch passwords")
}

o.Username, o.Password = u, p

return o, nil
default:
return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
}
}

// setupClusterQueryParams converts query parameters in u to option value in o.
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
q := queryOptions{q: u.Query()}

o.MaxRedirects = q.int("max_redirects")
o.ReadOnly = q.bool("read_only")
o.RouteByLatency = q.bool("route_by_latency")
o.RouteByLatency = q.bool("route_randomly")
o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff")
o.MaxRetryBackoff = q.duration("max_retry_backoff")
o.DialTimeout = q.duration("dial_timeout")
o.ReadTimeout = q.duration("read_timeout")
o.WriteTimeout = q.duration("write_timeout")
o.PoolFIFO = q.bool("pool_fifo")
o.PoolSize = q.int("pool_size")
o.MinIdleConns = q.int("min_idle_conns")
o.MaxConnAge = q.duration("max_conn_age")
o.PoolTimeout = q.duration("pool_timeout")
o.IdleTimeout = q.duration("idle_timeout")
o.IdleCheckFrequency = q.duration("idle_check_frequency")

if q.err != nil {
return nil, q.err
}

// any parameters left?
if r := q.remaining(); len(r) > 0 {
return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
}

return o, nil
}

func (opt *ClusterOptions) clientOptions() *Options {
const disableIdleCheck = -1

Expand Down
198 changes: 198 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package redis_test

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -1281,3 +1285,197 @@ var _ = Describe("ClusterClient timeout", func() {
testTimeout()
})
})

func TestParseClusterURLs(t *testing.T) {
cases := []struct {
test string
urls []string
o *redis.ClusterOptions // expected value
err error
}{
{
test: "ParseRedisURL",
urls: []string{"redis://localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
}, {
test: "ParseRedissURL",
urls: []string{"rediss://localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "MissingRedisPort",
urls: []string{"redis://localhost"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
}, {
test: "MissingRedissPort",
urls: []string{"rediss://localhost"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "MultipleRedisURLs",
urls: []string{"redis://localhost:123", "redis://localhost:1234"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}},
}, {
test: "MultipleRedissURLs",
urls: []string{"rediss://localhost:123", "rediss://localhost:1234"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "OnlyPassword",
urls: []string{"redis://:bar@localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
}, {
test: "OnlyUser",
urls: []string{"redis://foo@localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
}, {
test: "RedisUsernamePassword",
urls: []string{"redis://foo:bar@localhost:123"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
}, {
test: "RedissUsernamePassword",
urls: []string{"rediss://foo:bar@localhost:123", "rediss://foo:bar@localhost:1234"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ /* no deep comparison */ }},
}, {
test: "QueryParameters",
urls: []string{"redis://localhost:123?read_timeout=2&pool_fifo=true"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
}, {
test: "UseFinalQueryParameters",
urls: []string{"redis://localhost:123?read_timeout=2&pool_fifo=true", "redis://localhost:1234?read_timeout=3&pool_fifo=true"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 3 * time.Second, PoolFIFO: true},
}, {
test: "DisabledTimeout",
urls: []string{"redis://localhost:123?idle_timeout=0"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: -1},
}, {
test: "DisabledTimeoutNeg",
urls: []string{"redis://localhost:123?idle_timeout=-1"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: -1},
}, {
test: "UseDefault",
urls: []string{"redis://localhost:123?idle_timeout="},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: 0},
}, {
test: "UseDefaultMissing=",
urls: []string{"redis://localhost:123?idle_timeout"},
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, IdleTimeout: 0},
}, {
test: "RedisPasswordMismatch",
urls: []string{"redis://foo:bar@localhost:123", "redis://foo:barr@localhost:1234"},
err: errors.New(`redis: mismatch passwords`),
}, {
test: "RedisUsernameMismatch",
urls: []string{"redis://fooo:bar@localhost:123", "redis://foo:bar@localhost:1234"},
err: errors.New(`redis: mismatch usernames: fooo and foo`),
}, {
test: "RedissPasswordMismatch",
urls: []string{"rediss://foo:bar@localhost:123", "rediss://foo:barr@localhost:1234"},
err: errors.New(`redis: mismatch passwords`),
}, {
test: "RedissUsernameMismatch",
urls: []string{"rediss://foo:bar@localhost:123", "rediss://fooo:bar@localhost:1234"},
err: errors.New(`redis: mismatch usernames: foo and fooo`),
}, {
test: "SchemeMismatch",
urls: []string{"rediss://foo:bar@localhost:123", "redis://foo:bar@localhost:1234"},
err: errors.New(`redis: mismatch schemes: rediss and redis`),
}, {
test: "SchemeMismatch",
urls: []string{"redis://foo:bar@localhost:123", "localhost:1234"},
err: errors.New(`redis: mismatch schemes: redis and localhost`),
}, {
test: "InvalidInt",
urls: []string{"redis://localhost?pool_size=five"},
err: errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
}, {
test: "InvalidBool",
urls: []string{"redis://localhost?pool_fifo=yes"},
err: errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
}, {
test: "UnknownParam",
urls: []string{"redis://localhost?abc=123"},
err: errors.New("redis: unexpected option: abc"),
}, {
test: "InvalidScheme",
urls: []string{"https://google.com"},
err: errors.New("redis: invalid URL scheme: https"),
},
}

for i := range cases {
tc := cases[i]
t.Run(tc.test, func(t *testing.T) {
t.Parallel()

actual, err := redis.ParseClusterURLs(tc.urls)
if tc.err == nil && err != nil {
t.Fatalf("unexpected error: %q", err)
return
}
if tc.err != nil && err != nil {
if tc.err.Error() != err.Error() {
t.Fatalf("got %q, expected %q", err, tc.err)
}
return
}
comprareOptions(t, actual, tc.o)
})
}
}

func comprareOptions(t *testing.T, actual, expected *redis.ClusterOptions) {
t.Helper()

if !reflect.DeepEqual(actual.Addrs, expected.Addrs) {
t.Errorf("got %q, want %q", actual.Addrs, expected.Addrs)
}
if actual.TLSConfig == nil && expected.TLSConfig != nil {
t.Errorf("got nil TLSConfig, expected a TLSConfig")
}
if actual.TLSConfig != nil && expected.TLSConfig == nil {
t.Errorf("got TLSConfig, expected no TLSConfig")
}
if actual.Username != expected.Username {
t.Errorf("Username: got %q, expected %q", actual.Username, expected.Username)
}
if actual.Password != expected.Password {
t.Errorf("Password: got %q, expected %q", actual.Password, expected.Password)
}
if actual.MaxRetries != expected.MaxRetries {
t.Errorf("MaxRetries: got %v, expected %v", actual.MaxRetries, expected.MaxRetries)
}
if actual.MinRetryBackoff != expected.MinRetryBackoff {
t.Errorf("MinRetryBackoff: got %v, expected %v", actual.MinRetryBackoff, expected.MinRetryBackoff)
}
if actual.MaxRetryBackoff != expected.MaxRetryBackoff {
t.Errorf("MaxRetryBackoff: got %v, expected %v", actual.MaxRetryBackoff, expected.MaxRetryBackoff)
}
if actual.DialTimeout != expected.DialTimeout {
t.Errorf("DialTimeout: got %v, expected %v", actual.DialTimeout, expected.DialTimeout)
}
if actual.ReadTimeout != expected.ReadTimeout {
t.Errorf("ReadTimeout: got %v, expected %v", actual.ReadTimeout, expected.ReadTimeout)
}
if actual.WriteTimeout != expected.WriteTimeout {
t.Errorf("WriteTimeout: got %v, expected %v", actual.WriteTimeout, expected.WriteTimeout)
}
if actual.PoolFIFO != expected.PoolFIFO {
t.Errorf("PoolFIFO: got %v, expected %v", actual.PoolFIFO, expected.PoolFIFO)
}
if actual.PoolSize != expected.PoolSize {
t.Errorf("PoolSize: got %v, expected %v", actual.PoolSize, expected.PoolSize)
}
if actual.MinIdleConns != expected.MinIdleConns {
t.Errorf("MinIdleConns: got %v, expected %v", actual.MinIdleConns, expected.MinIdleConns)
}
if actual.MaxConnAge != expected.MaxConnAge {
t.Errorf("MaxConnAge: got %v, expected %v", actual.MaxConnAge, expected.MaxConnAge)
}
if actual.PoolTimeout != expected.PoolTimeout {
t.Errorf("PoolTimeout: got %v, expected %v", actual.PoolTimeout, expected.PoolTimeout)
}
if actual.IdleTimeout != expected.IdleTimeout {
t.Errorf("IdleTimeout: got %v, expected %v", actual.IdleTimeout, expected.IdleTimeout)
}
if actual.IdleCheckFrequency != expected.IdleCheckFrequency {
t.Errorf("IdleCheckFrequency: got %v, expected %v", actual.IdleCheckFrequency, expected.IdleCheckFrequency)
}
}