Skip to content

Commit 4fcd9c2

Browse files
authored
Merge pull request #2433 from murgatroid99/grpc-js-xds_watcher_validation_errors
grpc-js-xds: Fix handling of resource validation errors
2 parents 9271984 + edeeda6 commit 4fcd9c2

File tree

3 files changed

+185
-21
lines changed

3 files changed

+185
-21
lines changed

packages/grpc-js-xds/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js-xds",
3-
"version": "1.8.1",
3+
"version": "1.8.2",
44
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
55
"main": "build/src/index.js",
66
"scripts": {

packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*
1616
*/
1717

18-
import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
18+
import { experimental, logVerbosity, Metadata, status, StatusObject } from "@grpc/grpc-js";
1919
import { Any__Output } from "../generated/google/protobuf/Any";
2020

2121
const TRACER_NAME = 'xds_client';
@@ -157,43 +157,47 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
157157
return Array.from(this.subscriptions.keys());
158158
}
159159
handleResponses(responses: ResourcePair<ResponseType>[]): HandleResponseResult {
160-
const validResponses: ResponseType[] = [];
161160
let result: HandleResponseResult = {
162161
accepted: [],
163162
rejected: [],
164163
missing: []
165164
}
165+
const allResourceNames = new Set<string>();
166166
for (const {resource, raw} of responses) {
167167
const resourceName = this.getResourceName(resource);
168+
allResourceNames.add(resourceName);
169+
const subscriptionEntry = this.subscriptions.get(resourceName);
168170
if (this.validateResponse(resource)) {
169-
validResponses.push(resource);
170171
result.accepted.push({
171172
name: resourceName,
172173
raw: raw});
174+
if (subscriptionEntry) {
175+
for (const watcher of subscriptionEntry.watchers) {
176+
watcher.onValidUpdate(resource);
177+
}
178+
clearTimeout(subscriptionEntry.resourceTimer);
179+
subscriptionEntry.cachedResponse = resource;
180+
if (subscriptionEntry.deletionIgnored) {
181+
experimental.log(logVerbosity.INFO, `Received resource with previously ignored deletion: ${resourceName}`);
182+
subscriptionEntry.deletionIgnored = false;
183+
}
184+
}
173185
} else {
174186
this.trace('Validation failed for message ' + JSON.stringify(resource));
175187
result.rejected.push({
176188
name: resourceName,
177189
raw: raw,
178190
error: `Validation failed for resource ${resourceName}`
179191
});
180-
}
181-
}
182-
const allResourceNames = new Set<string>();
183-
for (const resource of validResponses) {
184-
const resourceName = this.getResourceName(resource);
185-
allResourceNames.add(resourceName);
186-
const subscriptionEntry = this.subscriptions.get(resourceName);
187-
if (subscriptionEntry) {
188-
const watchers = subscriptionEntry.watchers;
189-
for (const watcher of watchers) {
190-
watcher.onValidUpdate(resource);
191-
}
192-
clearTimeout(subscriptionEntry.resourceTimer);
193-
subscriptionEntry.cachedResponse = resource;
194-
if (subscriptionEntry.deletionIgnored) {
195-
experimental.log(logVerbosity.INFO, 'Received resource with previously ignored deletion: ' + resourceName);
196-
subscriptionEntry.deletionIgnored = false;
192+
if (subscriptionEntry) {
193+
for (const watcher of subscriptionEntry.watchers) {
194+
watcher.onTransientError({
195+
code: status.UNAVAILABLE,
196+
details: `Validation failed for resource ${resourceName}`,
197+
metadata: new Metadata()
198+
});
199+
}
200+
clearTimeout(subscriptionEntry.resourceTimer);
197201
}
198202
}
199203
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright 2023 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import * as assert from 'assert';
19+
import { register } from "../src";
20+
import { Backend } from "./backend";
21+
import { XdsTestClient } from "./client";
22+
import { FakeCluster, FakeRouteGroup } from "./framework";
23+
import { XdsServer } from "./xds-server";
24+
25+
register();
26+
27+
describe('Validation errors', () => {
28+
let xdsServer: XdsServer;
29+
let client: XdsTestClient;
30+
beforeEach(done => {
31+
xdsServer = new XdsServer();
32+
xdsServer.startServer(error => {
33+
done(error);
34+
});
35+
});
36+
afterEach(() => {
37+
client?.close();
38+
xdsServer?.shutdownServer();
39+
});
40+
it('Should continue to use a valid resource after receiving an invalid EDS update', done => {
41+
const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
42+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
43+
routeGroup.startAllBackends().then(() => {
44+
xdsServer.setEdsResource(cluster.getEndpointConfig());
45+
xdsServer.setCdsResource(cluster.getClusterConfig());
46+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
47+
xdsServer.setLdsResource(routeGroup.getListener());
48+
client = new XdsTestClient('route1', xdsServer);
49+
client.startCalls(100);
50+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
51+
// After backends receive calls, set invalid EDS resource
52+
const invalidEdsResource = {cluster_name: cluster.getEndpointConfig().cluster_name, endpoints: [{}]};
53+
xdsServer.setEdsResource(invalidEdsResource);
54+
let seenNack = false;
55+
xdsServer.addResponseListener((typeUrl, responseState) => {
56+
if (responseState.state === 'NACKED') {
57+
if (seenNack) {
58+
return;
59+
}
60+
seenNack = true;
61+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
62+
client.stopCalls();
63+
done();
64+
});
65+
}
66+
});
67+
}, reason => done(reason));
68+
}, reason => done(reason));
69+
});
70+
it('Should continue to use a valid resource after receiving an invalid CDS update', done => {
71+
const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
72+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
73+
routeGroup.startAllBackends().then(() => {
74+
xdsServer.setEdsResource(cluster.getEndpointConfig());
75+
xdsServer.setCdsResource(cluster.getClusterConfig());
76+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
77+
xdsServer.setLdsResource(routeGroup.getListener());
78+
client = new XdsTestClient('route1', xdsServer);
79+
client.startCalls(100);
80+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
81+
// After backends receive calls, set invalid CDS resource
82+
const invalidCdsResource = {name: cluster.getClusterConfig().name};
83+
xdsServer.setCdsResource(invalidCdsResource);
84+
let seenNack = false;
85+
xdsServer.addResponseListener((typeUrl, responseState) => {
86+
if (responseState.state === 'NACKED') {
87+
if (seenNack) {
88+
return;
89+
}
90+
seenNack = true;
91+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
92+
client.stopCalls();
93+
done();
94+
});
95+
}
96+
});
97+
}, reason => done(reason));
98+
}, reason => done(reason));
99+
});
100+
it('Should continue to use a valid resource after receiving an invalid RDS update', done => {
101+
const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
102+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
103+
routeGroup.startAllBackends().then(() => {
104+
xdsServer.setEdsResource(cluster.getEndpointConfig());
105+
xdsServer.setCdsResource(cluster.getClusterConfig());
106+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
107+
xdsServer.setLdsResource(routeGroup.getListener());
108+
client = new XdsTestClient('route1', xdsServer);
109+
client.startCalls(100);
110+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
111+
// After backends receive calls, set invalid RDS resource
112+
const invalidRdsResource = {name: routeGroup.getRouteConfiguration().name, virtual_hosts: [{domains: ['**']}]};
113+
xdsServer.setRdsResource(invalidRdsResource);
114+
let seenNack = false;
115+
xdsServer.addResponseListener((typeUrl, responseState) => {
116+
if (responseState.state === 'NACKED') {
117+
if (seenNack) {
118+
return;
119+
}
120+
seenNack = true;
121+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
122+
client.stopCalls();
123+
done();
124+
});
125+
}
126+
});
127+
}, reason => done(reason));
128+
}, reason => done(reason));
129+
});
130+
it('Should continue to use a valid resource after receiving an invalid LDS update', done => {
131+
const cluster = new FakeCluster('cluster1', [{backends: [new Backend()], locality: {region: 'region1'}}]);
132+
const routeGroup = new FakeRouteGroup('route1', [{cluster: cluster}]);
133+
routeGroup.startAllBackends().then(() => {
134+
xdsServer.setEdsResource(cluster.getEndpointConfig());
135+
xdsServer.setCdsResource(cluster.getClusterConfig());
136+
xdsServer.setRdsResource(routeGroup.getRouteConfiguration());
137+
xdsServer.setLdsResource(routeGroup.getListener());
138+
client = new XdsTestClient('route1', xdsServer);
139+
client.startCalls(100);
140+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
141+
// After backends receive calls, set invalid LDS resource
142+
const invalidLdsResource = {name: routeGroup.getListener().name};
143+
xdsServer.setLdsResource(invalidLdsResource);
144+
let seenNack = false;
145+
xdsServer.addResponseListener((typeUrl, responseState) => {
146+
if (responseState.state === 'NACKED') {
147+
if (seenNack) {
148+
return;
149+
}
150+
seenNack = true;
151+
routeGroup.waitForAllBackendsToReceiveTraffic().then(() => {
152+
client.stopCalls();
153+
done();
154+
});
155+
}
156+
});
157+
}, reason => done(reason));
158+
}, reason => done(reason));
159+
});
160+
});

0 commit comments

Comments
 (0)