Skip to content

Commit ab814bc

Browse files
committed
Switch image quota to user shared informers
1 parent 12b6215 commit ab814bc

12 files changed

+112
-77
lines changed

pkg/cmd/server/origin/master_config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func BuildMasterConfig(options configapi.MasterConfig) (*MasterConfig, error) {
221221

222222
kubeletClientConfig := configapi.GetKubeletClientConfig(options)
223223

224-
quotaRegistry := quota.NewAllResourceQuotaRegistry(privilegedLoopbackOpenShiftClient, privilegedLoopbackKubeClientset, kubeInformerFactory)
224+
quotaRegistry := quota.NewAllResourceQuotaRegistry(informerFactory, privilegedLoopbackOpenShiftClient, privilegedLoopbackKubeClientset)
225225
ruleResolver := rulevalidation.NewDefaultRuleResolver(
226226
informerFactory.Policies().Lister(),
227227
informerFactory.PolicyBindings().Lister(),

pkg/cmd/server/origin/run_components.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -495,13 +495,13 @@ func (c *MasterConfig) RunResourceQuotaManager(cm *cmapp.CMServer) {
495495
}
496496

497497
osClient, kClient := c.ResourceQuotaManagerClients()
498-
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(osClient, kClient, c.Informers.KubernetesInformers())
498+
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(c.Informers, osClient, kClient)
499499
resourceQuotaControllerOptions := &kresourcequota.ResourceQuotaControllerOptions{
500500
KubeClient: kClient,
501501
ResyncPeriod: controller.StaticResyncPeriodFunc(resourceQuotaSyncPeriod),
502502
Registry: resourceQuotaRegistry,
503503
GroupKindsToReplenish: quota.AllEvaluatedGroupKinds,
504-
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers.KubernetesInformers(), osClient, kClient),
504+
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers, osClient, kClient),
505505
ReplenishmentResyncPeriod: replenishmentSyncPeriodFunc,
506506
}
507507
go kresourcequota.NewResourceQuotaController(resourceQuotaControllerOptions).Run(concurrentResourceQuotaSyncs, utilwait.NeverStop)
@@ -517,7 +517,7 @@ func (c *MasterConfig) RunClusterQuotaMappingController() {
517517

518518
func (c *MasterConfig) RunClusterQuotaReconciliationController() {
519519
osClient, kClient := c.ResourceQuotaManagerClients()
520-
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(osClient, kClient, c.Informers.KubernetesInformers())
520+
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(c.Informers, osClient, kClient)
521521
groupKindsToReplenish := quota.AllEvaluatedGroupKinds
522522

523523
options := clusterquotareconciliation.ClusterQuotaReconcilationControllerOptions{
@@ -527,7 +527,7 @@ func (c *MasterConfig) RunClusterQuotaReconciliationController() {
527527

528528
Registry: resourceQuotaRegistry,
529529
ResyncPeriod: defaultResourceQuotaSyncPeriod,
530-
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers.KubernetesInformers(), osClient, kClient),
530+
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers, osClient, kClient),
531531
ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(defaultReplenishmentSyncPeriod),
532532
GroupKindsToReplenish: groupKindsToReplenish,
533533
}

pkg/quota/admission/resourcequota/admission_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package resourcequota
22

33
import (
44
"testing"
5+
"time"
56

67
"k8s.io/kubernetes/pkg/admission"
78
kapi "k8s.io/kubernetes/pkg/api"
89
"k8s.io/kubernetes/pkg/api/resource"
910
kfake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
11+
"k8s.io/kubernetes/pkg/controller/informers"
1012

1113
"github.com/openshift/origin/pkg/client/testclient"
14+
"github.com/openshift/origin/pkg/controller/shared"
1215
imageapi "github.com/openshift/origin/pkg/image/api"
1316
"github.com/openshift/origin/pkg/quota"
1417
quotautil "github.com/openshift/origin/pkg/quota/util"
@@ -30,8 +33,10 @@ func TestOriginQuotaAdmissionIsErrorQuotaExceeded(t *testing.T) {
3033
}
3134
kubeClient := kfake.NewSimpleClientset(resourceQuota)
3235
osClient := testclient.NewSimpleFake()
36+
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
37+
informerFactory := shared.NewInformerFactory(kubeInformerFactory, kubeClient, osClient, shared.DefaultListerWatcherOverrides{}, 10*time.Minute)
3338
plugin := NewOriginResourceQuota(kubeClient).(*originQuotaAdmission)
34-
plugin.SetOriginQuotaRegistry(quota.NewOriginQuotaRegistry(osClient))
39+
plugin.SetOriginQuotaRegistry(quota.NewOriginQuotaRegistry(informerFactory.ImageStreams(), osClient))
3540
if err := plugin.Validate(); err != nil {
3641
t.Fatalf("unexpected error: %v", err)
3742
}

pkg/quota/controller/replenishment_controller.go

+12-27
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,38 @@ import (
44
"fmt"
55
"reflect"
66

7-
"k8s.io/kubernetes/pkg/api"
87
"k8s.io/kubernetes/pkg/client/cache"
98
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
10-
"k8s.io/kubernetes/pkg/controller/informers"
119
kresourcequota "k8s.io/kubernetes/pkg/controller/resourcequota"
12-
"k8s.io/kubernetes/pkg/runtime"
13-
"k8s.io/kubernetes/pkg/watch"
1410

1511
osclient "github.com/openshift/origin/pkg/client"
12+
"github.com/openshift/origin/pkg/controller/shared"
1613
imageapi "github.com/openshift/origin/pkg/image/api"
1714
)
1815

1916
// replenishmentControllerFactory implements ReplenishmentControllerFactory
2017
type replenishmentControllerFactory struct {
21-
osClient osclient.Interface
18+
isInformer shared.ImageStreamInformer
2219
}
2320

2421
var _ kresourcequota.ReplenishmentControllerFactory = &replenishmentControllerFactory{}
2522

2623
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers
2724
// to replenish resources when updated or deleted
28-
func NewReplenishmentControllerFactory(osClient osclient.Interface) kresourcequota.ReplenishmentControllerFactory {
25+
func NewReplenishmentControllerFactory(isInformer shared.ImageStreamInformer) kresourcequota.ReplenishmentControllerFactory {
2926
return &replenishmentControllerFactory{
30-
osClient: osClient,
27+
isInformer: isInformer,
3128
}
3229
}
3330

3431
func (r *replenishmentControllerFactory) NewController(options *kresourcequota.ReplenishmentControllerOptions) (cache.ControllerInterface, error) {
3532
switch options.GroupKind {
3633
case imageapi.Kind("ImageStream"):
37-
_, result := cache.NewInformer(
38-
&cache.ListWatch{
39-
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
40-
return r.osClient.ImageStreams(api.NamespaceAll).List(options)
41-
},
42-
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
43-
return r.osClient.ImageStreams(api.NamespaceAll).Watch(options)
44-
},
45-
},
46-
&imageapi.ImageStream{},
47-
options.ResyncPeriod(),
48-
cache.ResourceEventHandlerFuncs{
49-
UpdateFunc: ImageStreamReplenishmentUpdateFunc(options),
50-
DeleteFunc: kresourcequota.ObjectReplenishmentDeleteFunc(options),
51-
},
52-
)
53-
return result, nil
34+
r.isInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
35+
UpdateFunc: ImageStreamReplenishmentUpdateFunc(options),
36+
DeleteFunc: kresourcequota.ObjectReplenishmentDeleteFunc(options),
37+
})
38+
return r.isInformer.Informer().GetController(), nil
5439
default:
5540
return nil, fmt.Errorf("no replenishment controller available for %s", options.GroupKind)
5641
}
@@ -68,9 +53,9 @@ func ImageStreamReplenishmentUpdateFunc(options *kresourcequota.ReplenishmentCon
6853
}
6954

7055
// NewAllResourceReplenishmentControllerFactory returns a ReplenishmentControllerFactory that knows how to replenish all known resources
71-
func NewAllResourceReplenishmentControllerFactory(informerFactory informers.SharedInformerFactory, osClient osclient.Interface, kubeClientSet clientset.Interface) kresourcequota.ReplenishmentControllerFactory {
56+
func NewAllResourceReplenishmentControllerFactory(informerFactory shared.InformerFactory, osClient osclient.Interface, kubeClientSet clientset.Interface) kresourcequota.ReplenishmentControllerFactory {
7257
return kresourcequota.UnionReplenishmentControllerFactory{
73-
kresourcequota.NewReplenishmentControllerFactory(informerFactory, kubeClientSet),
74-
NewReplenishmentControllerFactory(osClient),
58+
kresourcequota.NewReplenishmentControllerFactory(informerFactory.KubernetesInformers(), kubeClientSet),
59+
NewReplenishmentControllerFactory(informerFactory.ImageStreams()),
7560
}
7661
}

pkg/quota/image/imagestream_evaluator.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
package image
22

33
import (
4+
"fmt"
5+
46
"k8s.io/kubernetes/pkg/admission"
57
kapi "k8s.io/kubernetes/pkg/api"
68
kquota "k8s.io/kubernetes/pkg/quota"
79
"k8s.io/kubernetes/pkg/quota/generic"
810
"k8s.io/kubernetes/pkg/runtime"
911

10-
osclient "github.com/openshift/origin/pkg/client"
12+
oscache "github.com/openshift/origin/pkg/client/cache"
1113
imageapi "github.com/openshift/origin/pkg/image/api"
1214
)
1315

1416
const imageStreamEvaluatorName = "Evaluator.ImageStream"
1517

1618
// NewImageStreamEvaluator computes resource usage of ImageStreams. Instantiating this is necessary for
1719
// resource quota admission controller to properly work on image stream related objects.
18-
func NewImageStreamEvaluator(isNamespacer osclient.ImageStreamsNamespacer) kquota.Evaluator {
20+
func NewImageStreamEvaluator(store *oscache.StoreToImageStreamLister) kquota.Evaluator {
1921
allResources := []kapi.ResourceName{
2022
imageapi.ResourceImageStreams,
2123
}
@@ -31,13 +33,21 @@ func NewImageStreamEvaluator(isNamespacer osclient.ImageStreamsNamespacer) kquot
3133
ConstraintsFunc: generic.ObjectCountConstraintsFunc(imageapi.ResourceImageStreams),
3234
UsageFunc: generic.ObjectCountUsageFunc(imageapi.ResourceImageStreams),
3335
ListFuncByNamespace: func(namespace string, options kapi.ListOptions) ([]runtime.Object, error) {
34-
itemList, err := isNamespacer.ImageStreams(namespace).List(options)
36+
list, err := store.ImageStreams(namespace).List(options.LabelSelector)
3537
if err != nil {
3638
return nil, err
3739
}
38-
results := make([]runtime.Object, 0, len(itemList.Items))
39-
for i := range itemList.Items {
40-
results = append(results, &itemList.Items[i])
40+
results := make([]runtime.Object, 0, len(list))
41+
for _, is := range list {
42+
objCopy, err := kapi.Scheme.DeepCopy(is)
43+
if err != nil {
44+
return nil, err
45+
}
46+
isCopy, ok := objCopy.(*imageapi.ImageStream)
47+
if !ok {
48+
return nil, fmt.Errorf("expected ImageStream got %#v", objCopy)
49+
}
50+
results = append(results, isCopy)
4151
}
4252
return results, nil
4353
},

pkg/quota/image/imagestream_evaluator_test.go

+25-11
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package image
22

33
import (
44
"testing"
5+
"time"
56

67
kapi "k8s.io/kubernetes/pkg/api"
8+
"k8s.io/kubernetes/pkg/client/cache"
79
kquota "k8s.io/kubernetes/pkg/quota"
810

9-
"github.com/openshift/origin/pkg/client/testclient"
11+
oscache "github.com/openshift/origin/pkg/client/cache"
1012
imagetest "github.com/openshift/origin/pkg/image/admission/testutil"
1113
imageapi "github.com/openshift/origin/pkg/image/api"
1214
)
@@ -79,10 +81,17 @@ func TestImageStreamEvaluatorUsageStats(t *testing.T) {
7981
expectedISCount: 1,
8082
},
8183
} {
82-
fakeClient := &testclient.Fake{}
83-
fakeClient.AddReactor("list", "imagestreams", imagetest.GetFakeImageStreamListHandler(t, tc.iss...))
84-
85-
evaluator := NewImageStreamEvaluator(fakeClient)
84+
isInformer := cache.NewSharedIndexInformer(
85+
&cache.ListWatch{},
86+
&imageapi.ImageStream{},
87+
2*time.Minute,
88+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
89+
)
90+
store := oscache.StoreToImageStreamLister{Indexer: isInformer.GetIndexer()}
91+
for _, is := range tc.iss {
92+
store.Indexer.Add(&is)
93+
}
94+
evaluator := NewImageStreamEvaluator(&store)
8695

8796
stats, err := evaluator.UsageStats(kquota.UsageStatsOptions{Namespace: tc.namespace})
8897
if err != nil {
@@ -158,18 +167,23 @@ func TestImageStreamEvaluatorUsage(t *testing.T) {
158167
expectedISCount: 1,
159168
},
160169
} {
161-
162170
newIS := &imageapi.ImageStream{
163171
ObjectMeta: kapi.ObjectMeta{
164172
Namespace: "test",
165173
Name: "is",
166174
},
167175
}
168-
169-
fakeClient := &testclient.Fake{}
170-
fakeClient.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, tc.iss...))
171-
172-
evaluator := NewImageStreamEvaluator(fakeClient)
176+
isInformer := cache.NewSharedIndexInformer(
177+
&cache.ListWatch{},
178+
&imageapi.ImageStream{},
179+
2*time.Minute,
180+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
181+
)
182+
store := oscache.StoreToImageStreamLister{Indexer: isInformer.GetIndexer()}
183+
for _, is := range tc.iss {
184+
store.Indexer.Add(&is)
185+
}
186+
evaluator := NewImageStreamEvaluator(&store)
173187

174188
usage := evaluator.Usage(newIS)
175189
expectedUsage := imagetest.ExpectedResourceListFor(tc.expectedISCount)

pkg/quota/image/imagestreamimport_evaluator.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"k8s.io/kubernetes/pkg/runtime"
1313
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
1414

15-
osclient "github.com/openshift/origin/pkg/client"
15+
oscache "github.com/openshift/origin/pkg/client/cache"
1616
imageapi "github.com/openshift/origin/pkg/image/api"
1717
)
1818

@@ -21,7 +21,7 @@ const imageStreamImportName = "Evaluator.ImageStreamImport"
2121
// NewImageStreamImportEvaluator computes resource usage for ImageStreamImport objects. This particular kind
2222
// is a virtual resource. It depends on ImageStream usage evaluator to compute image numbers before the
2323
// the admission can work.
24-
func NewImageStreamImportEvaluator(isNamespacer osclient.ImageStreamsNamespacer) kquota.Evaluator {
24+
func NewImageStreamImportEvaluator(store *oscache.StoreToImageStreamLister) kquota.Evaluator {
2525
computeResources := []kapi.ResourceName{
2626
imageapi.ResourceImageStreams,
2727
}
@@ -34,7 +34,7 @@ func NewImageStreamImportEvaluator(isNamespacer osclient.ImageStreamsNamespacer)
3434
InternalOperationResources: map[admission.Operation][]kapi.ResourceName{admission.Create: computeResources},
3535
MatchedResourceNames: computeResources,
3636
MatchesScopeFunc: matchesScopeFunc,
37-
UsageFunc: makeImageStreamImportAdmissionUsageFunc(isNamespacer),
37+
UsageFunc: makeImageStreamImportAdmissionUsageFunc(store),
3838
ListFuncByNamespace: func(namespace string, options kapi.ListOptions) ([]runtime.Object, error) {
3939
return []runtime.Object{}, nil
4040
},
@@ -51,7 +51,7 @@ func imageStreamImportConstraintsFunc(required []kapi.ResourceName, object runti
5151
}
5252

5353
// makeImageStreamImportAdmissionUsageFunc returns a function for computing a usage of an image stream import.
54-
func makeImageStreamImportAdmissionUsageFunc(isNamespacer osclient.ImageStreamsNamespacer) generic.UsageFunc {
54+
func makeImageStreamImportAdmissionUsageFunc(store *oscache.StoreToImageStreamLister) generic.UsageFunc {
5555
return func(object runtime.Object) kapi.ResourceList {
5656
isi, ok := object.(*imageapi.ImageStreamImport)
5757
if !ok {
@@ -66,7 +66,7 @@ func makeImageStreamImportAdmissionUsageFunc(isNamespacer osclient.ImageStreamsN
6666
return usage
6767
}
6868

69-
is, err := isNamespacer.ImageStreams(isi.Namespace).Get(isi.Name)
69+
is, err := store.ImageStreams(isi.Namespace).Get(isi.Name)
7070
if err != nil && !kerrors.IsNotFound(err) {
7171
utilruntime.HandleError(fmt.Errorf("failed to list image streams: %v", err))
7272
}

pkg/quota/image/imagestreamimport_evaluator_test.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package image
22

33
import (
44
"testing"
5+
"time"
56

67
kapi "k8s.io/kubernetes/pkg/api"
8+
"k8s.io/kubernetes/pkg/client/cache"
79
kquota "k8s.io/kubernetes/pkg/quota"
810

9-
"github.com/openshift/origin/pkg/client/testclient"
11+
oscache "github.com/openshift/origin/pkg/client/cache"
1012
imagetest "github.com/openshift/origin/pkg/image/admission/testutil"
1113
imageapi "github.com/openshift/origin/pkg/image/api"
1214
)
@@ -158,11 +160,17 @@ func TestImageStreamImportEvaluatorUsage(t *testing.T) {
158160
expectedISCount: 1,
159161
},
160162
} {
161-
162-
fakeClient := &testclient.Fake{}
163-
fakeClient.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, tc.iss...))
164-
165-
evaluator := NewImageStreamImportEvaluator(fakeClient)
163+
isInformer := cache.NewSharedIndexInformer(
164+
&cache.ListWatch{},
165+
&imageapi.ImageStream{},
166+
2*time.Minute,
167+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
168+
)
169+
store := oscache.StoreToImageStreamLister{Indexer: isInformer.GetIndexer()}
170+
for _, is := range tc.iss {
171+
store.Indexer.Add(&is)
172+
}
173+
evaluator := NewImageStreamImportEvaluator(&store)
166174

167175
isi := &imageapi.ImageStreamImport{
168176
ObjectMeta: kapi.ObjectMeta{

pkg/quota/image/imagestreamtag_evaluator.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ import (
1313
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
1414

1515
osclient "github.com/openshift/origin/pkg/client"
16+
oscache "github.com/openshift/origin/pkg/client/cache"
1617
imageapi "github.com/openshift/origin/pkg/image/api"
1718
)
1819

1920
const imageStreamTagEvaluatorName = "Evaluator.ImageStreamTag"
2021

2122
// NewImageStreamTagEvaluator computes resource usage of ImageStreamsTags. Its sole purpose is to handle
2223
// UPDATE admission operations on imageStreamTags resource.
23-
func NewImageStreamTagEvaluator(istNamespacer osclient.ImageStreamTagsNamespacer, isNamespacer osclient.ImageStreamsNamespacer) kquota.Evaluator {
24+
func NewImageStreamTagEvaluator(store *oscache.StoreToImageStreamLister, istNamespacer osclient.ImageStreamTagsNamespacer) kquota.Evaluator {
2425
computeResources := []kapi.ResourceName{
2526
imageapi.ResourceImageStreams,
2627
}
@@ -56,7 +57,7 @@ func NewImageStreamTagEvaluator(istNamespacer osclient.ImageStreamTagsNamespacer
5657
},
5758
MatchedResourceNames: computeResources,
5859
MatchesScopeFunc: matchesScopeFunc,
59-
UsageFunc: makeImageStreamTagAdmissionUsageFunc(isNamespacer),
60+
UsageFunc: makeImageStreamTagAdmissionUsageFunc(store),
6061
GetFuncByNamespace: getFuncByNamespace,
6162
ListFuncByNamespace: func(namespace string, options kapi.ListOptions) ([]runtime.Object, error) {
6263
return []runtime.Object{}, nil
@@ -75,7 +76,7 @@ func imageStreamTagConstraintsFunc(required []kapi.ResourceName, object runtime.
7576

7677
// makeImageStreamTagAdmissionUsageFunc returns a function that computes a resource usage for given image
7778
// stream tag during admission.
78-
func makeImageStreamTagAdmissionUsageFunc(isNamespacer osclient.ImageStreamsNamespacer) generic.UsageFunc {
79+
func makeImageStreamTagAdmissionUsageFunc(store *oscache.StoreToImageStreamLister) generic.UsageFunc {
7980
return func(object runtime.Object) kapi.ResourceList {
8081
ist, ok := object.(*imageapi.ImageStreamTag)
8182
if !ok {
@@ -92,7 +93,7 @@ func makeImageStreamTagAdmissionUsageFunc(isNamespacer osclient.ImageStreamsName
9293
return kapi.ResourceList{}
9394
}
9495

95-
is, err := isNamespacer.ImageStreams(ist.Namespace).Get(isName)
96+
is, err := store.ImageStreams(ist.Namespace).Get(isName)
9697
if err != nil && !kerrors.IsNotFound(err) {
9798
utilruntime.HandleError(fmt.Errorf("failed to get image stream %s/%s: %v", ist.Namespace, isName, err))
9899
}

0 commit comments

Comments
 (0)