Skip to content

Commit fcb43d6

Browse files
tkashembertinatto
authored andcommitted
UPSTREAM: 115328: annotate early and late requests
UPSTREAM: <carry>: add shutdown annotation to response header If it is useful we will combine this with the following carry: 20caad9: UPSTREAM: 115328: annotate early and late requests UPSTREAM: <carry>: add conditional shutdown response header
1 parent 1ff0bec commit fcb43d6

File tree

5 files changed

+655
-12
lines changed

5 files changed

+655
-12
lines changed

staging/src/k8s.io/apiserver/pkg/server/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
10941094
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
10951095
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")
10961096

1097+
handler = genericfilters.WithStartupEarlyAnnotation(handler, c.lifecycleSignals.HasBeenReady)
1098+
10971099
failedHandler := genericapifilters.Unauthorized(c.Serializer)
10981100
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
10991101

@@ -1134,6 +1136,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
11341136
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
11351137
}
11361138
handler = genericfilters.WithOptInRetryAfter(handler, c.newServerFullyInitializedFunc())
1139+
handler = genericfilters.WithShutdownResponseHeader(handler, c.lifecycleSignals.ShutdownInitiated, c.ShutdownDelayDuration, c.APIServerID)
11371140
handler = genericfilters.WithHTTPLogging(handler, c.newIsTerminatingFunc())
11381141
handler = genericapifilters.WithLatencyTrackers(handler)
11391142
// WithRoutine will execute future handlers in a separate goroutine and serving
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filters
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"net/http"
23+
"strings"
24+
"time"
25+
26+
"k8s.io/apiserver/pkg/audit"
27+
"k8s.io/apiserver/pkg/authentication/user"
28+
"k8s.io/apiserver/pkg/endpoints/request"
29+
clockutils "k8s.io/utils/clock"
30+
netutils "k8s.io/utils/net"
31+
)
32+
33+
type lifecycleEvent interface {
34+
// Name returns the name of the signal, useful for logging.
35+
Name() string
36+
37+
// Signaled returns a channel that is closed when the underlying event
38+
// has been signaled. Successive calls to Signaled return the same value.
39+
Signaled() <-chan struct{}
40+
41+
// SignaledAt returns the time the event was signaled. If SignaledAt is
42+
// invoked before the event is signaled nil will be returned.
43+
SignaledAt() *time.Time
44+
}
45+
46+
type shouldExemptFunc func(*http.Request) bool
47+
48+
var (
49+
// the health probes are not annotated by default
50+
healthProbes = []string{
51+
"/readyz",
52+
"/healthz",
53+
"/livez",
54+
}
55+
)
56+
57+
func exemptIfHealthProbe(r *http.Request) bool {
58+
path := "/" + strings.TrimLeft(r.URL.Path, "/")
59+
for _, probe := range healthProbes {
60+
if path == probe {
61+
return true
62+
}
63+
}
64+
return false
65+
}
66+
67+
// WithShutdownResponseHeader, if added to the handler chain, adds a header
68+
// 'X-OpenShift-Disruption' to the response with the following information:
69+
//
70+
// shutdown={true|false} shutdown-delay-duration=%s elapsed=%s host=%s
71+
// shutdown: whether the server is currently shutting down gracefully.
72+
// shutdown-delay-duration: value of --shutdown-delay-duration server run option
73+
// elapsed: how much time has elapsed since the server received a TERM signal
74+
// host: host name of the server, it is used to identify the server instance
75+
// from the others.
76+
//
77+
// This handler will add the response header only if the client opts in by
78+
// adding the 'X-Openshift-If-Disruption' header to the request.
79+
func WithShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string) http.Handler {
80+
return withShutdownResponseHeader(handler, shutdownInitiated, delayDuration, apiServerID, clockutils.RealClock{})
81+
}
82+
83+
// WithStartupEarlyAnnotation annotates the request with an annotation keyed as
84+
// 'apiserver.k8s.io/startup' if the request arrives early (the server is not
85+
// fully initialized yet). It should be placed after (in order of execution)
86+
// the 'WithAuthentication' filter.
87+
func WithStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEvent) http.Handler {
88+
return withStartupEarlyAnnotation(handler, hasBeenReady, exemptIfHealthProbe)
89+
}
90+
91+
func withShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string, clock clockutils.PassiveClock) http.Handler {
92+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
93+
if len(req.Header.Get("X-Openshift-If-Disruption")) == 0 {
94+
handler.ServeHTTP(w, req)
95+
return
96+
}
97+
98+
msgFn := func(shutdown bool, elapsed time.Duration) string {
99+
return fmt.Sprintf("shutdown=%t shutdown-delay-duration=%s elapsed=%s host=%s",
100+
shutdown, delayDuration.Round(time.Second).String(), elapsed.Round(time.Second).String(), apiServerID)
101+
}
102+
103+
select {
104+
case <-shutdownInitiated.Signaled():
105+
default:
106+
w.Header().Set("X-OpenShift-Disruption", msgFn(false, time.Duration(0)))
107+
handler.ServeHTTP(w, req)
108+
return
109+
}
110+
111+
shutdownInitiatedAt := shutdownInitiated.SignaledAt()
112+
if shutdownInitiatedAt == nil {
113+
w.Header().Set("X-OpenShift-Disruption", msgFn(true, time.Duration(0)))
114+
handler.ServeHTTP(w, req)
115+
return
116+
}
117+
118+
w.Header().Set("X-OpenShift-Disruption", msgFn(true, clock.Since(*shutdownInitiatedAt)))
119+
handler.ServeHTTP(w, req)
120+
})
121+
}
122+
123+
func withStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEvent, shouldExemptFn shouldExemptFunc) http.Handler {
124+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
125+
select {
126+
case <-hasBeenReady.Signaled():
127+
handler.ServeHTTP(w, req)
128+
return
129+
default:
130+
}
131+
132+
// NOTE: some upstream unit tests have authentication disabled and will
133+
// fail if we require the requestor to be present in the request
134+
// context. Fixing those unit tests will increase the chance of merge
135+
// conflict during rebase.
136+
// This also implies that this filter must be placed after (in order of
137+
// execution) the 'WithAuthentication' filter.
138+
self := "self="
139+
if requestor, exists := request.UserFrom(req.Context()); exists && requestor != nil {
140+
if requestor.GetName() == user.APIServerUser {
141+
handler.ServeHTTP(w, req)
142+
return
143+
}
144+
self = fmt.Sprintf("%s%t", self, false)
145+
}
146+
147+
audit.AddAuditAnnotation(req.Context(), "apiserver.k8s.io/startup",
148+
fmt.Sprintf("early=true %s loopback=%t", self, isLoopback(req.RemoteAddr)))
149+
150+
handler.ServeHTTP(w, req)
151+
})
152+
}
153+
154+
func isLoopback(address string) bool {
155+
host, _, err := net.SplitHostPort(address)
156+
if err != nil {
157+
// if the address is missing a port, SplitHostPort will return an error
158+
// with an empty host, and port value. For such an error, we should
159+
// continue and try to parse the original address.
160+
host = address
161+
}
162+
if ip := netutils.ParseIPSloppy(host); ip != nil {
163+
return ip.IsLoopback()
164+
}
165+
166+
return false
167+
}

0 commit comments

Comments
 (0)