Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch image quota to shared informers #12088

Merged
merged 1 commit into from
Jan 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cmd/server/origin/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func BuildMasterConfig(options configapi.MasterConfig) (*MasterConfig, error) {

kubeletClientConfig := configapi.GetKubeletClientConfig(options)

quotaRegistry := quota.NewAllResourceQuotaRegistry(privilegedLoopbackOpenShiftClient, privilegedLoopbackKubeClientset, kubeInformerFactory)
quotaRegistry := quota.NewAllResourceQuotaRegistry(informerFactory, privilegedLoopbackOpenShiftClient, privilegedLoopbackKubeClientset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this one used for admission? The admission one probably doesn't need the informers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one already accepted informers, but just k8s ones, but I needed both k8s and origin ones. I've also moved this parameter from the end of this function call as the first argument, to make it similar to other methods. And since quota.NewAllResourceQuotaRegistry under covers creates evaluators which are now switched to informers, it's needed this way.

ruleResolver := rulevalidation.NewDefaultRuleResolver(
informerFactory.Policies().Lister(),
informerFactory.PolicyBindings().Lister(),
Expand Down
8 changes: 4 additions & 4 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,13 @@ func (c *MasterConfig) RunResourceQuotaManager(cm *cmapp.CMServer) {
}

osClient, kClient := c.ResourceQuotaManagerClients()
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(osClient, kClient, c.Informers.KubernetesInformers())
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(c.Informers, osClient, kClient)
resourceQuotaControllerOptions := &kresourcequota.ResourceQuotaControllerOptions{
KubeClient: kClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(resourceQuotaSyncPeriod),
Registry: resourceQuotaRegistry,
GroupKindsToReplenish: quota.AllEvaluatedGroupKinds,
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers.KubernetesInformers(), osClient, kClient),
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers, osClient, kClient),
ReplenishmentResyncPeriod: replenishmentSyncPeriodFunc,
}
go kresourcequota.NewResourceQuotaController(resourceQuotaControllerOptions).Run(concurrentResourceQuotaSyncs, utilwait.NeverStop)
Expand All @@ -517,7 +517,7 @@ func (c *MasterConfig) RunClusterQuotaMappingController() {

func (c *MasterConfig) RunClusterQuotaReconciliationController() {
osClient, kClient := c.ResourceQuotaManagerClients()
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(osClient, kClient, c.Informers.KubernetesInformers())
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(c.Informers, osClient, kClient)
groupKindsToReplenish := quota.AllEvaluatedGroupKinds

options := clusterquotareconciliation.ClusterQuotaReconcilationControllerOptions{
Expand All @@ -527,7 +527,7 @@ func (c *MasterConfig) RunClusterQuotaReconciliationController() {

Registry: resourceQuotaRegistry,
ResyncPeriod: defaultResourceQuotaSyncPeriod,
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers.KubernetesInformers(), osClient, kClient),
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(c.Informers, osClient, kClient),
ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(defaultReplenishmentSyncPeriod),
GroupKindsToReplenish: groupKindsToReplenish,
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/quota/admission/resourcequota/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package resourcequota

import (
"testing"
"time"

"k8s.io/kubernetes/pkg/admission"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
kfake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/controller/informers"

"github.com/openshift/origin/pkg/client/testclient"
"github.com/openshift/origin/pkg/controller/shared"
imageapi "github.com/openshift/origin/pkg/image/api"
"github.com/openshift/origin/pkg/quota"
quotautil "github.com/openshift/origin/pkg/quota/util"
Expand All @@ -30,8 +33,10 @@ func TestOriginQuotaAdmissionIsErrorQuotaExceeded(t *testing.T) {
}
kubeClient := kfake.NewSimpleClientset(resourceQuota)
osClient := testclient.NewSimpleFake()
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
informerFactory := shared.NewInformerFactory(kubeInformerFactory, kubeClient, osClient, shared.DefaultListerWatcherOverrides{}, 10*time.Minute)
plugin := NewOriginResourceQuota(kubeClient).(*originQuotaAdmission)
plugin.SetOriginQuotaRegistry(quota.NewOriginQuotaRegistry(osClient))
plugin.SetOriginQuotaRegistry(quota.NewOriginQuotaRegistry(informerFactory.ImageStreams(), osClient))
if err := plugin.Validate(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
39 changes: 12 additions & 27 deletions pkg/quota/controller/replenishment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,38 @@ import (
"fmt"
"reflect"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/informers"
kresourcequota "k8s.io/kubernetes/pkg/controller/resourcequota"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"

osclient "github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/controller/shared"
imageapi "github.com/openshift/origin/pkg/image/api"
)

// replenishmentControllerFactory implements ReplenishmentControllerFactory
type replenishmentControllerFactory struct {
osClient osclient.Interface
isInformer shared.ImageStreamInformer
}

var _ kresourcequota.ReplenishmentControllerFactory = &replenishmentControllerFactory{}

// NewReplenishmentControllerFactory returns a factory that knows how to build controllers
// to replenish resources when updated or deleted
func NewReplenishmentControllerFactory(osClient osclient.Interface) kresourcequota.ReplenishmentControllerFactory {
func NewReplenishmentControllerFactory(isInformer shared.ImageStreamInformer) kresourcequota.ReplenishmentControllerFactory {
return &replenishmentControllerFactory{
osClient: osClient,
isInformer: isInformer,
}
}

func (r *replenishmentControllerFactory) NewController(options *kresourcequota.ReplenishmentControllerOptions) (cache.ControllerInterface, error) {
switch options.GroupKind {
case imageapi.Kind("ImageStream"):
_, result := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.osClient.ImageStreams(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.osClient.ImageStreams(api.NamespaceAll).Watch(options)
},
},
&imageapi.ImageStream{},
options.ResyncPeriod(),
cache.ResourceEventHandlerFuncs{
UpdateFunc: ImageStreamReplenishmentUpdateFunc(options),
DeleteFunc: kresourcequota.ObjectReplenishmentDeleteFunc(options),
},
)
return result, nil
r.isInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: ImageStreamReplenishmentUpdateFunc(options),
DeleteFunc: kresourcequota.ObjectReplenishmentDeleteFunc(options),
})
return r.isInformer.Informer().GetController(), nil
default:
return nil, fmt.Errorf("no replenishment controller available for %s", options.GroupKind)
}
Expand All @@ -68,9 +53,9 @@ func ImageStreamReplenishmentUpdateFunc(options *kresourcequota.ReplenishmentCon
}

// NewAllResourceReplenishmentControllerFactory returns a ReplenishmentControllerFactory that knows how to replenish all known resources
func NewAllResourceReplenishmentControllerFactory(informerFactory informers.SharedInformerFactory, osClient osclient.Interface, kubeClientSet clientset.Interface) kresourcequota.ReplenishmentControllerFactory {
func NewAllResourceReplenishmentControllerFactory(informerFactory shared.InformerFactory, osClient osclient.Interface, kubeClientSet clientset.Interface) kresourcequota.ReplenishmentControllerFactory {
return kresourcequota.UnionReplenishmentControllerFactory{
kresourcequota.NewReplenishmentControllerFactory(informerFactory, kubeClientSet),
NewReplenishmentControllerFactory(osClient),
kresourcequota.NewReplenishmentControllerFactory(informerFactory.KubernetesInformers(), kubeClientSet),
NewReplenishmentControllerFactory(informerFactory.ImageStreams()),
}
}
12 changes: 6 additions & 6 deletions pkg/quota/image/imagestream_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"k8s.io/kubernetes/pkg/quota/generic"
"k8s.io/kubernetes/pkg/runtime"

osclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
imageapi "github.com/openshift/origin/pkg/image/api"
)

const imageStreamEvaluatorName = "Evaluator.ImageStream"

// NewImageStreamEvaluator computes resource usage of ImageStreams. Instantiating this is necessary for
// resource quota admission controller to properly work on image stream related objects.
func NewImageStreamEvaluator(isNamespacer osclient.ImageStreamsNamespacer) kquota.Evaluator {
func NewImageStreamEvaluator(store *oscache.StoreToImageStreamLister) kquota.Evaluator {
allResources := []kapi.ResourceName{
imageapi.ResourceImageStreams,
}
Expand All @@ -31,13 +31,13 @@ func NewImageStreamEvaluator(isNamespacer osclient.ImageStreamsNamespacer) kquot
ConstraintsFunc: generic.ObjectCountConstraintsFunc(imageapi.ResourceImageStreams),
UsageFunc: generic.ObjectCountUsageFunc(imageapi.ResourceImageStreams),
ListFuncByNamespace: func(namespace string, options kapi.ListOptions) ([]runtime.Object, error) {
itemList, err := isNamespacer.ImageStreams(namespace).List(options)
list, err := store.ImageStreams(namespace).List(options.LabelSelector)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
results := make([]runtime.Object, 0, len(list))
for _, is := range list {
results = append(results, is)
}
return results, nil
},
Expand Down
36 changes: 25 additions & 11 deletions pkg/quota/image/imagestream_evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package image

import (
"testing"
"time"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kquota "k8s.io/kubernetes/pkg/quota"

"github.com/openshift/origin/pkg/client/testclient"
oscache "github.com/openshift/origin/pkg/client/cache"
imagetest "github.com/openshift/origin/pkg/image/admission/testutil"
imageapi "github.com/openshift/origin/pkg/image/api"
)
Expand Down Expand Up @@ -79,10 +81,17 @@ func TestImageStreamEvaluatorUsageStats(t *testing.T) {
expectedISCount: 1,
},
} {
fakeClient := &testclient.Fake{}
fakeClient.AddReactor("list", "imagestreams", imagetest.GetFakeImageStreamListHandler(t, tc.iss...))

evaluator := NewImageStreamEvaluator(fakeClient)
isInformer := cache.NewSharedIndexInformer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a straight counting evaluator, right? If so, I'm not sure you need a test at all. The counting is checked upstream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know I like more then less testing 😉

&cache.ListWatch{},
&imageapi.ImageStream{},
2*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
store := oscache.StoreToImageStreamLister{Indexer: isInformer.GetIndexer()}
for _, is := range tc.iss {
store.Indexer.Add(&is)
}
evaluator := NewImageStreamEvaluator(&store)

stats, err := evaluator.UsageStats(kquota.UsageStatsOptions{Namespace: tc.namespace})
if err != nil {
Expand Down Expand Up @@ -158,18 +167,23 @@ func TestImageStreamEvaluatorUsage(t *testing.T) {
expectedISCount: 1,
},
} {

newIS := &imageapi.ImageStream{
ObjectMeta: kapi.ObjectMeta{
Namespace: "test",
Name: "is",
},
}

fakeClient := &testclient.Fake{}
fakeClient.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, tc.iss...))

evaluator := NewImageStreamEvaluator(fakeClient)
isInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{},
&imageapi.ImageStream{},
2*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
store := oscache.StoreToImageStreamLister{Indexer: isInformer.GetIndexer()}
for _, is := range tc.iss {
store.Indexer.Add(&is)
}
evaluator := NewImageStreamEvaluator(&store)

usage := evaluator.Usage(newIS)
expectedUsage := imagetest.ExpectedResourceListFor(tc.expectedISCount)
Expand Down
10 changes: 5 additions & 5 deletions pkg/quota/image/imagestreamimport_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"

osclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
imageapi "github.com/openshift/origin/pkg/image/api"
)

Expand All @@ -21,7 +21,7 @@ const imageStreamImportName = "Evaluator.ImageStreamImport"
// NewImageStreamImportEvaluator computes resource usage for ImageStreamImport objects. This particular kind
// is a virtual resource. It depends on ImageStream usage evaluator to compute image numbers before the
// the admission can work.
func NewImageStreamImportEvaluator(isNamespacer osclient.ImageStreamsNamespacer) kquota.Evaluator {
func NewImageStreamImportEvaluator(store *oscache.StoreToImageStreamLister) kquota.Evaluator {
computeResources := []kapi.ResourceName{
imageapi.ResourceImageStreams,
}
Expand All @@ -34,7 +34,7 @@ func NewImageStreamImportEvaluator(isNamespacer osclient.ImageStreamsNamespacer)
InternalOperationResources: map[admission.Operation][]kapi.ResourceName{admission.Create: computeResources},
MatchedResourceNames: computeResources,
MatchesScopeFunc: matchesScopeFunc,
UsageFunc: makeImageStreamImportAdmissionUsageFunc(isNamespacer),
UsageFunc: makeImageStreamImportAdmissionUsageFunc(store),
ListFuncByNamespace: func(namespace string, options kapi.ListOptions) ([]runtime.Object, error) {
return []runtime.Object{}, nil
},
Expand All @@ -51,7 +51,7 @@ func imageStreamImportConstraintsFunc(required []kapi.ResourceName, object runti
}

// makeImageStreamImportAdmissionUsageFunc returns a function for computing a usage of an image stream import.
func makeImageStreamImportAdmissionUsageFunc(isNamespacer osclient.ImageStreamsNamespacer) generic.UsageFunc {
func makeImageStreamImportAdmissionUsageFunc(store *oscache.StoreToImageStreamLister) generic.UsageFunc {
return func(object runtime.Object) kapi.ResourceList {
isi, ok := object.(*imageapi.ImageStreamImport)
if !ok {
Expand All @@ -66,7 +66,7 @@ func makeImageStreamImportAdmissionUsageFunc(isNamespacer osclient.ImageStreamsN
return usage
}

is, err := isNamespacer.ImageStreams(isi.Namespace).Get(isi.Name)
is, err := store.ImageStreams(isi.Namespace).Get(isi.Name)
if err != nil && !kerrors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("failed to list image streams: %v", err))
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/quota/image/imagestreamimport_evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package image

import (
"testing"
"time"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kquota "k8s.io/kubernetes/pkg/quota"

"github.com/openshift/origin/pkg/client/testclient"
oscache "github.com/openshift/origin/pkg/client/cache"
imagetest "github.com/openshift/origin/pkg/image/admission/testutil"
imageapi "github.com/openshift/origin/pkg/image/api"
)
Expand Down Expand Up @@ -158,11 +160,17 @@ func TestImageStreamImportEvaluatorUsage(t *testing.T) {
expectedISCount: 1,
},
} {

fakeClient := &testclient.Fake{}
fakeClient.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, tc.iss...))

evaluator := NewImageStreamImportEvaluator(fakeClient)
isInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{},
&imageapi.ImageStream{},
2*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
store := oscache.StoreToImageStreamLister{Indexer: isInformer.GetIndexer()}
for _, is := range tc.iss {
store.Indexer.Add(&is)
}
evaluator := NewImageStreamImportEvaluator(&store)

isi := &imageapi.ImageStreamImport{
ObjectMeta: kapi.ObjectMeta{
Expand Down
9 changes: 5 additions & 4 deletions pkg/quota/image/imagestreamtag_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"

osclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
imageapi "github.com/openshift/origin/pkg/image/api"
)

const imageStreamTagEvaluatorName = "Evaluator.ImageStreamTag"

// NewImageStreamTagEvaluator computes resource usage of ImageStreamsTags. Its sole purpose is to handle
// UPDATE admission operations on imageStreamTags resource.
func NewImageStreamTagEvaluator(istNamespacer osclient.ImageStreamTagsNamespacer, isNamespacer osclient.ImageStreamsNamespacer) kquota.Evaluator {
func NewImageStreamTagEvaluator(store *oscache.StoreToImageStreamLister, istNamespacer osclient.ImageStreamTagsNamespacer) kquota.Evaluator {
computeResources := []kapi.ResourceName{
imageapi.ResourceImageStreams,
}
Expand Down Expand Up @@ -56,7 +57,7 @@ func NewImageStreamTagEvaluator(istNamespacer osclient.ImageStreamTagsNamespacer
},
MatchedResourceNames: computeResources,
MatchesScopeFunc: matchesScopeFunc,
UsageFunc: makeImageStreamTagAdmissionUsageFunc(isNamespacer),
UsageFunc: makeImageStreamTagAdmissionUsageFunc(store),
GetFuncByNamespace: getFuncByNamespace,
ListFuncByNamespace: func(namespace string, options kapi.ListOptions) ([]runtime.Object, error) {
return []runtime.Object{}, nil
Expand All @@ -75,7 +76,7 @@ func imageStreamTagConstraintsFunc(required []kapi.ResourceName, object runtime.

// makeImageStreamTagAdmissionUsageFunc returns a function that computes a resource usage for given image
// stream tag during admission.
func makeImageStreamTagAdmissionUsageFunc(isNamespacer osclient.ImageStreamsNamespacer) generic.UsageFunc {
func makeImageStreamTagAdmissionUsageFunc(store *oscache.StoreToImageStreamLister) generic.UsageFunc {
return func(object runtime.Object) kapi.ResourceList {
ist, ok := object.(*imageapi.ImageStreamTag)
if !ok {
Expand All @@ -92,7 +93,7 @@ func makeImageStreamTagAdmissionUsageFunc(isNamespacer osclient.ImageStreamsName
return kapi.ResourceList{}
}

is, err := isNamespacer.ImageStreams(ist.Namespace).Get(isName)
is, err := store.ImageStreams(ist.Namespace).Get(isName)
if err != nil && !kerrors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("failed to get image stream %s/%s: %v", ist.Namespace, isName, err))
}
Expand Down
Loading