Skip to content

Commit 9f5a2a7

Browse files
atiratreebertinatto
authored andcommitted
UPSTREAM: <carry>: merge v3 openapi discovery and specs for special groups
that have kinds that are served by both CRDs and external apiservers (eg openshift-apiserver) this includes: - authorization.openshift.io (rolebindingrestrictions served by a CRD) - security.openshift.io (securitycontextconstraints served by a CRD) - quota.openshift.io (clusterresourcequotas served by a CRD) By merging all sources, we ensure that kinds served by a CRD will have openapi discovery and spec available even when openshift-apiserver is unavailable.
1 parent 65e3185 commit 9f5a2a7

File tree

3 files changed

+374
-9
lines changed

3 files changed

+374
-9
lines changed

staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go

+22-9
Original file line numberDiff line numberDiff line change
@@ -238,20 +238,18 @@ func (s *specProxier) getOpenAPIV3Root() handler3.OpenAPIV3Discovery {
238238
s.rwMutex.RLock()
239239
defer s.rwMutex.RUnlock()
240240

241-
merged := handler3.OpenAPIV3Discovery{
242-
Paths: make(map[string]handler3.OpenAPIV3DiscoveryGroupVersion),
243-
}
241+
paths := make(map[string][]handler3.OpenAPIV3DiscoveryGroupVersion)
244242

245243
for _, apiServiceInfo := range s.apiServiceInfo {
246244
if apiServiceInfo.discovery == nil {
247245
continue
248246
}
249247

250248
for key, item := range apiServiceInfo.discovery.Paths {
251-
merged.Paths[key] = item
249+
paths[key] = append(paths[key], item)
252250
}
253251
}
254-
return merged
252+
return mergeOpenAPIV3RootPaths(paths)
255253
}
256254

257255
// handleDiscovery is the handler for OpenAPI V3 Discovery
@@ -278,18 +276,33 @@ func (s *specProxier) handleGroupVersion(w http.ResponseWriter, r *http.Request)
278276
url := strings.SplitAfterN(r.URL.Path, "/", 4)
279277
targetGV := url[3]
280278

279+
var eligibleURLs []string
280+
eligibleURLsToAPIServiceInfos := make(map[string]*openAPIV3APIServiceInfo)
281+
281282
for _, apiServiceInfo := range s.apiServiceInfo {
282283
if apiServiceInfo.discovery == nil {
283284
continue
284285
}
285286

286-
for key := range apiServiceInfo.discovery.Paths {
287-
if targetGV == key {
288-
apiServiceInfo.handler.ServeHTTP(w, r)
289-
return
287+
for key, value := range apiServiceInfo.discovery.Paths {
288+
if targetGV == key && eligibleURLsToAPIServiceInfos[value.ServerRelativeURL] == nil {
289+
// add only apiServices that do not duplicate ServerRelativeURL (path + hash)
290+
eligibleURLsToAPIServiceInfos[value.ServerRelativeURL] = apiServiceInfo
291+
eligibleURLs = append(eligibleURLs, value.ServerRelativeURL)
292+
break
290293
}
291294
}
295+
if len(eligibleURLsToAPIServiceInfos) > 0 && !strings.HasPrefix(targetGV, "apis/") {
296+
// do not search for duplicates that are not part of apis/ prefix (eg. /version)
297+
break
298+
}
292299
}
300+
301+
if len(eligibleURLs) > 0 {
302+
delegateAndMergeHandleGroupVersion(w, r, eligibleURLs, eligibleURLsToAPIServiceInfos)
303+
return
304+
}
305+
293306
// No group-versions match the desired request
294307
w.WriteHeader(404)
295308
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
package aggregator
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/sha512"
7+
"encoding/json"
8+
"fmt"
9+
"net/http"
10+
neturl "net/url"
11+
"sort"
12+
"strconv"
13+
"strings"
14+
"time"
15+
16+
"github.com/munnerz/goautoneg"
17+
18+
"k8s.io/apimachinery/pkg/util/sets"
19+
"k8s.io/apiserver/pkg/authentication/user"
20+
"k8s.io/apiserver/pkg/endpoints/request"
21+
"k8s.io/apiserver/pkg/util/responsewriter"
22+
"k8s.io/klog/v2"
23+
"k8s.io/kube-openapi/pkg/handler3"
24+
"k8s.io/kube-openapi/pkg/spec3"
25+
)
26+
27+
// mergeOpenAPIV3RootPaths expects mapping of openapi v3 sub url key to multiple serverRelativeURLs
28+
// these URLs can be backed by different ApiServices or CRDs.
29+
//
30+
// We expect duplicates for the following groups:
31+
// authorization.openshift.io, security.openshift.io and quota.openshift.io
32+
// which are backed by both CRD apiextension apiserver and openshift apiserver.
33+
func mergeOpenAPIV3RootPaths(paths map[string][]handler3.OpenAPIV3DiscoveryGroupVersion) handler3.OpenAPIV3Discovery {
34+
merged := handler3.OpenAPIV3Discovery{
35+
Paths: make(map[string]handler3.OpenAPIV3DiscoveryGroupVersion),
36+
}
37+
38+
for key, delegationURLs := range paths {
39+
// some apiservices can have duplicate paths in openapi v3 discovery (same path and hash) as they are backed by the same apiserver
40+
delegationUniqueURLs := sets.List(toUniqueRelativeURLs(delegationURLs))
41+
// we either have just one url or a special URL like a /version
42+
if len(delegationUniqueURLs) == 1 || (len(delegationUniqueURLs) > 1 && !hasPrefix(delegationUniqueURLs, "/openapi/v3/apis/")) {
43+
merged.Paths[key] = handler3.OpenAPIV3DiscoveryGroupVersion{
44+
ServerRelativeURL: delegationURLs[0].ServerRelativeURL, // take first found apiServiceInfo
45+
}
46+
} else {
47+
newMergedURL, err := mergeURLETags(delegationUniqueURLs)
48+
if err != nil {
49+
klog.Errorf("failed create merged openapi v3 URL for: %s: %s", key, err.Error())
50+
continue
51+
}
52+
merged.Paths[key] = handler3.OpenAPIV3DiscoveryGroupVersion{
53+
ServerRelativeURL: newMergedURL.String(),
54+
}
55+
56+
}
57+
}
58+
return merged
59+
}
60+
61+
// delegateAndMergeHandleGroupVersion delegates requests to eligibleURLs and merges their output
62+
//
63+
// We expect to delegate and merge for the following groups:
64+
// authorization.openshift.io, security.openshift.io and quota.openshift.io
65+
// which are backed by both CRD apiextension apiserver and openshift apiserver.
66+
//
67+
// The other requests will be passed to the original apiServiceInfo handler.
68+
func delegateAndMergeHandleGroupVersion(w http.ResponseWriter, r *http.Request, eligibleURLs []string, eligibleURLsToAPIServiceInfos map[string]*openAPIV3APIServiceInfo) {
69+
if len(eligibleURLs) == 1 {
70+
// fully delegate to the handler
71+
eligibleURLsToAPIServiceInfos[eligibleURLs[0]].handler.ServeHTTP(w, r)
72+
return
73+
} else if len(eligibleURLs) > 1 {
74+
mergedURL, err := mergeURLETags(eligibleURLs)
75+
if err != nil {
76+
klog.Errorf("failed to get mergedURL: %s", err.Error())
77+
w.WriteHeader(http.StatusInternalServerError)
78+
return
79+
}
80+
81+
if !isHashCurrent(r.URL, mergedURL.Query().Get("hash")) {
82+
http.Redirect(w, r, mergedURL.String(), 301)
83+
return
84+
85+
}
86+
var specs []*spec3.OpenAPI
87+
var maxLastModified time.Time
88+
89+
for eligibleURL, apiServiceInfo := range eligibleURLsToAPIServiceInfos {
90+
writer := responsewriter.NewInMemoryResponseWriter()
91+
req, err := createNewAPIServiceRequest(r, eligibleURL)
92+
if err != nil {
93+
klog.Errorf("failed to create request: %s", err.Error())
94+
continue
95+
}
96+
// delegate to multiple apiService handlers
97+
apiServiceInfo.handler.ServeHTTP(writer, req)
98+
lastModified, err := time.Parse(time.RFC1123, writer.Header().Get("Last-Modified"))
99+
if err != nil {
100+
klog.Warningf("not received Last-Modified in RFC1123 format: %s", err.Error())
101+
} else if lastModified.After(maxLastModified) {
102+
maxLastModified = lastModified
103+
}
104+
105+
spec := spec3.OpenAPI{}
106+
if err := json.Unmarshal(writer.Data(), &spec); err != nil {
107+
klog.Errorf("failed to unmarshal OpenAPI for openapiService %v/%v: %s", apiServiceInfo.apiService.Namespace, apiServiceInfo.apiService.Name, err.Error())
108+
continue
109+
}
110+
specs = append(specs, &spec)
111+
}
112+
113+
// prefer info and version from external apiServices (will result in openshift title and description)
114+
sort.Slice(specs, func(i, j int) bool {
115+
if info := specs[i].Info; info != nil && strings.HasPrefix(strings.ToLower(info.Title), "kubernetes") {
116+
return false
117+
}
118+
return true
119+
})
120+
mergedSpec, err := mergeSpecsV3(specs...)
121+
if err != nil {
122+
klog.Errorf("failed to merge spec: %s", err.Error())
123+
w.WriteHeader(http.StatusInternalServerError)
124+
return
125+
}
126+
mergedSpecJSON, _ := json.Marshal(mergedSpec)
127+
128+
if maxLastModified.IsZero() {
129+
maxLastModified = time.Now()
130+
}
131+
132+
openAPIHandleGroupVersion(w, r, mergedSpecJSON, mergedURL.Query().Get("hash"), maxLastModified)
133+
}
134+
}
135+
136+
// openAPIHandleGroupVersion is mostly copied from https://github.com/kubernetes/kube-openapi/blob/3c0fae5ee9fdc4e0cb7abff6fd66784a1f0dbcf8/pkg/handler3/handler.go#L222
137+
func openAPIHandleGroupVersion(w http.ResponseWriter, r *http.Request, data []byte, etag string, lastModified time.Time) {
138+
const (
139+
subTypeProtobufDeprecated = "[email protected]+protobuf"
140+
subTypeProtobuf = "com.github.proto-openapi.spec.v3.v1.0+protobuf"
141+
subTypeJSON = "json"
142+
)
143+
144+
decipherableFormats := r.Header.Get("Accept")
145+
if decipherableFormats == "" {
146+
decipherableFormats = "*/*"
147+
}
148+
clauses := goautoneg.ParseAccept(decipherableFormats)
149+
w.Header().Add("Vary", "Accept")
150+
151+
if len(clauses) == 0 {
152+
return
153+
}
154+
155+
accepted := []struct {
156+
Type string
157+
SubType string
158+
ReturnedContentType string
159+
}{
160+
{"application", subTypeJSON, "application/" + subTypeJSON},
161+
{"application", subTypeProtobuf, "application/" + subTypeProtobuf},
162+
{"application", subTypeProtobufDeprecated, "application/" + subTypeProtobuf},
163+
}
164+
165+
for _, clause := range clauses {
166+
for _, accepts := range accepted {
167+
if clause.Type != accepts.Type && clause.Type != "*" {
168+
continue
169+
}
170+
if clause.SubType != accepts.SubType && clause.SubType != "*" {
171+
continue
172+
}
173+
174+
switch accepts.SubType {
175+
case subTypeProtobuf, subTypeProtobufDeprecated:
176+
var err error
177+
data, err = handler3.ToV3ProtoBinary(data)
178+
if err != nil {
179+
klog.Errorf("failed to convert json to proto: %v", err)
180+
w.WriteHeader(http.StatusInternalServerError)
181+
return
182+
}
183+
}
184+
// Set Content-Type header in the reponse
185+
w.Header().Set("Content-Type", accepts.ReturnedContentType)
186+
187+
// ETag must be enclosed in double quotes: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
188+
w.Header().Set("Etag", strconv.Quote(etag))
189+
190+
if hash := r.URL.Query().Get("hash"); hash != "" {
191+
// validity of hash checked in handleGroupVersion with isHashCurrent
192+
193+
// The Vary header is required because the Accept header can
194+
// change the contents returned. This prevents clients from caching
195+
// protobuf as JSON and vice versa.
196+
w.Header().Set("Vary", "Accept")
197+
198+
// Only set these headers when a hash is given.
199+
w.Header().Set("Cache-Control", "public, immutable")
200+
// Set the Expires directive to the maximum value of one year from the request,
201+
// effectively indicating that the cache never expires.
202+
w.Header().Set("Expires", time.Now().AddDate(1, 0, 0).Format(time.RFC1123))
203+
}
204+
http.ServeContent(w, r, "", lastModified, bytes.NewReader(data))
205+
return
206+
}
207+
}
208+
w.WriteHeader(406)
209+
return
210+
}
211+
212+
func toUniqueRelativeURLs(relativeURLs []handler3.OpenAPIV3DiscoveryGroupVersion) sets.Set[string] {
213+
uniqueURLs := sets.New[string]()
214+
for _, url := range relativeURLs {
215+
uniqueURLs.Insert(url.ServerRelativeURL)
216+
}
217+
return uniqueURLs
218+
}
219+
220+
func hasPrefix(urls []string, prefix string) bool {
221+
if len(urls) == 0 {
222+
return false
223+
}
224+
for _, url := range urls {
225+
if !strings.HasPrefix(url, prefix) {
226+
return false
227+
}
228+
}
229+
return true
230+
}
231+
232+
func isHashCurrent(u *neturl.URL, currentETag string) bool {
233+
if hash := u.Query().Get("hash"); len(hash) > 0 {
234+
// check if hash is current only if requested
235+
return hash == currentETag
236+
}
237+
return true
238+
}
239+
240+
// computeETag is copied from https://github.com/kubernetes/kubernetes/blob/2c6c4566eff972d6c1320b5f8ad795f88c822d09/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/aggregated/etag.go#L76
241+
func computeETag(data []byte) string {
242+
if data == nil {
243+
return ""
244+
}
245+
return fmt.Sprintf("%X", sha512.Sum512(data))
246+
}
247+
248+
func mergeURLETags(delegationURLs []string) (*neturl.URL, error) {
249+
// presume all urls are the same, so take the first one
250+
newURL, err := neturl.Parse(delegationURLs[0])
251+
if err != nil {
252+
return nil, err
253+
}
254+
if len(delegationURLs) == 1 {
255+
return newURL, nil
256+
}
257+
// sorted, for consistent hash
258+
delegationUniqueURLs := sets.List(sets.New(delegationURLs...))
259+
delegationUniqueURLsBytes, err := json.Marshal(delegationUniqueURLs)
260+
if err != nil {
261+
return nil, err
262+
}
263+
etag := computeETag(delegationUniqueURLsBytes)
264+
265+
newQuery := newURL.Query()
266+
newQuery.Set("hash", etag)
267+
newURL.RawQuery = newQuery.Encode()
268+
return newURL, nil
269+
}
270+
271+
func createNewAPIServiceRequest(from *http.Request, eligibleURL string) (*http.Request, error) {
272+
req := from.Clone(request.WithUser(context.Background(), &user.DefaultInfo{Name: aggregatorUser}))
273+
req.Header.Set("Accept", "application/json")
274+
if hash := req.URL.Query().Get("hash"); len(hash) > 0 {
275+
eligibleParsedURL, err := neturl.Parse(eligibleURL)
276+
if err != nil {
277+
return nil, err
278+
}
279+
// rewrite to include the latest hash for this apiservice
280+
q := req.URL.Query()
281+
q.Set("hash", eligibleParsedURL.Query().Get("hash"))
282+
req.URL.RawQuery = q.Encode()
283+
}
284+
return req, nil
285+
}

0 commit comments

Comments
 (0)