Skip to content

Commit de2a8bc

Browse files
author
OpenShift Bot
committed
Merge pull request #9204 from deads2k/fix-project-watch
Merged by openshift-bot
2 parents 00310df + f2d3dfd commit de2a8bc

File tree

4 files changed

+76
-90
lines changed

4 files changed

+76
-90
lines changed

pkg/project/auth/cache.go

+17-21
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ func (s *neverSkipSynchronizer) SkipSynchronize(prevState string, versionedObjec
105105

106106
// AuthorizationCache maintains a cache on the set of namespaces a user or group can access.
107107
type AuthorizationCache struct {
108+
// allKnownNamespaces we track all the known namespaces, so we can detect deletes.
109+
// TODO remove this in favor of a list/watch mechanism for projects
110+
allKnownNamespaces sets.String
108111
namespaceStore cache.Store
109112
namespaceInterface kclient.NamespaceInterface
110113
lastSyncResourceVersioner LastSyncResourceVersioner
@@ -132,6 +135,7 @@ type AuthorizationCache struct {
132135
// NewAuthorizationCache creates a new AuthorizationCache
133136
func NewAuthorizationCache(reviewer Reviewer, namespaceInterface kclient.NamespaceInterface, policyClient policyclient.ReadOnlyPolicyClient) *AuthorizationCache {
134137
result := &AuthorizationCache{
138+
allKnownNamespaces: sets.String{},
135139
namespaceStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
136140
namespaceInterface: namespaceInterface,
137141
lastSyncResourceVersioner: &unchangingLastSyncResourceVersioner{},
@@ -203,7 +207,7 @@ func (ac *AuthorizationCache) RemoveWatcher(watcher CacheWatcher) {
203207
}
204208

205209
// synchronizeNamespaces synchronizes access over each namespace and returns a set of namespace names that were looked at in last sync
206-
func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) *sets.String {
210+
func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) sets.String {
207211
namespaceSet := sets.NewString()
208212
items := ac.namespaceStore.List()
209213
for i := range items {
@@ -217,7 +221,7 @@ func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache
217221
utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
218222
}
219223
}
220-
return &namespaceSet
224+
return namespaceSet
221225
}
222226

223227
// synchronizePolicies synchronizes access over each policy
@@ -257,16 +261,20 @@ func (ac *AuthorizationCache) synchronizePolicyBindings(userSubjectRecordStore c
257261
}
258262

259263
// purgeDeletedNamespaces will remove all namespaces enumerated in a reviewRecordStore that are not in the namespace set
260-
func purgeDeletedNamespaces(namespaceSet *sets.String, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
264+
func (ac *AuthorizationCache) purgeDeletedNamespaces(oldNamespaces, newNamespaces sets.String, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
261265
reviewRecordItems := reviewRecordStore.List()
262266
for i := range reviewRecordItems {
263267
reviewRecord := reviewRecordItems[i].(*reviewRecord)
264-
if !namespaceSet.Has(reviewRecord.namespace) {
268+
if !newNamespaces.Has(reviewRecord.namespace) {
265269
deleteNamespaceFromSubjects(userSubjectRecordStore, reviewRecord.users, reviewRecord.namespace)
266270
deleteNamespaceFromSubjects(groupSubjectRecordStore, reviewRecord.groups, reviewRecord.namespace)
267271
reviewRecordStore.Delete(reviewRecord)
268272
}
269273
}
274+
275+
for namespace := range oldNamespaces.Difference(newNamespaces) {
276+
ac.notifyWatchers(namespace, nil, sets.String{}, sets.String{})
277+
}
270278
}
271279

272280
// invalidateCache returns true if there was a change in the cluster namespace that holds cluster policy and policy bindings
@@ -327,17 +335,18 @@ func (ac *AuthorizationCache) synchronize() {
327335
}
328336

329337
// iterate over caches and synchronize our three caches
330-
namespaceSet := ac.synchronizeNamespaces(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
338+
newKnownNamespaces := ac.synchronizeNamespaces(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
331339
ac.synchronizePolicies(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
332340
ac.synchronizePolicyBindings(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
333-
purgeDeletedNamespaces(namespaceSet, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
341+
ac.purgeDeletedNamespaces(ac.allKnownNamespaces, newKnownNamespaces, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
334342

335343
// if we did a full rebuild, now we swap the fully rebuilt cache
336344
if invalidateCache {
337345
ac.userSubjectRecordStore = userSubjectRecordStore
338346
ac.groupSubjectRecordStore = groupSubjectRecordStore
339347
ac.reviewRecordStore = reviewRecordStore
340348
}
349+
ac.allKnownNamespaces = newKnownNamespaces
341350

342351
// we were able to update our cache since this last observation period
343352
ac.lastState = currentState
@@ -486,24 +495,11 @@ func addSubjectsToNamespace(subjectRecordStore cache.Store, subjects []string, n
486495
}
487496
}
488497

489-
func (ac *AuthorizationCache) notifyWatchers(namespace string, exists *reviewRecord, latestUsers, latestGroups sets.String) {
490-
existingGroups := sets.String{}
491-
existingUsers := sets.String{}
492-
if exists != nil {
493-
existingGroups = sets.NewString(exists.groups...)
494-
existingUsers = sets.NewString(exists.users...)
495-
}
496-
497-
// calculate once to avoid fanning out.
498-
removedUsers := existingUsers.Difference(latestUsers)
499-
removedGroups := existingGroups.Difference(latestGroups)
500-
addedUsers := latestUsers.Difference(existingUsers)
501-
addedGroups := latestGroups.Difference(existingGroups)
502-
498+
func (ac *AuthorizationCache) notifyWatchers(namespace string, exists *reviewRecord, users, groups sets.String) {
503499
ac.watcherLock.Lock()
504500
defer ac.watcherLock.Unlock()
505501
for _, watcher := range ac.watchers {
506-
watcher.GroupMembershipChanged(namespace, latestUsers, latestGroups, removedUsers, removedGroups, addedUsers, addedGroups)
502+
watcher.GroupMembershipChanged(namespace, users, groups)
507503
}
508504
}
509505

pkg/project/auth/watch.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
type CacheWatcher interface {
2222
// GroupMembershipChanged is called serially for all changes for all watchers. This method MUST NOT BLOCK.
2323
// The serial nature makes reasoning about the code easy, but if you block in this method you will doom all watchers.
24-
GroupMembershipChanged(namespaceName string, latestUsers, latestGroups, removedUsers, removedGroups, addedUsers, addedGroups sets.String)
24+
GroupMembershipChanged(namespaceName string, users, groups sets.String)
2525
}
2626

2727
type WatchableCache interface {
@@ -106,15 +106,13 @@ func NewUserProjectWatcher(username string, groups []string, projectCache *proje
106106
return w
107107
}
108108

109-
func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, latestUsers, lastestGroups, removedUsers, removedGroups, addedUsers, addedGroups sets.String) {
110-
hasAccess := latestUsers.Has(w.username) || lastestGroups.HasAny(w.groups...)
111-
removed := !hasAccess && (removedUsers.Has(w.username) || removedGroups.HasAny(w.groups...))
109+
func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, users, groups sets.String) {
110+
hasAccess := users.Has(w.username) || groups.HasAny(w.groups...)
111+
_, known := w.knownProjects[namespaceName]
112112

113113
switch {
114-
case removed:
115-
if _, known := w.knownProjects[namespaceName]; !known {
116-
return
117-
}
114+
// this means that we were removed from the project
115+
case !hasAccess && known:
118116
delete(w.knownProjects, namespaceName)
119117

120118
select {

pkg/project/auth/watch_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestFullIncoming(t *testing.T) {
5959
watcher.cacheIncoming <- watch.Event{Type: watch.Added}
6060

6161
// this call should not block and we should see a failure
62-
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{}, sets.NewString("bob"), sets.String{})
62+
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{})
6363
if len(fakeAuthCache.removed) != 1 {
6464
t.Errorf("should have removed self")
6565
}
@@ -103,7 +103,7 @@ func TestAddModifyDeleteEventsByUser(t *testing.T) {
103103
watcher, _ := newTestWatcher("bob", nil, newNamespaces("ns-01")...)
104104
go watcher.Watch()
105105

106-
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{}, sets.NewString("bob"), sets.String{})
106+
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{})
107107
select {
108108
case event := <-watcher.ResultChan():
109109
if event.Type != watch.Added {
@@ -117,14 +117,14 @@ func TestAddModifyDeleteEventsByUser(t *testing.T) {
117117
}
118118

119119
// the object didn't change, we shouldn't observe it
120-
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{}, sets.String{}, sets.String{})
120+
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{})
121121
select {
122122
case event := <-watcher.ResultChan():
123123
t.Fatalf("unexpected event %v", event)
124124
case <-time.After(3 * time.Second):
125125
}
126126

127-
watcher.GroupMembershipChanged("ns-01", sets.NewString("alice"), sets.String{}, sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{})
127+
watcher.GroupMembershipChanged("ns-01", sets.NewString("alice"), sets.String{})
128128
select {
129129
case event := <-watcher.ResultChan():
130130
if event.Type != watch.Deleted {
@@ -142,7 +142,7 @@ func TestAddModifyDeleteEventsByGroup(t *testing.T) {
142142
watcher, _ := newTestWatcher("bob", []string{"group-one"}, newNamespaces("ns-01")...)
143143
go watcher.Watch()
144144

145-
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{}, sets.String{}, sets.NewString("group-one"))
145+
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"))
146146
select {
147147
case event := <-watcher.ResultChan():
148148
if event.Type != watch.Added {
@@ -156,14 +156,14 @@ func TestAddModifyDeleteEventsByGroup(t *testing.T) {
156156
}
157157

158158
// the object didn't change, we shouldn't observe it
159-
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{}, sets.String{}, sets.String{})
159+
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"))
160160
select {
161161
case event := <-watcher.ResultChan():
162162
t.Fatalf("unexpected event %v", event)
163163
case <-time.After(3 * time.Second):
164164
}
165165

166-
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-two"), sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{})
166+
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-two"))
167167
select {
168168
case event := <-watcher.ResultChan():
169169
if event.Type != watch.Deleted {

test/integration/project_test.go

+46-54
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestProjectMustExist(t *testing.T) {
156156

157157
func TestProjectWatch(t *testing.T) {
158158
testutil.RequireEtcd(t)
159-
_, clusterAdminKubeConfig, err := testserver.StartTestMasterAPI()
159+
_, clusterAdminKubeConfig, err := testserver.StartTestMaster()
160160
if err != nil {
161161
t.Fatalf("unexpected error: %v", err)
162162
}
@@ -181,21 +181,9 @@ func TestProjectWatch(t *testing.T) {
181181
if _, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminClientConfig, "ns-01", "bob"); err != nil {
182182
t.Fatalf("unexpected error: %v", err)
183183
}
184+
waitForAdd("ns-01", w, t)
184185

185-
select {
186-
case event := <-w.ResultChan():
187-
if event.Type != watch.Added {
188-
t.Errorf("expected added, got %v", event)
189-
}
190-
project := event.Object.(*projectapi.Project)
191-
if project.Name != "ns-01" {
192-
t.Fatalf("expected %v, got %#v", "ns-01", project)
193-
}
194-
195-
case <-time.After(3 * time.Second):
196-
t.Fatalf("timeout")
197-
}
198-
186+
// TEST FOR ADD/REMOVE ACCESS
199187
joeClient, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminClientConfig, "ns-02", "joe")
200188
if err != nil {
201189
t.Fatalf("unexpected error: %v", err)
@@ -209,58 +197,31 @@ func TestProjectWatch(t *testing.T) {
209197
if err := addBob.AddRole(); err != nil {
210198
t.Fatalf("unexpected error: %v", err)
211199
}
212-
// wait for the add
213-
for {
214-
select {
215-
case event := <-w.ResultChan():
216-
project := event.Object.(*projectapi.Project)
217-
t.Logf("got %#v %#v", event, project)
218-
if event.Type == watch.Added && project.Name == "ns-02" {
219-
return
220-
}
221-
222-
case <-time.After(3 * time.Second):
223-
t.Fatalf("timeout")
224-
}
225-
}
200+
waitForAdd("ns-02", w, t)
226201

227202
if err := addBob.RemoveRole(); err != nil {
228203
t.Fatalf("unexpected error: %v", err)
229204
}
205+
waitForDelete("ns-02", w, t)
230206

231-
// wait for the delete
232-
for {
233-
select {
234-
case event := <-w.ResultChan():
235-
project := event.Object.(*projectapi.Project)
236-
t.Logf("got %#v %#v", event, project)
237-
if event.Type == watch.Deleted && project.Name == "ns-02" {
238-
return
239-
}
207+
// TEST FOR DELETE PROJECT
208+
if _, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminClientConfig, "ns-03", "bob"); err != nil {
209+
t.Fatalf("unexpected error: %v", err)
210+
}
211+
waitForAdd("ns-03", w, t)
240212

241-
case <-time.After(3 * time.Second):
242-
t.Fatalf("timeout")
243-
}
213+
if err := bobClient.Projects().Delete("ns-03"); err != nil {
214+
t.Fatalf("unexpected error: %v", err)
244215
}
216+
// wait for the delete
217+
waitForDelete("ns-03", w, t)
245218

246219
// test the "start from beginning watch"
247220
beginningWatch, err := bobClient.Projects().Watch(kapi.ListOptions{ResourceVersion: "0"})
248221
if err != nil {
249222
t.Fatalf("unexpected error: %v", err)
250223
}
251-
select {
252-
case event := <-beginningWatch.ResultChan():
253-
if event.Type != watch.Added {
254-
t.Errorf("expected added, got %v", event)
255-
}
256-
project := event.Object.(*projectapi.Project)
257-
if project.Name != "ns-01" {
258-
t.Fatalf("expected %v, got %#v", "ns-01", project)
259-
}
260-
261-
case <-time.After(3 * time.Second):
262-
t.Fatalf("timeout")
263-
}
224+
waitForAdd("ns-01", beginningWatch, t)
264225

265226
fromNowWatch, err := bobClient.Projects().Watch(kapi.ListOptions{})
266227
if err != nil {
@@ -274,3 +235,34 @@ func TestProjectWatch(t *testing.T) {
274235
}
275236

276237
}
238+
239+
func waitForDelete(projectName string, w watch.Interface, t *testing.T) {
240+
for {
241+
select {
242+
case event := <-w.ResultChan():
243+
project := event.Object.(*projectapi.Project)
244+
t.Logf("got %#v %#v", event, project)
245+
if event.Type == watch.Deleted && project.Name == projectName {
246+
return
247+
}
248+
249+
case <-time.After(30 * time.Second):
250+
t.Fatalf("timeout: %v", projectName)
251+
}
252+
}
253+
}
254+
func waitForAdd(projectName string, w watch.Interface, t *testing.T) {
255+
for {
256+
select {
257+
case event := <-w.ResultChan():
258+
project := event.Object.(*projectapi.Project)
259+
t.Logf("got %#v %#v", event, project)
260+
if event.Type == watch.Added && project.Name == projectName {
261+
return
262+
}
263+
264+
case <-time.After(30 * time.Second):
265+
t.Fatalf("timeout: %v", projectName)
266+
}
267+
}
268+
}

0 commit comments

Comments
 (0)