Skip to content

Commit aea78bd

Browse files
authored
grpc: Add perTargetDialOption type and global list (#7234)
1 parent 2d2f417 commit aea78bd

File tree

4 files changed

+96
-37
lines changed

4 files changed

+96
-37
lines changed

clientconn.go

+26-32
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
152152
for _, opt := range opts {
153153
opt.apply(&cc.dopts)
154154
}
155+
156+
// Determine the resolver to use.
157+
if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
158+
return nil, err
159+
}
160+
161+
for _, opt := range globalPerTargetDialOptions {
162+
opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
163+
}
164+
155165
chainUnaryClientInterceptors(cc)
156166
chainStreamClientInterceptors(cc)
157167

@@ -168,25 +178,16 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
168178
}
169179
cc.mkp = cc.dopts.copts.KeepaliveParams
170180

171-
// Register ClientConn with channelz.
172-
cc.channelzRegistration(target)
173-
174-
// TODO: Ideally it should be impossible to error from this function after
175-
// channelz registration. This will require removing some channelz logs
176-
// from the following functions that can error. Errors can be returned to
177-
// the user, and successful logs can be emitted here, after the checks have
178-
// passed and channelz is subsequently registered.
179-
180-
// Determine the resolver to use.
181-
if err := cc.parseTargetAndFindResolver(); err != nil {
182-
channelz.RemoveEntry(cc.channelz.ID)
183-
return nil, err
184-
}
185-
if err = cc.determineAuthority(); err != nil {
186-
channelz.RemoveEntry(cc.channelz.ID)
181+
if err = cc.initAuthority(); err != nil {
187182
return nil, err
188183
}
189184

185+
// Register ClientConn with channelz. Note that this is only done after
186+
// channel creation cannot fail.
187+
cc.channelzRegistration(target)
188+
channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
189+
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
190+
190191
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
191192
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
192193

@@ -587,11 +588,11 @@ type ClientConn struct {
587588

588589
// The following are initialized at dial time, and are read-only after that.
589590
target string // User's dial target.
590-
parsedTarget resolver.Target // See parseTargetAndFindResolver().
591-
authority string // See determineAuthority().
591+
parsedTarget resolver.Target // See initParsedTargetAndResolverBuilder().
592+
authority string // See initAuthority().
592593
dopts dialOptions // Default and user specified dial options.
593594
channelz *channelz.Channel // Channelz object.
594-
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
595+
resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
595596
idlenessMgr *idle.Manager
596597

597598
// The following provide their own synchronization, and therefore don't
@@ -1673,22 +1674,19 @@ func (cc *ClientConn) connectionError() error {
16731674
return cc.lastConnectionError
16741675
}
16751676

1676-
// parseTargetAndFindResolver parses the user's dial target and stores the
1677-
// parsed target in `cc.parsedTarget`.
1677+
// initParsedTargetAndResolverBuilder parses the user's dial target and stores
1678+
// the parsed target in `cc.parsedTarget`.
16781679
//
16791680
// The resolver to use is determined based on the scheme in the parsed target
16801681
// and the same is stored in `cc.resolverBuilder`.
16811682
//
16821683
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1683-
func (cc *ClientConn) parseTargetAndFindResolver() error {
1684-
channelz.Infof(logger, cc.channelz, "original dial target is: %q", cc.target)
1684+
func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
1685+
logger.Infof("original dial target is: %q", cc.target)
16851686

16861687
var rb resolver.Builder
16871688
parsedTarget, err := parseTarget(cc.target)
1688-
if err != nil {
1689-
channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", cc.target, err)
1690-
} else {
1691-
channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", parsedTarget)
1689+
if err == nil {
16921690
rb = cc.getResolver(parsedTarget.URL.Scheme)
16931691
if rb != nil {
16941692
cc.parsedTarget = parsedTarget
@@ -1707,15 +1705,12 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {
17071705
defScheme = resolver.GetDefaultScheme()
17081706
}
17091707

1710-
channelz.Infof(logger, cc.channelz, "fallback to scheme %q", defScheme)
17111708
canonicalTarget := defScheme + ":///" + cc.target
17121709

17131710
parsedTarget, err = parseTarget(canonicalTarget)
17141711
if err != nil {
1715-
channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", canonicalTarget, err)
17161712
return err
17171713
}
1718-
channelz.Infof(logger, cc.channelz, "parsed dial target is: %+v", parsedTarget)
17191714
rb = cc.getResolver(parsedTarget.URL.Scheme)
17201715
if rb == nil {
17211716
return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
@@ -1805,7 +1800,7 @@ func encodeAuthority(authority string) string {
18051800
// credentials do not match the authority configured through the dial option.
18061801
//
18071802
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
1808-
func (cc *ClientConn) determineAuthority() error {
1803+
func (cc *ClientConn) initAuthority() error {
18091804
dopts := cc.dopts
18101805
// Historically, we had two options for users to specify the serverName or
18111806
// authority for a channel. One was through the transport credentials
@@ -1838,6 +1833,5 @@ func (cc *ClientConn) determineAuthority() error {
18381833
} else {
18391834
cc.authority = encodeAuthority(endpoint)
18401835
}
1841-
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
18421836
return nil
18431837
}

default_dial_option_server_option_test.go

+32-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package grpc
2020

2121
import (
2222
"fmt"
23+
"net/url"
2324
"strings"
2425
"testing"
2526

@@ -40,6 +41,7 @@ func (s) TestAddGlobalDialOptions(t *testing.T) {
4041
// Set and check the DialOptions
4142
opts := []DialOption{WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials()), WithTransportCredentials(insecure.NewCredentials())}
4243
internal.AddGlobalDialOptions.(func(opt ...DialOption))(opts...)
44+
defer internal.ClearGlobalDialOptions()
4345
for i, opt := range opts {
4446
if globalDialOptions[i] != opt {
4547
t.Fatalf("Unexpected global dial option at index %d: %v != %v", i, globalDialOptions[i], opt)
@@ -64,21 +66,50 @@ func (s) TestAddGlobalDialOptions(t *testing.T) {
6466
func (s) TestDisableGlobalOptions(t *testing.T) {
6567
// Set transport credentials as a global option.
6668
internal.AddGlobalDialOptions.(func(opt ...DialOption))(WithTransportCredentials(insecure.NewCredentials()))
69+
defer internal.ClearGlobalDialOptions()
6770
// Dial with the disable global options dial option. This dial should fail
6871
// due to the global dial options with credentials not being picked up due
6972
// to global options being disabled.
7073
noTSecStr := "no transport security set"
7174
if _, err := Dial("fake", internal.DisableGlobalDialOptions.(func() DialOption)()); !strings.Contains(fmt.Sprint(err), noTSecStr) {
7275
t.Fatalf("Dialing received unexpected error: %v, want error containing \"%v\"", err, noTSecStr)
7376
}
74-
internal.ClearGlobalDialOptions()
77+
}
78+
79+
type testPerTargetDialOption struct{}
80+
81+
func (do *testPerTargetDialOption) DialOptionForTarget(parsedTarget url.URL) DialOption {
82+
if parsedTarget.Scheme == "passthrough" {
83+
return WithTransportCredentials(insecure.NewCredentials()) // credentials provided, should pass NewClient.
84+
}
85+
return EmptyDialOption{} // no credentials, should fail NewClient
86+
}
87+
88+
// TestGlobalPerTargetDialOption configures a global per target dial option that
89+
// produces transport credentials for channels using "passthrough" scheme.
90+
// Channels that use the passthrough scheme should be successfully created due
91+
// to picking up transport credentials, whereas other channels should fail at
92+
// creation due to not having transport credentials.
93+
func (s) TestGlobalPerTargetDialOption(t *testing.T) {
94+
internal.AddGlobalPerTargetDialOptions.(func(opt any))(&testPerTargetDialOption{})
95+
defer internal.ClearGlobalPerTargetDialOptions()
96+
noTSecStr := "no transport security set"
97+
if _, err := NewClient("dns:///fake"); !strings.Contains(fmt.Sprint(err), noTSecStr) {
98+
t.Fatalf("Dialing received unexpected error: %v, want error containing \"%v\"", err, noTSecStr)
99+
}
100+
cc, err := NewClient("passthrough:///nice")
101+
if err != nil {
102+
t.Fatalf("Dialing with insecure credentials failed: %v", err)
103+
}
104+
cc.Close()
75105
}
76106

77107
func (s) TestAddGlobalServerOptions(t *testing.T) {
78108
const maxRecvSize = 998765
79109
// Set and check the ServerOptions
80110
opts := []ServerOption{Creds(insecure.NewCredentials()), MaxRecvMsgSize(maxRecvSize)}
81111
internal.AddGlobalServerOptions.(func(opt ...ServerOption))(opts...)
112+
defer internal.ClearGlobalServerOptions()
82113
for i, opt := range opts {
83114
if globalServerOptions[i] != opt {
84115
t.Fatalf("Unexpected global server option at index %d: %v != %v", i, globalServerOptions[i], opt)

dialoptions.go

+22
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package grpc
2121
import (
2222
"context"
2323
"net"
24+
"net/url"
2425
"time"
2526

2627
"google.golang.org/grpc/backoff"
@@ -43,6 +44,14 @@ func init() {
4344
internal.ClearGlobalDialOptions = func() {
4445
globalDialOptions = nil
4546
}
47+
internal.AddGlobalPerTargetDialOptions = func(opt any) {
48+
if ptdo, ok := opt.(perTargetDialOption); ok {
49+
globalPerTargetDialOptions = append(globalPerTargetDialOptions, ptdo)
50+
}
51+
}
52+
internal.ClearGlobalPerTargetDialOptions = func() {
53+
globalPerTargetDialOptions = nil
54+
}
4655
internal.WithBinaryLogger = withBinaryLogger
4756
internal.JoinDialOptions = newJoinDialOption
4857
internal.DisableGlobalDialOptions = newDisableGlobalDialOptions
@@ -89,6 +98,19 @@ type DialOption interface {
8998

9099
var globalDialOptions []DialOption
91100

101+
// perTargetDialOption takes a parsed target and returns a dial option to apply.
102+
//
103+
// This gets called after NewClient() parses the target, and allows per target
104+
// configuration set through a returned DialOption. The DialOption will not take
105+
// effect if specifies a resolver builder, as that Dial Option is factored in
106+
// while parsing target.
107+
type perTargetDialOption interface {
108+
// DialOption returns a Dial Option to apply.
109+
DialOptionForTarget(parsedTarget url.URL) DialOption
110+
}
111+
112+
var globalPerTargetDialOptions []perTargetDialOption
113+
92114
// EmptyDialOption does not alter the dial configuration. It can be embedded in
93115
// another structure to build custom dial options.
94116
//

internal/internal.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ var (
106106
// This is used in the 1.0 release of gcp/observability, and thus must not be
107107
// deleted or changed.
108108
ClearGlobalDialOptions func()
109+
110+
// AddGlobalPerTargetDialOptions adds a PerTargetDialOption that will be
111+
// configured for newly created ClientConns.
112+
AddGlobalPerTargetDialOptions any // func (opt any)
113+
// ClearGlobalPerTargetDialOptions clears the slice of global late apply
114+
// dial options.
115+
ClearGlobalPerTargetDialOptions func()
116+
109117
// JoinDialOptions combines the dial options passed as arguments into a
110118
// single dial option.
111119
JoinDialOptions any // func(...grpc.DialOption) grpc.DialOption
@@ -126,7 +134,8 @@ var (
126134
// deleted or changed.
127135
BinaryLogger any // func(binarylog.Logger) grpc.ServerOption
128136

129-
// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a provided grpc.ClientConn
137+
// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a
138+
// provided grpc.ClientConn.
130139
SubscribeToConnectivityStateChanges any // func(*grpc.ClientConn, grpcsync.Subscriber)
131140

132141
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
@@ -195,14 +204,17 @@ var (
195204
// resource name.
196205
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
197206

198-
// FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD.
207+
// FromOutgoingContextRaw returns the un-merged, intermediary contents of
208+
// metadata.rawMD.
199209
FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool)
200210

201-
// UserSetDefaultScheme is set to true if the user has overridden the default resolver scheme.
211+
// UserSetDefaultScheme is set to true if the user has overridden the
212+
// default resolver scheme.
202213
UserSetDefaultScheme bool = false
203214
)
204215

205-
// HealthChecker defines the signature of the client-side LB channel health checking function.
216+
// HealthChecker defines the signature of the client-side LB channel health
217+
// checking function.
206218
//
207219
// The implementation is expected to create a health checking RPC stream by
208220
// calling newStream(), watch for the health status of serviceName, and report

0 commit comments

Comments
 (0)