Skip to content

Add ParseURL function for cluster mode #1924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 6, 2022
Merged
120 changes: 119 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"math"
"net"
"net/url"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -131,6 +133,123 @@ func (opt *ClusterOptions) init() {
}
}

// ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
// The URL must be in the form:
// redis://<user>:<password>@<host>:<port>
// or
// rediss://<user>:<password>@<host>:<port>
// To add additional addresses, specify the query parameter, "addr" one or more times. e.g:
// redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
// or
// rediss://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
//
// Most Option fields can be set using query parameters, with the following restrictions:
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// names will be treated as unknown parameters
// - unknown parameter names will result in an error
// Example:
// redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
// is equivalent to:
// &ClusterOptions{
// Addr: ["localhost:6789", "localhost:6790", "localhost:6791"]
// DialTimeout: 3 * time.Second, // no time unit = seconds
// ReadTimeout: 6 * time.Second,
// }
func ParseClusterURL(redisURL string) (*ClusterOptions, error) {
o := &ClusterOptions{}

u, err := url.Parse(redisURL)
if err != nil {
return nil, err
}

// add base URL to the array of addresses
// more addresses may be added through the URL params
h, p := getHostPortWithDefaults(u)
o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))

// setup username, password, and other configurations
o, err = setupClusterConn(u, h, o)
if err != nil {
return nil, err
}

return o, nil
}

// setupClusterConn gets the username and password from the URL and the query parameters.
func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptions, error) {
switch u.Scheme {
case "rediss":
o.TLSConfig = &tls.Config{ServerName: host}
fallthrough
case "redis":
o.Username, o.Password = getUserPassword(u)
default:
return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
}

// retrieve the configuration from the query parameters
o, err := setupClusterQueryParams(u, o)
if err != nil {
return nil, err
}

return o, nil
}

// setupClusterQueryParams converts query parameters in u to option value in o.
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
q := queryOptions{q: u.Query()}

o.MaxRedirects = q.int("max_redirects")
o.ReadOnly = q.bool("read_only")
o.RouteByLatency = q.bool("route_by_latency")
o.RouteByLatency = q.bool("route_randomly")
o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff")
o.MaxRetryBackoff = q.duration("max_retry_backoff")
o.DialTimeout = q.duration("dial_timeout")
o.ReadTimeout = q.duration("read_timeout")
o.WriteTimeout = q.duration("write_timeout")
o.PoolFIFO = q.bool("pool_fifo")
o.PoolSize = q.int("pool_size")
o.MinIdleConns = q.int("min_idle_conns")
o.PoolTimeout = q.duration("pool_timeout")
o.ConnMaxLifetime = q.duration("conn_max_lifetime")
o.ConnMaxIdleTime = q.duration("conn_max_idle_time")

if q.err != nil {
return nil, q.err
}

// addr can be specified as many times as needed
addrs := q.strings("addr")
for _, addr := range addrs {
h, p, err := net.SplitHostPort(addr)
if err != nil || h == "" || p == "" {
return nil, fmt.Errorf("redis: unable to parse addr param: %s", addr)
}

o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))
}

// any parameters left?
if r := q.remaining(); len(r) > 0 {
return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
}

return o, nil
}

func (opt *ClusterOptions) clientOptions() *Options {
return &Options{
Dialer: opt.Dialer,
Expand Down Expand Up @@ -1537,7 +1656,6 @@ func (c *ClusterClient) SSubscribe(ctx context.Context, channels ...string) *Pub
return pubsub
}


func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
Expand Down
139 changes: 139 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package redis_test

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v9/internal/hashtag"
Expand Down Expand Up @@ -1296,3 +1300,138 @@ var _ = Describe("ClusterClient timeout", func() {
testTimeout()
})
})

func TestParseClusterURL(t *testing.T) {
cases := []struct {
test string
url string
o *redis.ClusterOptions // expected value
err error
}{
{
test: "ParseRedisURL",
url: "redis://localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
}, {
test: "ParseRedissURL",
url: "rediss://localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "MissingRedisPort",
url: "redis://localhost",
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
}, {
test: "MissingRedissPort",
url: "rediss://localhost",
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "MultipleRedisURLs",
url: "redis://localhost:123?addr=localhost:1234&addr=localhost:12345",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}},
}, {
test: "MultipleRedissURLs",
url: "rediss://localhost:123?addr=localhost:1234&addr=localhost:12345",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "OnlyPassword",
url: "redis://:bar@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
}, {
test: "OnlyUser",
url: "redis://foo@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
}, {
test: "RedisUsernamePassword",
url: "redis://foo:bar@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
}, {
test: "RedissUsernamePassword",
url: "rediss://foo:bar@localhost:123?addr=localhost:1234",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "QueryParameters",
url: "redis://localhost:123?read_timeout=2&pool_fifo=true&addr=localhost:1234",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
}, {
test: "DisabledTimeout",
url: "redis://localhost:123?conn_max_idle_time=0",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
}, {
test: "DisabledTimeoutNeg",
url: "redis://localhost:123?conn_max_idle_time=-1",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
}, {
test: "UseDefault",
url: "redis://localhost:123?conn_max_idle_time=",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "UseDefaultMissing=",
url: "redis://localhost:123?conn_max_idle_time",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "InvalidQueryAddr",
url: "rediss://foo:bar@localhost:123?addr=rediss://foo:barr@localhost:1234",
err: errors.New(`redis: unable to parse addr param: rediss://foo:barr@localhost:1234`),
}, {
test: "InvalidInt",
url: "redis://localhost?pool_size=five",
err: errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
}, {
test: "InvalidBool",
url: "redis://localhost?pool_fifo=yes",
err: errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
}, {
test: "UnknownParam",
url: "redis://localhost?abc=123",
err: errors.New("redis: unexpected option: abc"),
}, {
test: "InvalidScheme",
url: "https://google.com",
err: errors.New("redis: invalid URL scheme: https"),
},
}

for i := range cases {
tc := cases[i]
t.Run(tc.test, func(t *testing.T) {
t.Parallel()

actual, err := redis.ParseClusterURL(tc.url)
if tc.err == nil && err != nil {
t.Fatalf("unexpected error: %q", err)
return
}
if tc.err != nil && err == nil {
t.Fatalf("expected error: got %+v", actual)
return
}
if tc.err != nil && err != nil {
if tc.err.Error() != err.Error() {
t.Fatalf("got %q, expected %q", err, tc.err)
}
return
}
comprareOptions(t, actual, tc.o)
})
}
}

func comprareOptions(t *testing.T, actual, expected *redis.ClusterOptions) {
t.Helper()
assert.Equal(t, expected.Addrs, actual.Addrs)
assert.Equal(t, expected.TLSConfig, actual.TLSConfig)
assert.Equal(t, expected.Username, actual.Username)
assert.Equal(t, expected.Password, actual.Password)
assert.Equal(t, expected.MaxRetries, actual.MaxRetries)
assert.Equal(t, expected.MinRetryBackoff, actual.MinRetryBackoff)
assert.Equal(t, expected.MaxRetryBackoff, actual.MaxRetryBackoff)
assert.Equal(t, expected.DialTimeout, actual.DialTimeout)
assert.Equal(t, expected.ReadTimeout, actual.ReadTimeout)
assert.Equal(t, expected.WriteTimeout, actual.WriteTimeout)
assert.Equal(t, expected.PoolFIFO, actual.PoolFIFO)
assert.Equal(t, expected.PoolSize, actual.PoolSize)
assert.Equal(t, expected.MinIdleConns, actual.MinIdleConns)
assert.Equal(t, expected.ConnMaxLifetime, actual.ConnMaxLifetime)
assert.Equal(t, expected.ConnMaxIdleTime, actual.ConnMaxIdleTime)
assert.Equal(t, expected.PoolTimeout, actual.PoolTimeout)
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.20.2
github.com/stretchr/testify v1.5.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading