Skip to content

Commit 71c77aa

Browse files
committed
wire in aggregator
1 parent 6636649 commit 71c77aa

File tree

4 files changed

+196
-4
lines changed

4 files changed

+196
-4
lines changed

pkg/cmd/server/api/validation/master.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func ValidateMasterConfig(config *api.MasterConfig, fldPath *field.Path) Validat
194194
validationResults.Append(ValidateControllerConfig(config.ControllerConfig, fldPath.Child("controllerConfig")))
195195
validationResults.Append(ValidateAuditConfig(config.AuditConfig, fldPath.Child("auditConfig")))
196196
validationResults.Append(ValidateMasterAuthConfig(config.AuthConfig, fldPath.Child("authConfig")))
197-
validationResults.Append(ValidateAggregatorConfig(config.AuthConfig, fldPath.Child("aggregatorConfig")))
197+
validationResults.Append(ValidateAggregatorConfig(config.AggregatorConfig, fldPath.Child("aggregatorConfig")))
198198

199199
return validationResults
200200
}

pkg/cmd/server/origin/aggregator.go

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package app does all of the work necessary to create a Kubernetes
18+
// APIServer by binding together the API, master and APIServer infrastructure.
19+
// It can be configured and called directly or via the hyperkube framework.
20+
package origin
21+
22+
import (
23+
"fmt"
24+
"io/ioutil"
25+
"net/http"
26+
"strings"
27+
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/labels"
30+
"k8s.io/apimachinery/pkg/runtime/schema"
31+
genericapiserver "k8s.io/apiserver/pkg/server"
32+
"k8s.io/apiserver/pkg/server/healthz"
33+
kubeclientset "k8s.io/client-go/kubernetes"
34+
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
35+
"k8s.io/kube-aggregator/pkg/apis/apiregistration/install"
36+
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
37+
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
38+
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
39+
kapi "k8s.io/kubernetes/pkg/api"
40+
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
41+
)
42+
43+
func (c *MasterConfig) createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config) (*aggregatorapiserver.Config, error) {
44+
// make a shallow copy to let us twiddle a few things
45+
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
46+
genericConfig := kubeAPIServerConfig
47+
48+
// the aggregator doesn't wire these up. It just delegates them to the kubeapiserver
49+
genericConfig.EnableSwaggerUI = false
50+
genericConfig.OpenAPIConfig = nil
51+
genericConfig.SwaggerConfig = nil
52+
genericConfig.FallThroughHandler = nil
53+
54+
// install our types into the scheme so that "normal" RESTOptionsGetters can work for us
55+
install.Install(kapi.GroupFactoryRegistry, kapi.Registry, kapi.Scheme)
56+
57+
client, err := kubeclientset.NewForConfig(genericConfig.LoopbackClientConfig)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
certBytes, err := ioutil.ReadFile(c.Options.AggregatorConfig.ProxyClientInfo.CertFile)
63+
if err != nil {
64+
return nil, err
65+
}
66+
keyBytes, err := ioutil.ReadFile(c.Options.AggregatorConfig.ProxyClientInfo.KeyFile)
67+
if err != nil {
68+
return nil, err
69+
}
70+
return &aggregatorapiserver.Config{
71+
GenericConfig: &genericConfig,
72+
CoreAPIServerClient: client,
73+
ProxyClientCert: certBytes,
74+
ProxyClientKey: keyBytes,
75+
}, nil
76+
}
77+
78+
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
79+
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, stopCh)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
// create controllers for auto-registration
85+
apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
86+
if err != nil {
87+
return nil, err
88+
}
89+
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(), apiRegistrationClient)
90+
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
91+
92+
aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
93+
go autoRegistrationController.Run(5, stopCh)
94+
return nil
95+
})
96+
aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error {
97+
items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything())
98+
if err != nil {
99+
return err
100+
}
101+
102+
missing := []apiregistration.APIService{}
103+
for _, apiService := range apiServices {
104+
found := false
105+
for _, item := range items {
106+
if item.Name == apiService.Name {
107+
found = true
108+
break
109+
}
110+
}
111+
112+
if !found {
113+
missing = append(missing, *apiService)
114+
}
115+
}
116+
117+
if len(missing) > 0 {
118+
return fmt.Errorf("missing APIService: %v", missing)
119+
}
120+
return nil
121+
}))
122+
123+
return aggregatorServer, nil
124+
}
125+
126+
func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService {
127+
return &apiregistration.APIService{
128+
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
129+
Spec: apiregistration.APIServiceSpec{
130+
Group: gv.Group,
131+
Version: gv.Version,
132+
Priority: 100,
133+
},
134+
}
135+
}
136+
137+
func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*apiregistration.APIService {
138+
apiServices := []*apiregistration.APIService{}
139+
140+
for _, curr := range delegateAPIServer.ListedPaths() {
141+
if curr == "/api/v1" {
142+
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
143+
registration.AddAPIServiceToSync(apiService)
144+
apiServices = append(apiServices, apiService)
145+
continue
146+
}
147+
148+
if !strings.HasPrefix(curr, "/apis/") {
149+
continue
150+
}
151+
// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
152+
tokens := strings.Split(curr, "/")
153+
if len(tokens) != 4 {
154+
continue
155+
}
156+
157+
apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
158+
159+
// TODO this is probably an indication that we need explicit and precise control over the discovery chain
160+
// but for now its a special case
161+
// apps has to come last for compatibility with 1.5 kubectl clients
162+
if apiService.Spec.Group == "apps" {
163+
apiService.Spec.Priority = 110
164+
}
165+
166+
registration.AddAPIServiceToSync(apiService)
167+
apiServices = append(apiServices, apiService)
168+
}
169+
170+
return apiServices
171+
}

pkg/cmd/server/origin/master.go

+23-2
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ var excludedV1Types = sets.NewString()
172172

173173
// Run launches the OpenShift master by creating a kubernetes master, installing
174174
// OpenShift APIs into it and then running it.
175-
func (c *MasterConfig) Run(kc *kubernetes.MasterConfig, assetConfig *AssetConfig) {
175+
func (c *MasterConfig) Run(kc *kubernetes.MasterConfig, assetConfig *AssetConfig, stopCh <-chan struct{}) {
176176
var (
177177
messages []string
178178
err error
@@ -193,7 +193,28 @@ func (c *MasterConfig) Run(kc *kubernetes.MasterConfig, assetConfig *AssetConfig
193193
for _, s := range messages {
194194
glog.Infof(s, c.Options.ServingInfo.BindAddress)
195195
}
196-
go kmaster.GenericAPIServer.PrepareRun().Run(utilwait.NeverStop)
196+
197+
apiserver := kmaster.GenericAPIServer.PrepareRun()
198+
199+
// presence of the key indicates whether or not to enable the aggregator
200+
if len(c.Options.AggregatorConfig.ProxyClientInfo.KeyFile) == 0 {
201+
go apiserver.Run(utilwait.NeverStop)
202+
203+
// Attempt to verify the server came up for 20 seconds (100 tries * 100ms, 100ms timeout per try)
204+
cmdutil.WaitForSuccessfulDial(c.TLS, c.Options.ServingInfo.BindNetwork, c.Options.ServingInfo.BindAddress, 100*time.Millisecond, 100*time.Millisecond, 100)
205+
return
206+
}
207+
208+
aggregatorConfig, err := c.createAggregatorConfig(*kc.Master.GenericConfig)
209+
if err != nil {
210+
glog.Fatalf("Failed to launch master: %v", err)
211+
}
212+
aggregatorServer, err := createAggregatorServer(aggregatorConfig, apiserver.GenericAPIServer, kc.Informers.InternalKubernetesInformers(), stopCh)
213+
if err != nil {
214+
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
215+
glog.Fatalf("Failed to launch master: %v", err)
216+
}
217+
go aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
197218

198219
// Attempt to verify the server came up for 20 seconds (100 tries * 100ms, 100ms timeout per try)
199220
cmdutil.WaitForSuccessfulDial(c.TLS, c.Options.ServingInfo.BindNetwork, c.Options.ServingInfo.BindAddress, 100*time.Millisecond, 100*time.Millisecond, 100)

pkg/cmd/server/start/start_master.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ func StartAPI(oc *origin.MasterConfig, kc *kubernetes.MasterConfig) error {
507507
}
508508
}
509509

510-
oc.Run(kc, embeddedAssetConfig)
510+
oc.Run(kc, embeddedAssetConfig, utilwait.NeverStop)
511511

512512
// start DNS before the informers are started because it adds a ClusterIP index.
513513
if oc.Options.DNSConfig != nil {

0 commit comments

Comments
 (0)