Skip to content

Commit 998c937

Browse files
[sotw][issue-540] Return full state when applicable for watches in linear cache
Signed-off-by: Valerian Roche <[email protected]>
1 parent 9273570 commit 998c937

File tree

4 files changed

+179
-14
lines changed

4 files changed

+179
-14
lines changed

pkg/cache/v3/linear.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
106106
versionMap: nil,
107107
version: 0,
108108
versionVector: make(map[string]uint64),
109+
log: log.NewDefaultLogger(),
109110
}
110111
for _, opt := range opts {
111112
opt(out)
@@ -117,11 +118,24 @@ func (cache *LinearCache) respond(watch ResponseWatch, staleResources []string)
117118
var resources []types.ResourceWithTTL
118119
// TODO: optimize the resources slice creations across different clients
119120
if len(staleResources) == 0 {
121+
// Wildcard case, we return all resources in the cache
120122
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
121123
for _, resource := range cache.resources {
122124
resources = append(resources, types.ResourceWithTTL{Resource: resource})
123125
}
126+
} else if ResourceRequiresFullStateInSotw(cache.typeURL) {
127+
// Non-wildcard request for a type requiring full state response
128+
// We need to return all requested resources, if existing, for this type
129+
requestedResources := watch.Request.GetResourceNames()
130+
resources = make([]types.ResourceWithTTL, 0, len(requestedResources))
131+
for _, resource := range requestedResources {
132+
resource := cache.resources[resource]
133+
if resource != nil {
134+
resources = append(resources, types.ResourceWithTTL{Resource: resource})
135+
}
136+
}
124137
} else {
138+
// Non-wildcard request for other types. Only return stale resources
125139
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
126140
for _, name := range staleResources {
127141
resource := cache.resources[name]
@@ -327,8 +341,12 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
327341
case err != nil:
328342
stale = true
329343
staleResources = request.GetResourceNames()
344+
cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error())
330345
case len(request.GetResourceNames()) == 0:
331-
stale = lastVersion != cache.version
346+
stale = (lastVersion != cache.version)
347+
if stale {
348+
cache.log.Debugf("Watch is stale as cache version %d differs for wildcard watch %d", cache.version, lastVersion)
349+
}
332350
default:
333351
for _, name := range request.GetResourceNames() {
334352
// When a resource is removed, its version defaults to 0 and it is not considered stale.
@@ -337,20 +355,26 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
337355
staleResources = append(staleResources, name)
338356
}
339357
}
358+
if stale {
359+
cache.log.Debugf("Watch is stale with stale resources %v", staleResources)
360+
}
340361
}
341362
if stale {
342363
cache.respond(watch, staleResources)
343364
return nil
344365
}
345366
// Create open watches since versions are up to date.
346367
if len(request.GetResourceNames()) == 0 {
368+
cache.log.Infof("[linear cache] open watch for %s all resources, system version %q", cache.typeURL, cache.getVersion())
347369
cache.watchAll[watch] = struct{}{}
348370
return func() {
349371
cache.mu.Lock()
350372
defer cache.mu.Unlock()
351373
delete(cache.watchAll, watch)
352374
}
353375
}
376+
377+
cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", cache.typeURL, request.ResourceNames, cache.getVersion())
354378
for _, name := range request.GetResourceNames() {
355379
set, exists := cache.watches[name]
356380
if !exists {

pkg/cache/v3/linear_test.go

Lines changed: 104 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,12 @@ import (
2525
"github.com/stretchr/testify/require"
2626
"google.golang.org/protobuf/types/known/wrapperspb"
2727

28+
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
2829
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
30+
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
2931
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
32+
"github.com/envoyproxy/go-control-plane/pkg/log"
33+
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
3034
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
3135
)
3236

@@ -38,18 +42,18 @@ func testResource(s string) types.Resource {
3842
return wrapperspb.String(s)
3943
}
4044

41-
func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
45+
func verifyResponseContent(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string) (Response, *discovery.DiscoveryResponse) {
4246
t.Helper()
4347
var r Response
4448
select {
4549
case r = <-ch:
4650
case <-time.After(1 * time.Second):
4751
t.Error("failed to receive response after 1 second")
48-
return
52+
return nil, nil
4953
}
5054

51-
if r.GetRequest().GetTypeUrl() != testType {
52-
t.Errorf("unexpected empty request type URL: %q", r.GetRequest().GetTypeUrl())
55+
if r.GetRequest().GetTypeUrl() != expectedType {
56+
t.Errorf("unexpected request type URL: %q", r.GetRequest().GetTypeUrl())
5357
}
5458
if r.GetContext() == nil {
5559
t.Errorf("unexpected empty response context")
@@ -61,18 +65,41 @@ func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
6165
if out.GetVersionInfo() == "" {
6266
t.Error("unexpected response empty version")
6367
}
64-
if n := len(out.GetResources()); n != num {
65-
t.Errorf("unexpected number of responses: got %d, want %d", n, num)
66-
}
67-
if version != "" && out.GetVersionInfo() != version {
68-
t.Errorf("unexpected version: got %q, want %q", out.GetVersionInfo(), version)
68+
if expectedVersion != "" && out.GetVersionInfo() != expectedVersion {
69+
t.Errorf("unexpected version: got %q, want %q", out.GetVersionInfo(), expectedVersion)
6970
}
70-
if out.GetTypeUrl() != testType {
71+
if out.GetTypeUrl() != expectedType {
7172
t.Errorf("unexpected type URL: %q", out.GetTypeUrl())
7273
}
7374
if len(r.GetRequest().GetResourceNames()) != 0 && len(r.GetRequest().GetResourceNames()) < len(out.Resources) {
7475
t.Errorf("received more resources (%d) than requested (%d)", len(r.GetRequest().GetResourceNames()), len(out.Resources))
7576
}
77+
return r, out
78+
}
79+
80+
func verifyResponse(t *testing.T, ch <-chan Response, expectedVersion string, expectedResourcesNb int) {
81+
t.Helper()
82+
_, r := verifyResponseContent(t, ch, testType, expectedVersion)
83+
if r == nil {
84+
return
85+
}
86+
if n := len(r.GetResources()); n != expectedResourcesNb {
87+
t.Errorf("unexpected number of responses: got %d, want %d", n, expectedResourcesNb)
88+
}
89+
}
90+
91+
func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string, expectedResources ...string) {
92+
t.Helper()
93+
r, _ := verifyResponseContent(t, ch, expectedType, expectedVersion)
94+
if r == nil {
95+
return
96+
}
97+
out := r.(*RawResponse)
98+
resourceNames := []string{}
99+
for _, res := range out.Resources {
100+
resourceNames = append(resourceNames, GetResourceName(res.Resource))
101+
}
102+
assert.ElementsMatch(t, resourceNames, expectedResources)
76103
}
77104

78105
type resourceInfo struct {
@@ -172,6 +199,7 @@ func checkVersionMapSet(t *testing.T, c *LinearCache) {
172199
}
173200

174201
func mustBlock(t *testing.T, w <-chan Response) {
202+
t.Helper()
175203
select {
176204
case <-w:
177205
t.Error("watch must block")
@@ -180,6 +208,7 @@ func mustBlock(t *testing.T, w <-chan Response) {
180208
}
181209

182210
func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) {
211+
t.Helper()
183212
select {
184213
case <-w:
185214
t.Error("watch must block")
@@ -188,6 +217,7 @@ func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) {
188217
}
189218

190219
func hashResource(t *testing.T, resource types.Resource) string {
220+
t.Helper()
191221
marshaledResource, err := MarshalResource(resource)
192222
if err != nil {
193223
t.Fatal(err)
@@ -815,7 +845,7 @@ func TestLinearSotwWatches(t *testing.T) {
815845
}}
816846
err = cache.UpdateResources(map[string]types.Resource{"a": a}, nil)
817847
require.NoError(t, err)
818-
verifyResponse(t, w, cache.getVersion(), 1)
848+
verifyResponseResources(t, w, testType, cache.getVersion(), "a")
819849
checkVersionMapNotSet(t, cache)
820850

821851
assert.Empty(t, cache.watches["a"])
@@ -839,7 +869,7 @@ func TestLinearSotwWatches(t *testing.T) {
839869
assert.Empty(t, cache.watches["c"])
840870

841871
require.NoError(t, err)
842-
verifyResponse(t, w, cache.getVersion(), 1)
872+
verifyResponseResources(t, w, testType, cache.getVersion(), "b")
843873
checkVersionMapNotSet(t, cache)
844874

845875
w = make(chan Response, 1)
@@ -853,11 +883,72 @@ func TestLinearSotwWatches(t *testing.T) {
853883
}}
854884
err = cache.UpdateResources(map[string]types.Resource{"c": c}, nil)
855885
require.NoError(t, err)
856-
verifyResponse(t, w, cache.getVersion(), 1)
886+
verifyResponseResources(t, w, testType, cache.getVersion(), "c")
857887
checkVersionMapNotSet(t, cache)
858888

859889
assert.Empty(t, cache.watches["a"])
860890
assert.Empty(t, cache.watches["b"])
861891
assert.Empty(t, cache.watches["c"])
862892
})
893+
894+
t.Run("watches return full state for types requesting it", func(t *testing.T) {
895+
a := &cluster.Cluster{Name: "a"}
896+
b := &cluster.Cluster{Name: "b"}
897+
c := &cluster.Cluster{Name: "c"}
898+
// ClusterType requires all resources to always be returned
899+
cache := NewLinearCache(resource.ClusterType, WithInitialResources(map[string]types.Resource{
900+
"a": a,
901+
"b": b,
902+
"c": c,
903+
}), WithLogger(log.NewTestLogger(t)))
904+
assert.Equal(t, 3, cache.NumResources())
905+
906+
// Non-wildcard request
907+
nonWildcardState := stream.NewStreamState(false, nil)
908+
w1 := make(chan Response, 1)
909+
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1)
910+
mustBlock(t, w1)
911+
checkVersionMapNotSet(t, cache)
912+
913+
// wildcard request
914+
wildcardState := stream.NewStreamState(true, nil)
915+
w2 := make(chan Response, 1)
916+
_ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2)
917+
mustBlock(t, w2)
918+
checkVersionMapNotSet(t, cache)
919+
920+
// request not requesting b
921+
otherState := stream.NewStreamState(false, nil)
922+
w3 := make(chan Response, 1)
923+
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, otherState, w3)
924+
mustBlock(t, w3)
925+
checkVersionMapNotSet(t, cache)
926+
927+
b.AltStatName = "othername"
928+
err := cache.UpdateResources(map[string]types.Resource{"b": b}, nil)
929+
require.NoError(t, err)
930+
931+
// Other watch has not triggered
932+
mustBlock(t, w3)
933+
934+
verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b") // a is also returned as cluster requires full state
935+
verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c") // a and c are also returned wildcard
936+
937+
// Recreate the watches
938+
w1 = make(chan Response, 1)
939+
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1)
940+
mustBlock(t, w1)
941+
w2 = make(chan Response, 1)
942+
_ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2)
943+
mustBlock(t, w2)
944+
945+
// Update d, new resource in the cache
946+
d := &cluster.Cluster{Name: "d"}
947+
err = cache.UpdateResource("d", d)
948+
require.NoError(t, err)
949+
950+
verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b", "d")
951+
verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c", "d")
952+
verifyResponseResources(t, w3, resource.ClusterType, cache.getVersion(), "a", "c", "d")
953+
})
863954
}

pkg/cache/v3/resource.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,23 @@ func GetResourceName(res types.Resource) string {
9797
}
9898
}
9999

100+
// ResourceRequiresFullStateInSotw indicates whether when building the reply in Sotw,
101+
// the response must include all existing resources or can return only the modified ones
102+
func ResourceRequiresFullStateInSotw(typeURL resource.Type) bool {
103+
// From https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#grouping-resources-into-responses,
104+
// when using sotw the control-plane MUST return all requested resources (or simply all if wildcard)
105+
// for some types. This is relied on by xds-grpc which is explicitly requesting clusters but expect
106+
// to receive all existing resources
107+
switch typeURL {
108+
case resource.ClusterType:
109+
return true
110+
case resource.ListenerType:
111+
return true
112+
default:
113+
return false
114+
}
115+
}
116+
100117
// GetResourceNames returns the resource names for a list of valid xDS response types.
101118
func GetResourceNames(resources []types.Resource) []string {
102119
out := make([]string, len(resources))

pkg/log/test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package log
2+
3+
import "testing"
4+
5+
type testLogger struct {
6+
t testing.TB
7+
}
8+
9+
var _ Logger = testLogger{}
10+
11+
func NewTestLogger(t testing.TB) Logger {
12+
return testLogger{t}
13+
}
14+
15+
// Debugf logs a message at level debug on the test logger.
16+
func (l testLogger) Debugf(msg string, args ...interface{}) {
17+
l.t.Logf("[debug] "+msg, args...)
18+
}
19+
20+
// Infof logs a message at level info on the test logger.
21+
func (l testLogger) Infof(msg string, args ...interface{}) {
22+
l.t.Logf("[info] "+msg, args...)
23+
}
24+
25+
// Warnf logs a message at level warn on the test logger.
26+
func (l testLogger) Warnf(msg string, args ...interface{}) {
27+
l.t.Logf("[warn] "+msg, args...)
28+
}
29+
30+
// Errorf logs a message at level error on the test logger.
31+
func (l testLogger) Errorf(msg string, args ...interface{}) {
32+
l.t.Logf("[error] "+msg, args...)
33+
}

0 commit comments

Comments
 (0)