Skip to content

Commit 521c66f

Browse files
committed
run kube controllers separately based on their command
1 parent d2bfaa2 commit 521c66f

File tree

3 files changed

+256
-110
lines changed

3 files changed

+256
-110
lines changed

pkg/cmd/server/start/controllers.go

+1-42
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package start
22

33
import (
4-
"k8s.io/apimachinery/pkg/runtime"
5-
"k8s.io/apimachinery/pkg/runtime/schema"
64
"k8s.io/client-go/rest"
75
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
86
cmappoptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
9-
kexternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
107
"k8s.io/kubernetes/pkg/cloudprovider"
118
"k8s.io/kubernetes/pkg/controller"
129

@@ -46,45 +43,7 @@ func getControllerContext(options configapi.MasterConfig, controllerManagerOptio
4643
AuthenticationClient: kubeExternal.Authentication(),
4744
Namespace: "kube-system",
4845
},
49-
InformerFactory: genericInformers{
50-
SharedInformerFactory: informers.GetExternalKubeInformers(),
51-
generic: []GenericResourceInformer{
52-
// use our existing internal informers to satisfy the generic informer requests (which don't require strong
53-
// types).
54-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
55-
return informers.appInformers.ForResource(resource)
56-
}),
57-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
58-
return informers.authorizationInformers.ForResource(resource)
59-
}),
60-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
61-
return informers.buildInformers.ForResource(resource)
62-
}),
63-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
64-
return informers.imageInformers.ForResource(resource)
65-
}),
66-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
67-
return informers.quotaInformers.ForResource(resource)
68-
}),
69-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
70-
return informers.securityInformers.ForResource(resource)
71-
}),
72-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
73-
return informers.templateInformers.ForResource(resource)
74-
}),
75-
informers.externalKubeInformers,
76-
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
77-
return informers.internalKubeInformers.ForResource(resource)
78-
}),
79-
},
80-
bias: map[schema.GroupVersionResource]schema.GroupVersionResource{
81-
{Group: "rbac.authorization.k8s.io", Resource: "rolebindings", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "rolebindings", Version: runtime.APIVersionInternal},
82-
{Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings", Version: runtime.APIVersionInternal},
83-
{Group: "rbac.authorization.k8s.io", Resource: "roles", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "roles", Version: runtime.APIVersionInternal},
84-
{Group: "rbac.authorization.k8s.io", Resource: "clusterroles", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "clusterroles", Version: runtime.APIVersionInternal},
85-
{Group: "", Resource: "securitycontextconstraints", Version: "v1"}: {Group: "", Resource: "securitycontextconstraints", Version: runtime.APIVersionInternal},
86-
},
87-
},
46+
InformerFactory: newGenericInformers(informers),
8847
Options: *controllerManagerOptions,
8948
AvailableResources: availableResources,
9049
Cloud: cloudProvider,

pkg/cmd/server/start/start_kube_controller_manager.go

+242-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,27 @@
11
package start
22

33
import (
4-
"github.com/openshift/origin/pkg/cmd/server/bootstrappolicy"
4+
"strconv"
5+
6+
"github.com/golang/glog"
7+
"github.com/spf13/pflag"
8+
9+
"k8s.io/apimachinery/pkg/runtime"
10+
"k8s.io/apimachinery/pkg/runtime/schema"
11+
kerrors "k8s.io/apimachinery/pkg/util/errors"
12+
controllerapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
13+
controlleroptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
514
"k8s.io/kubernetes/pkg/api/v1"
615
kapiv1 "k8s.io/kubernetes/pkg/api/v1"
16+
kexternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
17+
kinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
18+
"k8s.io/kubernetes/pkg/controller"
719
"k8s.io/kubernetes/pkg/volume"
20+
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
21+
22+
"github.com/openshift/origin/pkg/cmd/server/bootstrappolicy"
23+
cmdflags "github.com/openshift/origin/pkg/cmd/util/flags"
24+
"k8s.io/kubernetes/pkg/apis/componentconfig"
825
)
926

1027
// newPersistentVolumeRecyclerPodTemplate provides a function which makes our recycler pod template for use in the kube-controller-manager
@@ -26,3 +43,227 @@ func newPersistentVolumeRecyclerPodTemplate(recyclerImageName string) func() *v1
2643
return defaultScrubPod
2744
}
2845
}
46+
47+
// newControllerContext provides a function which overrides the default and plugs a different set of informers in
48+
func newControllerContext(informers *informers) func(s *controlleroptions.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (controllerapp.ControllerContext, error) {
49+
oldContextFunc := controllerapp.CreateControllerContext
50+
return func(s *controlleroptions.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (controllerapp.ControllerContext, error) {
51+
ret, err := oldContextFunc(s, rootClientBuilder, clientBuilder, stop)
52+
if err != nil {
53+
return controllerapp.ControllerContext{}, err
54+
}
55+
56+
// Overwrite the informers. Since nothing accessed the existing informers that we're overwriting, they are inert.
57+
// TODO Remove this. It keeps in-process memory utilization down, but we shouldn't do it.
58+
ret.InformerFactory = newGenericInformers(informers)
59+
60+
return ret, nil
61+
}
62+
}
63+
64+
func kubeControllerManagerAddFlags(cmserver *controlleroptions.CMServer) func(flags *pflag.FlagSet) {
65+
return func(flags *pflag.FlagSet) {
66+
cmserver.AddFlags(flags, controllerapp.KnownControllers(), controllerapp.ControllersDisabledByDefault.List())
67+
}
68+
}
69+
70+
func newKubeControllerManager(kubeconfigFile, saPrivateKeyFile, saRootCAFile, podEvictionTimeout string, dynamicProvisioningEnabled bool, cmdLineArgs map[string][]string) (*controlleroptions.CMServer, error) {
71+
if cmdLineArgs == nil {
72+
cmdLineArgs = map[string][]string{}
73+
}
74+
75+
if _, ok := cmdLineArgs["controllers"]; !ok {
76+
cmdLineArgs["controllers"] = []string{
77+
"*", // start everything but the exceptions}
78+
// we don't appear to use this
79+
"-ttl",
80+
// we have to configure this separately until it is generic
81+
"-horizontalpodautoscaling",
82+
// we carry patches on this. For now....
83+
"-serviceaccount-token",
84+
}
85+
}
86+
if _, ok := cmdLineArgs["service-account-private-key-file"]; !ok {
87+
cmdLineArgs["service-account-private-key-file"] = []string{saPrivateKeyFile}
88+
}
89+
if _, ok := cmdLineArgs["root-ca-file"]; !ok {
90+
cmdLineArgs["root-ca-file"] = []string{saRootCAFile}
91+
}
92+
if _, ok := cmdLineArgs["kubeconfig"]; !ok {
93+
cmdLineArgs["kubeconfig"] = []string{kubeconfigFile}
94+
}
95+
if _, ok := cmdLineArgs["pod-eviction-timeout"]; !ok {
96+
cmdLineArgs["pod-eviction-timeout"] = []string{podEvictionTimeout}
97+
}
98+
if _, ok := cmdLineArgs["enable-dynamic-provisioning"]; !ok {
99+
cmdLineArgs["enable-dynamic-provisioning"] = []string{strconv.FormatBool(dynamicProvisioningEnabled)}
100+
}
101+
102+
// disable serving http since we didn't used to expose it
103+
if _, ok := cmdLineArgs["port"]; !ok {
104+
cmdLineArgs["port"] = []string{"-1"}
105+
}
106+
107+
// these force "default" values to match what we want
108+
if _, ok := cmdLineArgs["use-service-account-credentials"]; !ok {
109+
cmdLineArgs["use-service-account-credentials"] = []string{"true"}
110+
}
111+
if _, ok := cmdLineArgs["cluster-signing-cert-file"]; !ok {
112+
cmdLineArgs["cluster-signing-cert-file"] = []string{""}
113+
}
114+
if _, ok := cmdLineArgs["cluster-signing-key-file"]; !ok {
115+
cmdLineArgs["cluster-signing-key-file"] = []string{""}
116+
}
117+
if _, ok := cmdLineArgs["experimental-cluster-signing-duration"]; !ok {
118+
cmdLineArgs["experimental-cluster-signing-duration"] = []string{"0s"}
119+
}
120+
if _, ok := cmdLineArgs["leader-elect-retry-period"]; !ok {
121+
cmdLineArgs["leader-elect-retry-period"] = []string{"3s"}
122+
}
123+
if _, ok := cmdLineArgs["leader-elect-resource-lock"]; !ok {
124+
cmdLineArgs["leader-elect-resource-lock"] = []string{"configmaps"}
125+
}
126+
127+
// resolve arguments
128+
controllerManager := controlleroptions.NewCMServer()
129+
if err := cmdflags.Resolve(cmdLineArgs, kubeControllerManagerAddFlags(controllerManager)); len(err) > 0 {
130+
return nil, kerrors.NewAggregate(err)
131+
}
132+
133+
// TODO make this configurable or discoverable. This is going to prevent us from running the stock GC controller
134+
// IF YOU ADD ANYTHING TO THIS LIST, MAKE SURE THAT YOU UPDATE THEIR STRATEGIES TO PREVENT GC FINALIZERS
135+
controllerManager.GCIgnoredResources = append(controllerManager.GCIgnoredResources,
136+
// explicitly disabled from GC for now - not enough value to track them
137+
componentconfig.GroupResource{Group: "authorization.openshift.io", Resource: "rolebindingrestrictions"},
138+
componentconfig.GroupResource{Group: "network.openshift.io", Resource: "clusternetworks"},
139+
componentconfig.GroupResource{Group: "network.openshift.io", Resource: "egressnetworkpolicies"},
140+
componentconfig.GroupResource{Group: "network.openshift.io", Resource: "hostsubnets"},
141+
componentconfig.GroupResource{Group: "network.openshift.io", Resource: "netnamespaces"},
142+
componentconfig.GroupResource{Group: "oauth.openshift.io", Resource: "oauthclientauthorizations"},
143+
componentconfig.GroupResource{Group: "oauth.openshift.io", Resource: "oauthclients"},
144+
componentconfig.GroupResource{Group: "quota.openshift.io", Resource: "clusterresourcequotas"},
145+
componentconfig.GroupResource{Group: "user.openshift.io", Resource: "groups"},
146+
componentconfig.GroupResource{Group: "user.openshift.io", Resource: "identities"},
147+
componentconfig.GroupResource{Group: "user.openshift.io", Resource: "users"},
148+
componentconfig.GroupResource{Group: "image.openshift.io", Resource: "images"},
149+
150+
// virtual resource
151+
componentconfig.GroupResource{Group: "project.openshift.io", Resource: "projects"},
152+
// these resources contain security information in their names, and we don't need to track them
153+
componentconfig.GroupResource{Group: "oauth.openshift.io", Resource: "oauthaccesstokens"},
154+
componentconfig.GroupResource{Group: "oauth.openshift.io", Resource: "oauthauthorizetokens"},
155+
// exposed already as cronjobs
156+
componentconfig.GroupResource{Group: "batch", Resource: "scheduledjobs"},
157+
// exposed already as extensions v1beta1 by other controllers
158+
componentconfig.GroupResource{Group: "apps", Resource: "deployments"},
159+
// exposed as autoscaling v1
160+
componentconfig.GroupResource{Group: "extensions", Resource: "horizontalpodautoscalers"},
161+
// exposed as security.openshift.io v1
162+
componentconfig.GroupResource{Group: "", Resource: "securitycontextconstraints"},
163+
)
164+
165+
return controllerManager, nil
166+
}
167+
168+
func runEmbeddedKubeControllerManager(kubeconfigFile, saPrivateKeyFile, saRootCAFile, podEvictionTimeout string, dynamicProvisioningEnabled bool, cmdLineArgs map[string][]string,
169+
recyclerImage string, informers *informers) {
170+
volume.NewPersistentVolumeRecyclerPodTemplate = newPersistentVolumeRecyclerPodTemplate(recyclerImage)
171+
controllerapp.CreateControllerContext = newControllerContext(informers)
172+
controllerapp.StartInformers = func(stop <-chan struct{}) {
173+
informers.Start(stop)
174+
}
175+
176+
// TODO we need a real identity for this. Right now it's just using the loopback connection like it used to.
177+
controllerManager, err := newKubeControllerManager(kubeconfigFile, saPrivateKeyFile, saRootCAFile, podEvictionTimeout, dynamicProvisioningEnabled, cmdLineArgs)
178+
if err != nil {
179+
glog.Fatal(err)
180+
}
181+
// this does a second leader election, but doing the second leader election will allow us to move out process in
182+
// 3.8 if we so choose.
183+
if err := controllerapp.Run(controllerManager); err != nil {
184+
glog.Fatal(err)
185+
}
186+
}
187+
188+
type GenericResourceInformer interface {
189+
ForResource(resource schema.GroupVersionResource) (kinformers.GenericInformer, error)
190+
}
191+
192+
// genericInternalResourceInformerFunc will return an internal informer for any resource matching
193+
// its group resource, instead of the external version. Only valid for use where the type is accessed
194+
// via generic interfaces, such as the garbage collector with ObjectMeta.
195+
type genericInternalResourceInformerFunc func(resource schema.GroupVersionResource) (kinformers.GenericInformer, error)
196+
197+
func (fn genericInternalResourceInformerFunc) ForResource(resource schema.GroupVersionResource) (kinformers.GenericInformer, error) {
198+
resource.Version = runtime.APIVersionInternal
199+
return fn(resource)
200+
}
201+
202+
type genericInformers struct {
203+
kinformers.SharedInformerFactory
204+
generic []GenericResourceInformer
205+
// bias is a map that tries loading an informer from another GVR before using the original
206+
bias map[schema.GroupVersionResource]schema.GroupVersionResource
207+
}
208+
209+
func newGenericInformers(informers *informers) genericInformers {
210+
return genericInformers{
211+
SharedInformerFactory: informers.GetExternalKubeInformers(),
212+
generic: []GenericResourceInformer{
213+
// use our existing internal informers to satisfy the generic informer requests (which don't require strong
214+
// types).
215+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
216+
return informers.appInformers.ForResource(resource)
217+
}),
218+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
219+
return informers.authorizationInformers.ForResource(resource)
220+
}),
221+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
222+
return informers.buildInformers.ForResource(resource)
223+
}),
224+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
225+
return informers.imageInformers.ForResource(resource)
226+
}),
227+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
228+
return informers.quotaInformers.ForResource(resource)
229+
}),
230+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
231+
return informers.securityInformers.ForResource(resource)
232+
}),
233+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
234+
return informers.templateInformers.ForResource(resource)
235+
}),
236+
informers.externalKubeInformers,
237+
genericInternalResourceInformerFunc(func(resource schema.GroupVersionResource) (kexternalinformers.GenericInformer, error) {
238+
return informers.internalKubeInformers.ForResource(resource)
239+
}),
240+
},
241+
bias: map[schema.GroupVersionResource]schema.GroupVersionResource{
242+
{Group: "rbac.authorization.k8s.io", Resource: "rolebindings", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "rolebindings", Version: runtime.APIVersionInternal},
243+
{Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings", Version: runtime.APIVersionInternal},
244+
{Group: "rbac.authorization.k8s.io", Resource: "roles", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "roles", Version: runtime.APIVersionInternal},
245+
{Group: "rbac.authorization.k8s.io", Resource: "clusterroles", Version: "v1beta1"}: {Group: "rbac.authorization.k8s.io", Resource: "clusterroles", Version: runtime.APIVersionInternal},
246+
{Group: "", Resource: "securitycontextconstraints", Version: "v1"}: {Group: "", Resource: "securitycontextconstraints", Version: runtime.APIVersionInternal},
247+
},
248+
}
249+
}
250+
251+
func (i genericInformers) ForResource(resource schema.GroupVersionResource) (kinformers.GenericInformer, error) {
252+
if try, ok := i.bias[resource]; ok {
253+
if res, err := i.ForResource(try); err == nil {
254+
return res, nil
255+
}
256+
}
257+
258+
informer, firstErr := i.SharedInformerFactory.ForResource(resource)
259+
if firstErr == nil {
260+
return informer, nil
261+
}
262+
for _, generic := range i.generic {
263+
if informer, err := generic.ForResource(resource); err == nil {
264+
return informer, nil
265+
}
266+
}
267+
glog.V(4).Infof("Couldn't find informer for %v", resource)
268+
return nil, firstErr
269+
}

0 commit comments

Comments
 (0)