@@ -113,6 +113,7 @@ func TestDeltaRemoveResources(t *testing.T) {
113
113
watches := make (map [string ]chan cache.DeltaResponse )
114
114
streams := make (map [string ]* stream.StreamState )
115
115
116
+ // At this stage the cache is empty, so a watch is opened
116
117
for _ , typ := range testTypes {
117
118
watches [typ ] = make (chan cache.DeltaResponse , 1 )
118
119
state := stream .NewStreamState (true , make (map [string ]string ))
@@ -127,13 +128,17 @@ func TestDeltaRemoveResources(t *testing.T) {
127
128
}, streams [typ ], watches [typ ])
128
129
}
129
130
130
- require .NoError (t , c .SetSnapshot (context .Background (), key , fixture .snapshot ()))
131
+ snapshot := fixture .snapshot ()
132
+ snapshot .Resources [types .Endpoint ] = cache .NewResources (fixture .version , []types.Resource {
133
+ testEndpoint ,
134
+ resource .MakeEndpoint ("otherCluster" , 8080 ),
135
+ })
136
+ require .NoError (t , c .SetSnapshot (context .Background (), key , snapshot ))
131
137
132
138
for _ , typ := range testTypes {
133
139
t .Run (typ , func (t * testing.T ) {
134
140
select {
135
141
case out := <- watches [typ ]:
136
- snapshot := fixture .snapshot ()
137
142
assertResourceMapEqual (t , cache .IndexRawResourcesByName (out .(* cache.RawDeltaResponse ).Resources ), snapshot .GetResources (typ ))
138
143
nextVersionMap := out .GetNextVersionMap ()
139
144
streams [typ ].SetResourceVersions (nextVersionMap )
@@ -158,21 +163,21 @@ func TestDeltaRemoveResources(t *testing.T) {
158
163
159
164
assert .Equal (t , len (testTypes ), c .GetStatusInfo (key ).GetNumDeltaWatches (), "watches should be created for the latest version" )
160
165
161
- // set a partially versioned snapshot with no endpoints
166
+ // set a partially versioned snapshot with only one endpoint
162
167
snapshot2 := fixture .snapshot ()
163
- snapshot2 .Resources [types .Endpoint ] = cache .NewResources (fixture .version2 , []types.Resource {})
168
+ snapshot2 .Resources [types .Endpoint ] = cache .NewResources (fixture .version2 , []types.Resource {
169
+ testEndpoint , // this cluster is not changed, we do not expect it back in "resources"
170
+ })
164
171
require .NoError (t , c .SetSnapshot (context .Background (), key , snapshot2 ))
165
172
166
173
// validate response for endpoints
167
174
select {
168
175
case out := <- watches [testTypes [0 ]]:
169
- snapshot2 := fixture .snapshot ()
170
- snapshot2 .Resources [types .Endpoint ] = cache .NewResources (fixture .version2 , []types.Resource {})
171
- assertResourceMapEqual (t , cache .IndexRawResourcesByName (out .(* cache.RawDeltaResponse ).Resources ), snapshot2 .GetResources (rsrc .EndpointType ))
176
+ assert .Empty (t , out .(* cache.RawDeltaResponse ).Resources )
177
+ assert .Equal (t , []string {"otherCluster" }, out .(* cache.RawDeltaResponse ).RemovedResources )
172
178
nextVersionMap := out .GetNextVersionMap ()
173
-
174
179
// make sure the version maps are different since we no longer are tracking any endpoint resources
175
- require . Equal (t , nextVersionMap , streams [testTypes [0 ]].GetKnownResources (), "versionMap for the endpoint resource type did not change" )
180
+ assert . NotEqual (t , nextVersionMap , streams [testTypes [0 ]].GetKnownResources (), "versionMap for the endpoint resource type did not change" )
176
181
case <- time .After (time .Second ):
177
182
assert .Fail (t , "failed to receive snapshot response" )
178
183
}
0 commit comments