Skip to content

Commit 7e5898e

Browse files
authored
xds: unify xDS client creation APIs meant for testing (#7268)
1 parent 5d7bd7a commit 7e5898e

30 files changed

+646
-492
lines changed

internal/internal.go

+3-10
Original file line numberDiff line numberDiff line change
@@ -193,16 +193,9 @@ var (
193193

194194
ChannelzTurnOffForTesting func()
195195

196-
// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found
197-
// error for a given resource type and name. This is usually triggered when
198-
// the associated watch timer fires. For testing purposes, having this
199-
// function makes events more predictable than relying on timer events.
200-
TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error
201-
202-
// TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client
203-
// singleton to invoke resource not found for a resource type name and
204-
// resource name.
205-
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
196+
// TriggerXDSResourceNotFoundForTesting causes the provided xDS Client to
197+
// invoke resource-not-found error for the given resource type and name.
198+
TriggerXDSResourceNotFoundForTesting any // func(xdsclient.XDSClient, xdsresource.Type, string) error
206199

207200
// FromOutgoingContextRaw returns the un-merged, intermediary contents of
208201
// metadata.rawMD.

internal/testutils/xds/bootstrap/bootstrap.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,17 @@ func Contents(opts Options) ([]byte, error) {
120120
// resources with empty authority.
121121
auths := map[string]authority{"": {}}
122122
for n, auURI := range opts.Authorities {
123-
auths[n] = authority{XdsServers: []server{{
124-
ServerURI: auURI,
125-
ChannelCreds: []creds{{Type: "insecure"}},
126-
ServerFeatures: cfg.XdsServers[0].ServerFeatures,
127-
}}}
123+
// If the authority server URI is empty, set it to an empty authority
124+
// config, resulting in it using the top-level xds server config.
125+
a := authority{}
126+
if auURI != "" {
127+
a = authority{XdsServers: []server{{
128+
ServerURI: auURI,
129+
ChannelCreds: []creds{{Type: "insecure"}},
130+
ServerFeatures: cfg.XdsServers[0].ServerFeatures,
131+
}}}
132+
}
133+
auths[n] = a
128134
}
129135
cfg.Authorities = auths
130136

internal/xds/bootstrap/bootstrap.go

+3
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,9 @@ func (a *Authority) UnmarshalJSON(data []byte) error {
292292
// Config provides the xDS client with several key bits of information that it
293293
// requires in its interaction with the management server. The Config is
294294
// initialized from the bootstrap file.
295+
//
296+
// Users must use one of the NewConfigXxx() functions to create a Config
297+
// instance, and not initialize it manually.
295298
type Config struct {
296299
// XDSServer is the management server to connect to.
297300
//

test/xds/xds_server_test.go

-199
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"google.golang.org/grpc/codes"
3131
"google.golang.org/grpc/connectivity"
3232
"google.golang.org/grpc/credentials/insecure"
33-
"google.golang.org/grpc/internal"
3433
"google.golang.org/grpc/internal/grpcsync"
3534
"google.golang.org/grpc/internal/testutils"
3635
"google.golang.org/grpc/internal/testutils/xds/e2e"
@@ -224,204 +223,6 @@ func (s) TestRDSNack(t *testing.T) {
224223
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
225224
}
226225

227-
// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
228-
// returns resource not found. Before getting the resource not found, the xDS
229-
// Server has not received all configuration needed, so it should Accept and
230-
// Close any new connections. After it has received the resource not found
231-
// error, the server should move to serving, successfully Accept Connections,
232-
// and fail at the L7 level with resource not found specified.
233-
func (s) TestResourceNotFoundRDS(t *testing.T) {
234-
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
235-
defer cleanup()
236-
lis, err := testutils.LocalTCPListener()
237-
if err != nil {
238-
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
239-
}
240-
// Setup the management server to respond with a listener resource that
241-
// specifies a route name to watch, and no RDS resource corresponding to
242-
// this route name.
243-
host, port, err := hostPortFromListener(lis)
244-
if err != nil {
245-
t.Fatalf("failed to retrieve host and port of server: %v", err)
246-
}
247-
248-
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
249-
resources := e2e.UpdateOptions{
250-
NodeID: nodeID,
251-
Listeners: []*v3listenerpb.Listener{listener},
252-
SkipValidation: true,
253-
}
254-
255-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
256-
defer cancel()
257-
if err := managementServer.Update(ctx, resources); err != nil {
258-
t.Fatal(err)
259-
}
260-
serving := grpcsync.NewEvent()
261-
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
262-
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
263-
if args.Mode == connectivity.ServingModeServing {
264-
serving.Fire()
265-
}
266-
})
267-
268-
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
269-
if err != nil {
270-
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
271-
}
272-
defer server.Stop()
273-
testgrpc.RegisterTestServiceServer(server, &testService{})
274-
go func() {
275-
if err := server.Serve(lis); err != nil {
276-
t.Errorf("Serve() failed: %v", err)
277-
}
278-
}()
279-
280-
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
281-
if err != nil {
282-
t.Fatalf("failed to dial local test server: %v", err)
283-
}
284-
defer cc.Close()
285-
286-
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
287-
288-
// Invoke resource not found - this should result in L7 RPC error with
289-
// unavailable receive on serving as a result, should trigger it to go
290-
// serving. Poll as watch might not be started yet to trigger resource not
291-
// found.
292-
loop:
293-
for {
294-
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
295-
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
296-
}
297-
select {
298-
case <-serving.Done():
299-
break loop
300-
case <-ctx.Done():
301-
t.Fatalf("timed out waiting for serving mode to go serving")
302-
case <-time.After(time.Millisecond):
303-
}
304-
}
305-
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
306-
}
307-
308-
// TestServingModeChanges tests the Server's logic as it transitions from Not
309-
// Ready to Ready, then to Not Ready. Before it goes Ready, connections should
310-
// be accepted and closed. After it goes ready, RPC's should proceed as normal
311-
// according to matched route configuration. After it transitions back into not
312-
// ready (through an explicit LDS Resource Not Found), previously running RPC's
313-
// should be gracefully closed and still work, and new RPC's should fail.
314-
func (s) TestServingModeChanges(t *testing.T) {
315-
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
316-
defer cleanup()
317-
lis, err := testutils.LocalTCPListener()
318-
if err != nil {
319-
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
320-
}
321-
// Setup the management server to respond with a listener resource that
322-
// specifies a route name to watch. Due to not having received the full
323-
// configuration, this should cause the server to be in mode Serving.
324-
host, port, err := hostPortFromListener(lis)
325-
if err != nil {
326-
t.Fatalf("failed to retrieve host and port of server: %v", err)
327-
}
328-
329-
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
330-
resources := e2e.UpdateOptions{
331-
NodeID: nodeID,
332-
Listeners: []*v3listenerpb.Listener{listener},
333-
SkipValidation: true,
334-
}
335-
336-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
337-
defer cancel()
338-
if err := managementServer.Update(ctx, resources); err != nil {
339-
t.Fatal(err)
340-
}
341-
342-
serving := grpcsync.NewEvent()
343-
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
344-
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
345-
if args.Mode == connectivity.ServingModeServing {
346-
serving.Fire()
347-
}
348-
})
349-
350-
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
351-
if err != nil {
352-
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
353-
}
354-
defer server.Stop()
355-
testgrpc.RegisterTestServiceServer(server, &testService{})
356-
go func() {
357-
if err := server.Serve(lis); err != nil {
358-
t.Errorf("Serve() failed: %v", err)
359-
}
360-
}()
361-
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
362-
if err != nil {
363-
t.Fatalf("failed to dial local test server: %v", err)
364-
}
365-
defer cc.Close()
366-
367-
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
368-
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
369-
resources = e2e.UpdateOptions{
370-
NodeID: nodeID,
371-
Listeners: []*v3listenerpb.Listener{listener},
372-
Routes: []*v3routepb.RouteConfiguration{routeConfig},
373-
}
374-
defer cancel()
375-
if err := managementServer.Update(ctx, resources); err != nil {
376-
t.Fatal(err)
377-
}
378-
379-
select {
380-
case <-ctx.Done():
381-
t.Fatal("timeout waiting for the xDS Server to go Serving")
382-
case <-serving.Done():
383-
}
384-
385-
// A unary RPC should work once it transitions into serving. (need this same
386-
// assertion from LDS resource not found triggering it).
387-
waitForSuccessfulRPC(ctx, t, cc)
388-
389-
// Start a stream before switching the server to not serving. Due to the
390-
// stream being created before the graceful stop of the underlying
391-
// connection, it should be able to continue even after the server switches
392-
// to not serving.
393-
c := testgrpc.NewTestServiceClient(cc)
394-
stream, err := c.FullDuplexCall(ctx)
395-
if err != nil {
396-
t.Fatalf("cc.FullDuplexCall failed: %f", err)
397-
}
398-
399-
// Invoke the lds resource not found - this should cause the server to
400-
// switch to not serving. This should gracefully drain connections, and fail
401-
// RPC's after. (how to assert accepted + closed) does this make it's way to
402-
// application layer? (should work outside of resource not found...
403-
404-
// Invoke LDS Resource not found here (tests graceful close)
405-
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
406-
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
407-
}
408-
409-
// New RPCs on that connection should eventually start failing. Due to
410-
// Graceful Stop any started streams continue to work.
411-
if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
412-
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
413-
}
414-
if err = stream.CloseSend(); err != nil {
415-
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
416-
}
417-
if _, err = stream.Recv(); err != io.EOF {
418-
t.Fatalf("unexpected error: %v, expected an EOF error", err)
419-
}
420-
421-
// New RPCs on that connection should eventually start failing.
422-
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
423-
}
424-
425226
// TestMultipleUpdatesImmediatelySwitch tests the case where you get an LDS
426227
// specifying RDS A, B, and C (with A being matched to). The Server should be in
427228
// not serving until it receives all 3 RDS Configurations, and then transition

xds/googledirectpath/googlec2p_test.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"time"
2727

2828
"github.com/google/go-cmp/cmp"
29-
"github.com/google/go-cmp/cmp/cmpopts"
3029
"google.golang.org/grpc"
3130
"google.golang.org/grpc/credentials/insecure"
3231
"google.golang.org/grpc/internal/envconfig"
@@ -232,14 +231,9 @@ func TestBuildXDS(t *testing.T) {
232231
if tt.tdURI != "" {
233232
wantConfig.XDSServer.ServerURI = tt.tdURI
234233
}
235-
cmpOpts := cmp.Options{
236-
cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds"),
237-
cmp.AllowUnexported(bootstrap.ServerConfig{}),
238-
protocmp.Transform(),
239-
}
240234
select {
241235
case gotConfig := <-configCh:
242-
if diff := cmp.Diff(wantConfig, gotConfig, cmpOpts); diff != "" {
236+
if diff := cmp.Diff(wantConfig, gotConfig, protocmp.Transform()); diff != "" {
243237
t.Fatalf("Unexpected diff in bootstrap config (-want +got):\n%s", diff)
244238
}
245239
case <-time.After(time.Second):

xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscr
137137
func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) {
138138
t.Helper()
139139

140-
xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
140+
xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
141141
if err != nil {
142142
t.Fatalf("Failed to create xDS client: %v", err)
143143
}

xds/internal/balancer/cdsbalancer/cdsbalancer_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr
228228
})
229229
t.Cleanup(cleanup)
230230

231-
xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
231+
xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
232232
if err != nil {
233233
t.Fatalf("Failed to create xDS client: %v", err)
234234
}
@@ -344,7 +344,7 @@ func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) {
344344
// Setup a management server and an xDS client to talk to it.
345345
_, _, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
346346
t.Cleanup(cleanup)
347-
xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
347+
xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
348348
if err != nil {
349349
t.Fatalf("Failed to create xDS client: %v", err)
350350
}

xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,16 @@ import (
3535
"google.golang.org/grpc/internal/stubserver"
3636
"google.golang.org/grpc/internal/testutils/pickfirst"
3737
"google.golang.org/grpc/internal/testutils/xds/e2e"
38-
"google.golang.org/grpc/internal/xds/bootstrap"
3938
"google.golang.org/grpc/peer"
4039
"google.golang.org/grpc/resolver"
4140
"google.golang.org/grpc/resolver/manual"
4241
"google.golang.org/grpc/serviceconfig"
4342
"google.golang.org/grpc/status"
44-
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
4543
"google.golang.org/grpc/xds/internal/xdsclient"
4644
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
4745
"google.golang.org/protobuf/types/known/wrapperspb"
4846

4947
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
50-
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
5148
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
5249
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
5350
testgrpc "google.golang.org/grpc/interop/grpc_testing"
@@ -1107,12 +1104,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
11071104

11081105
// Create an xDS client talking to the above management server, configured
11091106
// with a short watch expiry timeout.
1110-
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
1111-
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
1112-
NodeProto: &v3corepb.Node{Id: nodeID},
1113-
}, defaultTestWatchExpiryTimeout, time.Duration(0))
1107+
bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address)
11141108
if err != nil {
1115-
t.Fatalf("failed to create xds client: %v", err)
1109+
t.Fatalf("Failed to create bootstrap configuration: %v", err)
1110+
}
1111+
xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout})
1112+
if err != nil {
1113+
t.Fatalf("Failed to create an xDS client: %v", err)
11161114
}
11171115
defer close()
11181116

xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, fun
7272
t.Helper()
7373

7474
// Create an xDS client for use by the cluster_resolver LB policy.
75-
xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
75+
xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
7676
if err != nil {
7777
t.Fatalf("Failed to create xDS client: %v", err)
7878
}

0 commit comments

Comments
 (0)