Skip to content

Commit 7981eaa

Browse files
committed
add clusterquota reconciliation controller
1 parent de6eea9 commit 7981eaa

File tree

9 files changed

+1101
-218
lines changed

9 files changed

+1101
-218
lines changed

pkg/client/clusteresourcequota.go

+8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type ClusterResourceQuotaInterface interface {
1515
List(opts kapi.ListOptions) (*quotaapi.ClusterResourceQuotaList, error)
1616
Get(name string) (*quotaapi.ClusterResourceQuota, error)
1717
Create(resourceQuota *quotaapi.ClusterResourceQuota) (*quotaapi.ClusterResourceQuota, error)
18+
Update(resourceQuota *quotaapi.ClusterResourceQuota) (*quotaapi.ClusterResourceQuota, error)
1819
Delete(name string) error
1920
Watch(opts kapi.ListOptions) (watch.Interface, error)
2021
}
@@ -48,6 +49,13 @@ func (c *clusterResourceQuotas) Create(resourceQuota *quotaapi.ClusterResourceQu
4849
return
4950
}
5051

52+
// Update updates an existing deploymentConfig
53+
func (c *clusterResourceQuotas) Update(resourceQuota *quotaapi.ClusterResourceQuota) (result *quotaapi.ClusterResourceQuota, err error) {
54+
result = &quotaapi.ClusterResourceQuota{}
55+
err = c.r.Put().Resource("clusterresourcequotas").Name(resourceQuota.Name).Body(resourceQuota).Do().Into(result)
56+
return
57+
}
58+
5159
func (c *clusterResourceQuotas) Delete(name string) (err error) {
5260
err = c.r.Delete().Resource("clusterresourcequotas").Name(name).Do().Error()
5361
return

pkg/client/testclient/fake_clusterresourcequota.go

+8
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ func (c *FakeClusterResourceQuotas) Create(inObj *quotaapi.ClusterResourceQuota)
4141
return obj.(*quotaapi.ClusterResourceQuota), err
4242
}
4343

44+
func (c *FakeClusterResourceQuotas) Update(inObj *quotaapi.ClusterResourceQuota) (*quotaapi.ClusterResourceQuota, error) {
45+
obj, err := c.Fake.Invokes(ktestclient.NewRootUpdateAction("clusterresourcequotas", inObj), inObj)
46+
if obj == nil {
47+
return nil, err
48+
}
49+
50+
return obj.(*quotaapi.ClusterResourceQuota), err
51+
}
4452
func (c *FakeClusterResourceQuotas) Delete(name string) error {
4553
_, err := c.Fake.Invokes(ktestclient.NewRootDeleteAction("clusterresourcequotas", name), &quotaapi.ClusterResourceQuota{})
4654
return err

pkg/cmd/server/origin/run_components.go

+30
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"k8s.io/kubernetes/pkg/controller"
2020
kresourcequota "k8s.io/kubernetes/pkg/controller/resourcequota"
2121
sacontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
22+
quotainstall "k8s.io/kubernetes/pkg/quota/install"
2223
"k8s.io/kubernetes/pkg/registry/service/allocator"
2324
etcdallocator "k8s.io/kubernetes/pkg/registry/service/allocator/etcd"
2425
"k8s.io/kubernetes/pkg/serviceaccount"
@@ -53,6 +54,7 @@ import (
5354
imageapi "github.com/openshift/origin/pkg/image/api"
5455
quota "github.com/openshift/origin/pkg/quota"
5556
quotacontroller "github.com/openshift/origin/pkg/quota/controller"
57+
"github.com/openshift/origin/pkg/quota/controller/clusterquotareconciliation"
5658
serviceaccountcontrollers "github.com/openshift/origin/pkg/serviceaccounts/controllers"
5759
)
5860

@@ -507,3 +509,31 @@ func (c *MasterConfig) RunClusterQuotaMappingController() {
507509
go c.ClusterQuotaMappingController.Run(5, utilwait.NeverStop)
508510
})
509511
}
512+
513+
func (c *MasterConfig) RunClusterQuotaReconciliationController() {
514+
osClient, kClient := c.ResourceQuotaManagerClients()
515+
resourceQuotaRegistry := quotainstall.NewRegistry(kClient)
516+
groupKindsToReplenish := []unversioned.GroupKind{
517+
kapi.Kind("Pod"),
518+
kapi.Kind("Service"),
519+
kapi.Kind("ReplicationController"),
520+
kapi.Kind("PersistentVolumeClaim"),
521+
kapi.Kind("Secret"),
522+
kapi.Kind("ConfigMap"),
523+
}
524+
525+
options := clusterquotareconciliation.ClusterQuotaReconcilationControllerOptions{
526+
ClusterQuotaInformer: c.Informers.ClusterResourceQuotas(),
527+
ClusterQuotaMapper: c.ClusterQuotaMappingController.GetClusterQuotaMapper(),
528+
ClusterQuotaClient: osClient,
529+
530+
Registry: resourceQuotaRegistry,
531+
ResyncPeriod: defaultResourceQuotaSyncPeriod,
532+
ControllerFactory: kresourcequota.NewReplenishmentControllerFactory(c.Informers.Pods().Informer(), kClient),
533+
ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(defaultReplenishmentSyncPeriod),
534+
GroupKindsToReplenish: groupKindsToReplenish,
535+
}
536+
controller := clusterquotareconciliation.NewClusterQuotaReconcilationController(options)
537+
c.ClusterQuotaMappingController.GetClusterQuotaMapper().AddListener(controller)
538+
go controller.Run(5, utilwait.NeverStop)
539+
}

pkg/cmd/server/start/start_master.go

+1
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ func startControllers(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) erro
656656
oc.RunImageImportController()
657657
oc.RunOriginNamespaceController()
658658
oc.RunSDNController()
659+
oc.RunClusterQuotaReconciliationController()
659660
oc.RunClusterQuotaMappingController()
660661

661662
_, _, serviceServingCertClient, err := oc.GetServiceAccountClients(bootstrappolicy.ServiceServingCertServiceAccountName)

pkg/quota/controller/clusterquotamapping/clusterquotamapping.go

+1-218
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package clusterquotamapping
22

33
import (
44
"fmt"
5-
"sync"
65
"time"
76

87
"github.com/golang/glog"
@@ -15,7 +14,6 @@ import (
1514
"k8s.io/kubernetes/pkg/controller/framework"
1615
"k8s.io/kubernetes/pkg/labels"
1716
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
18-
"k8s.io/kubernetes/pkg/util/sets"
1917
"k8s.io/kubernetes/pkg/util/wait"
2018
"k8s.io/kubernetes/pkg/util/workqueue"
2119

@@ -24,15 +22,6 @@ import (
2422
quotaapi "github.com/openshift/origin/pkg/quota/api"
2523
)
2624

27-
type ClusterQuotaMapper interface {
28-
// GetClusterQuotasFor returns the list of clusterquota names that this namespace matches. It also
29-
// returns the labels associated with the namespace for the check so that callers can determine staleness
30-
GetClusterQuotasFor(namespaceName string) ([]string, map[string]string)
31-
// GetNamespacesFor returns the list of namespace names that this cluster quota matches. It also
32-
// returns the selector associated with the clusterquota for the check so that callers can determine staleness
33-
GetNamespacesFor(quotaName string) ([]string, *unversioned.LabelSelector)
34-
}
35-
3625
// Look out, here there be dragons!
3726
// There is a race when dealing with the DeltaFifo compression used to back a reflector for a controller that uses two
3827
// SharedInformers for both their watch events AND their caches. The scenario looks like this
@@ -65,15 +54,7 @@ func NewClusterQuotaMappingController(namespaceInformer shared.NamespaceInformer
6554

6655
quotaQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
6756

68-
clusterQuotaMapper: &clusterQuotaMapper{
69-
requiredQuotaToSelector: map[string]*unversioned.LabelSelector{},
70-
requiredNamespaceToLabels: map[string]map[string]string{},
71-
completedQuotaToSelector: map[string]*unversioned.LabelSelector{},
72-
completedNamespaceToLabels: map[string]map[string]string{},
73-
74-
quotaToNamespaces: map[string]sets.String{},
75-
namespaceToQuota: map[string]sets.String{},
76-
},
57+
clusterQuotaMapper: NewClusterQuotaMapper(),
7758
}
7859

7960
namespaceInformer.Informer().AddEventHandler(framework.ResourceEventHandlerFuncs{
@@ -107,204 +88,6 @@ type ClusterQuotaMappingController struct {
10788
clusterQuotaMapper *clusterQuotaMapper
10889
}
10990

110-
// clusterQuotaMapper gives thread safe access to the actual mappings that are being stored.
111-
// Many method use a shareable read lock to check status followed by a non-shareable
112-
// write lock which double checks the condition before proceding. Since locks aren't escalatable
113-
// you have to perform the recheck because someone could have beaten you in.
114-
type clusterQuotaMapper struct {
115-
lock sync.RWMutex
116-
117-
// requiredQuotaToSelector indicates the latest label selector this controller has observed for a quota
118-
requiredQuotaToSelector map[string]*unversioned.LabelSelector
119-
// requiredNamespaceToLabels indicates the latest labels this controller has observed for a namespace
120-
requiredNamespaceToLabels map[string]map[string]string
121-
// completedQuotaToSelector indicates the latest label selector this controller has scanned against namespaces
122-
completedQuotaToSelector map[string]*unversioned.LabelSelector
123-
// completedNamespaceToLabels indicates the latest labels this controller has scanned against cluster quotas
124-
completedNamespaceToLabels map[string]map[string]string
125-
126-
quotaToNamespaces map[string]sets.String
127-
namespaceToQuota map[string]sets.String
128-
}
129-
130-
func (m *clusterQuotaMapper) GetClusterQuotasFor(namespaceName string) ([]string, map[string]string) {
131-
m.lock.RLock()
132-
defer m.lock.RUnlock()
133-
134-
quotas, ok := m.namespaceToQuota[namespaceName]
135-
if !ok {
136-
return []string{}, m.completedNamespaceToLabels[namespaceName]
137-
}
138-
return quotas.List(), m.completedNamespaceToLabels[namespaceName]
139-
}
140-
141-
func (m *clusterQuotaMapper) GetNamespacesFor(quotaName string) ([]string, *unversioned.LabelSelector) {
142-
m.lock.RLock()
143-
defer m.lock.RUnlock()
144-
145-
namespaces, ok := m.quotaToNamespaces[quotaName]
146-
if !ok {
147-
return []string{}, m.completedQuotaToSelector[quotaName]
148-
}
149-
return namespaces.List(), m.completedQuotaToSelector[quotaName]
150-
}
151-
152-
// requireQuota updates the selector requirements for the given quota. This prevents stale updates to the mapping itself.
153-
// returns true if a modification was made
154-
func (m *clusterQuotaMapper) requireQuota(quota *quotaapi.ClusterResourceQuota) bool {
155-
m.lock.RLock()
156-
selector, exists := m.requiredQuotaToSelector[quota.Name]
157-
m.lock.RUnlock()
158-
159-
if selectorMatches(selector, exists, quota) {
160-
return false
161-
}
162-
163-
m.lock.Lock()
164-
defer m.lock.Unlock()
165-
selector, exists = m.requiredQuotaToSelector[quota.Name]
166-
if selectorMatches(selector, exists, quota) {
167-
return false
168-
}
169-
170-
m.requiredQuotaToSelector[quota.Name] = quota.Spec.Selector
171-
return true
172-
}
173-
174-
// completeQuota updates the latest selector used to generate the mappings for this quota. The value is returned
175-
// by the Get methods for the mapping so that callers can determine staleness
176-
func (m *clusterQuotaMapper) completeQuota(quota *quotaapi.ClusterResourceQuota) {
177-
m.lock.Lock()
178-
defer m.lock.Unlock()
179-
m.completedQuotaToSelector[quota.Name] = quota.Spec.Selector
180-
}
181-
182-
// removeQuota deletes a quota from all mappings
183-
func (m *clusterQuotaMapper) removeQuota(quotaName string) {
184-
m.lock.Lock()
185-
defer m.lock.Unlock()
186-
187-
delete(m.requiredQuotaToSelector, quotaName)
188-
delete(m.completedQuotaToSelector, quotaName)
189-
delete(m.quotaToNamespaces, quotaName)
190-
for _, quotas := range m.namespaceToQuota {
191-
quotas.Delete(quotaName)
192-
}
193-
}
194-
195-
// requireNamespace updates the label requirements for the given namespace. This prevents stale updates to the mapping itself.
196-
// returns true if a modification was made
197-
func (m *clusterQuotaMapper) requireNamespace(namespace *kapi.Namespace) bool {
198-
m.lock.RLock()
199-
labels, exists := m.requiredNamespaceToLabels[namespace.Name]
200-
m.lock.RUnlock()
201-
202-
if labelsMatch(labels, exists, namespace) {
203-
return false
204-
}
205-
206-
m.lock.Lock()
207-
defer m.lock.Unlock()
208-
labels, exists = m.requiredNamespaceToLabels[namespace.Name]
209-
if labelsMatch(labels, exists, namespace) {
210-
return false
211-
}
212-
213-
m.requiredNamespaceToLabels[namespace.Name] = namespace.Labels
214-
return true
215-
}
216-
217-
// completeNamespace updates the latest labels used to generate the mappings for this namespace. The value is returned
218-
// by the Get methods for the mapping so that callers can determine staleness
219-
func (m *clusterQuotaMapper) completeNamespace(namespace *kapi.Namespace) {
220-
m.lock.Lock()
221-
defer m.lock.Unlock()
222-
m.completedNamespaceToLabels[namespace.Name] = namespace.Labels
223-
}
224-
225-
// removeNamespace deletes a namespace from all mappings
226-
func (m *clusterQuotaMapper) removeNamespace(namespaceName string) {
227-
m.lock.Lock()
228-
defer m.lock.Unlock()
229-
230-
delete(m.requiredNamespaceToLabels, namespaceName)
231-
delete(m.completedNamespaceToLabels, namespaceName)
232-
delete(m.namespaceToQuota, namespaceName)
233-
for _, namespaces := range m.quotaToNamespaces {
234-
namespaces.Delete(namespaceName)
235-
}
236-
}
237-
238-
func selectorMatches(selector *unversioned.LabelSelector, exists bool, quota *quotaapi.ClusterResourceQuota) bool {
239-
return exists && kapi.Semantic.DeepEqual(selector, quota.Spec.Selector)
240-
}
241-
func labelsMatch(labels map[string]string, exists bool, namespace *kapi.Namespace) bool {
242-
return exists && kapi.Semantic.DeepEqual(labels, namespace.Labels)
243-
}
244-
245-
// setMapping maps (or removes a mapping) between a clusterquota and a namespace
246-
// It returns whether the action worked, whether the quota is out of date, whether the namespace is out of date
247-
// This allows callers to decide whether to pull new information from the cache or simply skip execution
248-
func (m *clusterQuotaMapper) setMapping(quota *quotaapi.ClusterResourceQuota, namespace *kapi.Namespace, remove bool) (bool /*added*/, bool /*quota matches*/, bool /*namespace matches*/) {
249-
m.lock.RLock()
250-
selector, selectorExists := m.requiredQuotaToSelector[quota.Name]
251-
labels, labelsExist := m.requiredNamespaceToLabels[namespace.Name]
252-
m.lock.RUnlock()
253-
254-
if !selectorMatches(selector, selectorExists, quota) {
255-
return false, false, true
256-
}
257-
if !labelsMatch(labels, labelsExist, namespace) {
258-
return false, true, false
259-
}
260-
261-
m.lock.Lock()
262-
defer m.lock.Unlock()
263-
selector, selectorExists = m.requiredQuotaToSelector[quota.Name]
264-
labels, labelsExist = m.requiredNamespaceToLabels[namespace.Name]
265-
if !selectorMatches(selector, selectorExists, quota) {
266-
return false, false, true
267-
}
268-
if !labelsMatch(labels, labelsExist, namespace) {
269-
return false, true, false
270-
}
271-
272-
if remove {
273-
namespaces, ok := m.quotaToNamespaces[quota.Name]
274-
if !ok {
275-
m.quotaToNamespaces[quota.Name] = sets.String{}
276-
} else {
277-
namespaces.Delete(namespace.Name)
278-
}
279-
280-
quotas, ok := m.namespaceToQuota[namespace.Name]
281-
if !ok {
282-
m.namespaceToQuota[namespace.Name] = sets.String{}
283-
} else {
284-
quotas.Delete(quota.Name)
285-
}
286-
287-
return true, true, true
288-
}
289-
290-
namespaces, ok := m.quotaToNamespaces[quota.Name]
291-
if !ok {
292-
m.quotaToNamespaces[quota.Name] = sets.NewString(namespace.Name)
293-
} else {
294-
namespaces.Insert(namespace.Name)
295-
}
296-
297-
quotas, ok := m.namespaceToQuota[namespace.Name]
298-
if !ok {
299-
m.namespaceToQuota[namespace.Name] = sets.NewString(quota.Name)
300-
} else {
301-
quotas.Insert(quota.Name)
302-
}
303-
304-
return true, true, true
305-
306-
}
307-
30891
func (c *ClusterQuotaMappingController) GetClusterQuotaMapper() ClusterQuotaMapper {
30992
return c.clusterQuotaMapper
31093
}

0 commit comments

Comments
 (0)