Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3bdbf8e

Browse files
tkashembertinatto
authored andcommittedNov 28, 2024
UPSTREAM: 115328: annotate early and late requests
UPSTREAM: <carry>: add shutdown annotation to response header If it is useful we will combine this with the following carry: 20caad9: UPSTREAM: 115328: annotate early and late requests UPSTREAM: <carry>: add conditional shutdown response header
1 parent 8b5c912 commit 3bdbf8e

File tree

5 files changed

+655
-12
lines changed

5 files changed

+655
-12
lines changed
 

Diff for: ‎staging/src/k8s.io/apiserver/pkg/server/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
10851085
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
10861086
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")
10871087

1088+
handler = genericfilters.WithStartupEarlyAnnotation(handler, c.lifecycleSignals.HasBeenReady)
1089+
10881090
failedHandler := genericapifilters.Unauthorized(c.Serializer)
10891091
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
10901092

@@ -1120,6 +1122,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
11201122
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
11211123
}
11221124
handler = genericfilters.WithOptInRetryAfter(handler, c.newServerFullyInitializedFunc())
1125+
handler = genericfilters.WithShutdownResponseHeader(handler, c.lifecycleSignals.ShutdownInitiated, c.ShutdownDelayDuration, c.APIServerID)
11231126
handler = genericfilters.WithHTTPLogging(handler, c.newIsTerminatingFunc())
11241127
if c.FeatureGate.Enabled(genericfeatures.APIServerTracing) {
11251128
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filters
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"net/http"
23+
"strings"
24+
"time"
25+
26+
"k8s.io/apiserver/pkg/audit"
27+
"k8s.io/apiserver/pkg/authentication/user"
28+
"k8s.io/apiserver/pkg/endpoints/request"
29+
clockutils "k8s.io/utils/clock"
30+
netutils "k8s.io/utils/net"
31+
)
32+
33+
type lifecycleEvent interface {
34+
// Name returns the name of the signal, useful for logging.
35+
Name() string
36+
37+
// Signaled returns a channel that is closed when the underlying event
38+
// has been signaled. Successive calls to Signaled return the same value.
39+
Signaled() <-chan struct{}
40+
41+
// SignaledAt returns the time the event was signaled. If SignaledAt is
42+
// invoked before the event is signaled nil will be returned.
43+
SignaledAt() *time.Time
44+
}
45+
46+
type shouldExemptFunc func(*http.Request) bool
47+
48+
var (
49+
// the health probes are not annotated by default
50+
healthProbes = []string{
51+
"/readyz",
52+
"/healthz",
53+
"/livez",
54+
}
55+
)
56+
57+
func exemptIfHealthProbe(r *http.Request) bool {
58+
path := "/" + strings.TrimLeft(r.URL.Path, "/")
59+
for _, probe := range healthProbes {
60+
if path == probe {
61+
return true
62+
}
63+
}
64+
return false
65+
}
66+
67+
// WithShutdownResponseHeader, if added to the handler chain, adds a header
68+
// 'X-OpenShift-Disruption' to the response with the following information:
69+
//
70+
// shutdown={true|false} shutdown-delay-duration=%s elapsed=%s host=%s
71+
// shutdown: whether the server is currently shutting down gracefully.
72+
// shutdown-delay-duration: value of --shutdown-delay-duration server run option
73+
// elapsed: how much time has elapsed since the server received a TERM signal
74+
// host: host name of the server, it is used to identify the server instance
75+
// from the others.
76+
//
77+
// This handler will add the response header only if the client opts in by
78+
// adding the 'X-Openshift-If-Disruption' header to the request.
79+
func WithShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string) http.Handler {
80+
return withShutdownResponseHeader(handler, shutdownInitiated, delayDuration, apiServerID, clockutils.RealClock{})
81+
}
82+
83+
// WithStartupEarlyAnnotation annotates the request with an annotation keyed as
84+
// 'apiserver.k8s.io/startup' if the request arrives early (the server is not
85+
// fully initialized yet). It should be placed after (in order of execution)
86+
// the 'WithAuthentication' filter.
87+
func WithStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEvent) http.Handler {
88+
return withStartupEarlyAnnotation(handler, hasBeenReady, exemptIfHealthProbe)
89+
}
90+
91+
func withShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string, clock clockutils.PassiveClock) http.Handler {
92+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
93+
if len(req.Header.Get("X-Openshift-If-Disruption")) == 0 {
94+
handler.ServeHTTP(w, req)
95+
return
96+
}
97+
98+
msgFn := func(shutdown bool, elapsed time.Duration) string {
99+
return fmt.Sprintf("shutdown=%t shutdown-delay-duration=%s elapsed=%s host=%s",
100+
shutdown, delayDuration.Round(time.Second).String(), elapsed.Round(time.Second).String(), apiServerID)
101+
}
102+
103+
select {
104+
case <-shutdownInitiated.Signaled():
105+
default:
106+
w.Header().Set("X-OpenShift-Disruption", msgFn(false, time.Duration(0)))
107+
handler.ServeHTTP(w, req)
108+
return
109+
}
110+
111+
shutdownInitiatedAt := shutdownInitiated.SignaledAt()
112+
if shutdownInitiatedAt == nil {
113+
w.Header().Set("X-OpenShift-Disruption", msgFn(true, time.Duration(0)))
114+
handler.ServeHTTP(w, req)
115+
return
116+
}
117+
118+
w.Header().Set("X-OpenShift-Disruption", msgFn(true, clock.Since(*shutdownInitiatedAt)))
119+
handler.ServeHTTP(w, req)
120+
})
121+
}
122+
123+
func withStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEvent, shouldExemptFn shouldExemptFunc) http.Handler {
124+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
125+
select {
126+
case <-hasBeenReady.Signaled():
127+
handler.ServeHTTP(w, req)
128+
return
129+
default:
130+
}
131+
132+
// NOTE: some upstream unit tests have authentication disabled and will
133+
// fail if we require the requestor to be present in the request
134+
// context. Fixing those unit tests will increase the chance of merge
135+
// conflict during rebase.
136+
// This also implies that this filter must be placed after (in order of
137+
// execution) the 'WithAuthentication' filter.
138+
self := "self="
139+
if requestor, exists := request.UserFrom(req.Context()); exists && requestor != nil {
140+
if requestor.GetName() == user.APIServerUser {
141+
handler.ServeHTTP(w, req)
142+
return
143+
}
144+
self = fmt.Sprintf("%s%t", self, false)
145+
}
146+
147+
audit.AddAuditAnnotation(req.Context(), "apiserver.k8s.io/startup",
148+
fmt.Sprintf("early=true %s loopback=%t", self, isLoopback(req.RemoteAddr)))
149+
150+
handler.ServeHTTP(w, req)
151+
})
152+
}
153+
154+
func isLoopback(address string) bool {
155+
host, _, err := net.SplitHostPort(address)
156+
if err != nil {
157+
// if the address is missing a port, SplitHostPort will return an error
158+
// with an empty host, and port value. For such an error, we should
159+
// continue and try to parse the original address.
160+
host = address
161+
}
162+
if ip := netutils.ParseIPSloppy(host); ip != nil {
163+
return ip.IsLoopback()
164+
}
165+
166+
return false
167+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,384 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filters
18+
19+
import (
20+
"net/http"
21+
"net/http/httptest"
22+
"testing"
23+
"time"
24+
25+
"github.com/google/go-cmp/cmp"
26+
auditinternal "k8s.io/apiserver/pkg/apis/audit"
27+
"k8s.io/apiserver/pkg/audit"
28+
authenticationuser "k8s.io/apiserver/pkg/authentication/user"
29+
apirequest "k8s.io/apiserver/pkg/endpoints/request"
30+
utilsclock "k8s.io/utils/clock"
31+
clocktesting "k8s.io/utils/clock/testing"
32+
)
33+
34+
func TestWithShutdownResponseHeader(t *testing.T) {
35+
var (
36+
signaledAt = time.Now()
37+
elapsedAt = signaledAt.Add(20 * time.Second)
38+
)
39+
40+
tests := []struct {
41+
name string
42+
optIn bool
43+
shutdownInitiated func() lifecycleEvent
44+
delayDuration time.Duration
45+
clock func() utilsclock.PassiveClock
46+
handlerInvoked int
47+
statusCodeExpected int
48+
responseHeader string
49+
}{
50+
{
51+
name: "client did not opt in",
52+
shutdownInitiated: func() lifecycleEvent {
53+
return nil
54+
},
55+
handlerInvoked: 1,
56+
statusCodeExpected: http.StatusOK,
57+
},
58+
{
59+
name: "client opted in, shutdown not initiated",
60+
optIn: true,
61+
shutdownInitiated: func() lifecycleEvent {
62+
return fakeLifecycleSignal{ch: make(chan struct{})}
63+
},
64+
delayDuration: 10 * time.Second,
65+
handlerInvoked: 1,
66+
statusCodeExpected: http.StatusOK,
67+
responseHeader: "shutdown=false shutdown-delay-duration=10s elapsed=0s host=foo",
68+
},
69+
{
70+
name: "client opted in, shutdown initiated, signaled at is nil",
71+
optIn: true,
72+
delayDuration: 10 * time.Second,
73+
shutdownInitiated: func() lifecycleEvent {
74+
return fakeLifecycleSignal{ch: newClosedChannel(), at: nil}
75+
},
76+
handlerInvoked: 1,
77+
statusCodeExpected: http.StatusOK,
78+
responseHeader: "shutdown=true shutdown-delay-duration=10s elapsed=0s host=foo",
79+
},
80+
{
81+
name: "client opted in, shutdown initiated, signaled at is nil",
82+
optIn: true,
83+
delayDuration: 10 * time.Second,
84+
shutdownInitiated: func() lifecycleEvent {
85+
return fakeLifecycleSignal{ch: newClosedChannel(), at: nil}
86+
},
87+
handlerInvoked: 1,
88+
statusCodeExpected: http.StatusOK,
89+
responseHeader: "shutdown=true shutdown-delay-duration=10s elapsed=0s host=foo",
90+
},
91+
{
92+
name: "client opted in, shutdown delay duration is zero",
93+
optIn: true,
94+
delayDuration: 0,
95+
shutdownInitiated: func() lifecycleEvent {
96+
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
97+
},
98+
clock: func() utilsclock.PassiveClock {
99+
return clocktesting.NewFakeClock(elapsedAt)
100+
},
101+
handlerInvoked: 1,
102+
statusCodeExpected: http.StatusOK,
103+
responseHeader: "shutdown=true shutdown-delay-duration=0s elapsed=20s host=foo",
104+
},
105+
{
106+
name: "client opted in, shutdown initiated, signaled at is valied",
107+
optIn: true,
108+
delayDuration: 10 * time.Second,
109+
shutdownInitiated: func() lifecycleEvent {
110+
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
111+
},
112+
clock: func() utilsclock.PassiveClock {
113+
return clocktesting.NewFakeClock(elapsedAt)
114+
},
115+
handlerInvoked: 1,
116+
statusCodeExpected: http.StatusOK,
117+
responseHeader: "shutdown=true shutdown-delay-duration=10s elapsed=20s host=foo",
118+
},
119+
}
120+
121+
for _, test := range tests {
122+
t.Run(test.name, func(t *testing.T) {
123+
var handlerInvoked int
124+
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
125+
handlerInvoked++
126+
w.WriteHeader(http.StatusOK)
127+
})
128+
129+
event := test.shutdownInitiated()
130+
var clock utilsclock.PassiveClock = utilsclock.RealClock{}
131+
if test.clock != nil {
132+
clock = test.clock()
133+
}
134+
target := withShutdownResponseHeader(handler, event, test.delayDuration, "foo", clock)
135+
136+
req, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
137+
if err != nil {
138+
t.Fatalf("failed to create new http request - %v", err)
139+
}
140+
if test.optIn {
141+
req.Header.Set("X-Openshift-If-Disruption", "true")
142+
}
143+
144+
w := httptest.NewRecorder()
145+
w.Code = 0
146+
target.ServeHTTP(w, req)
147+
148+
if test.handlerInvoked != handlerInvoked {
149+
t.Errorf("expected the handler to be invoked: %d timed, but got: %d", test.handlerInvoked, handlerInvoked)
150+
}
151+
if test.statusCodeExpected != w.Result().StatusCode {
152+
t.Errorf("expected status code: %d, but got: %d", test.statusCodeExpected, w.Result().StatusCode)
153+
}
154+
155+
key := "X-OpenShift-Disruption"
156+
switch {
157+
case len(test.responseHeader) == 0:
158+
if valueGot := w.Header().Get(key); len(valueGot) > 0 {
159+
t.Errorf("did not expect header to be added to the response, but got: %s", valueGot)
160+
}
161+
default:
162+
if valueGot := w.Header().Get(key); len(valueGot) == 0 || test.responseHeader != valueGot {
163+
t.Logf("got: %s", valueGot)
164+
t.Errorf("expected response header to match, diff: %s", cmp.Diff(test.responseHeader, valueGot))
165+
}
166+
}
167+
})
168+
}
169+
}
170+
171+
func TestWithStartupEarlyAnnotation(t *testing.T) {
172+
tests := []struct {
173+
name string
174+
readySignalFn func() lifecycleEvent
175+
user authenticationuser.Info
176+
remoteAddr string
177+
handlerInvoked int
178+
statusCodeExpected int
179+
annotationExpected string
180+
}{
181+
{
182+
name: "server is ready",
183+
readySignalFn: func() lifecycleEvent {
184+
return fakeLifecycleSignal{ch: newClosedChannel()}
185+
},
186+
handlerInvoked: 1,
187+
statusCodeExpected: http.StatusOK,
188+
},
189+
{
190+
name: "server not ready, no user in request context",
191+
readySignalFn: func() lifecycleEvent {
192+
return fakeLifecycleSignal{ch: make(chan struct{})}
193+
},
194+
handlerInvoked: 1,
195+
statusCodeExpected: http.StatusOK,
196+
annotationExpected: "early=true self= loopback=false",
197+
},
198+
{
199+
name: "server not ready, self is true, not annotated",
200+
readySignalFn: func() lifecycleEvent {
201+
return fakeLifecycleSignal{ch: make(chan struct{})}
202+
},
203+
user: &authenticationuser.DefaultInfo{Name: authenticationuser.APIServerUser},
204+
handlerInvoked: 1,
205+
statusCodeExpected: http.StatusOK,
206+
},
207+
{
208+
name: "server not ready, self is false, request is annotated",
209+
readySignalFn: func() lifecycleEvent {
210+
return fakeLifecycleSignal{ch: make(chan struct{})}
211+
},
212+
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
213+
handlerInvoked: 1,
214+
statusCodeExpected: http.StatusOK,
215+
annotationExpected: "early=true self=false loopback=false",
216+
},
217+
{
218+
name: "server not ready, self is false, looback is true, request is annotated",
219+
readySignalFn: func() lifecycleEvent {
220+
return fakeLifecycleSignal{ch: make(chan struct{})}
221+
},
222+
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
223+
remoteAddr: "127.0.0.1:8080",
224+
handlerInvoked: 1,
225+
statusCodeExpected: http.StatusOK,
226+
annotationExpected: "early=true self=false loopback=true",
227+
},
228+
}
229+
230+
for _, test := range tests {
231+
t.Run(test.name, func(t *testing.T) {
232+
var handlerInvoked int
233+
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
234+
handlerInvoked++
235+
w.WriteHeader(http.StatusOK)
236+
})
237+
238+
event := test.readySignalFn()
239+
target := WithStartupEarlyAnnotation(handler, event)
240+
241+
req, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
242+
if err != nil {
243+
t.Fatalf("failed to create new http request - %v", err)
244+
}
245+
if test.remoteAddr != "" {
246+
req.RemoteAddr = test.remoteAddr
247+
}
248+
249+
ctx := req.Context()
250+
if test.user != nil {
251+
ctx = apirequest.WithUser(ctx, test.user)
252+
}
253+
ctx = audit.WithAuditContext(ctx)
254+
req = req.WithContext(ctx)
255+
256+
ac := audit.AuditContextFrom(req.Context())
257+
if ac == nil {
258+
t.Fatalf("expected audit context inside the request context")
259+
}
260+
ac.Event = auditinternal.Event{
261+
Level: auditinternal.LevelMetadata,
262+
}
263+
264+
w := httptest.NewRecorder()
265+
w.Code = 0
266+
target.ServeHTTP(w, req)
267+
268+
if test.handlerInvoked != handlerInvoked {
269+
t.Errorf("expected the handler to be invoked: %d timed, but got: %d", test.handlerInvoked, handlerInvoked)
270+
}
271+
if test.statusCodeExpected != w.Result().StatusCode {
272+
t.Errorf("expected status code: %d, but got: %d", test.statusCodeExpected, w.Result().StatusCode)
273+
}
274+
275+
key := "apiserver.k8s.io/startup"
276+
switch {
277+
case len(test.annotationExpected) == 0:
278+
if valueGot, ok := ac.Event.Annotations[key]; ok {
279+
t.Errorf("did not expect annotation to be added, but got: %s", valueGot)
280+
}
281+
default:
282+
if valueGot, ok := ac.Event.Annotations[key]; !ok || test.annotationExpected != valueGot {
283+
t.Errorf("expected annotation: %s, but got: %s", test.annotationExpected, valueGot)
284+
}
285+
}
286+
})
287+
}
288+
}
289+
290+
func TestIsLoopback(t *testing.T) {
291+
tests := []struct {
292+
address string
293+
want bool
294+
}{
295+
{
296+
address: "www.foo.bar:80",
297+
want: false,
298+
},
299+
{
300+
address: "www.foo.bar",
301+
want: false,
302+
},
303+
{
304+
address: "127.0.0.1:8080",
305+
want: true,
306+
},
307+
{
308+
address: "127.0.0.1",
309+
want: true,
310+
},
311+
{
312+
address: "192.168.0.1",
313+
want: false,
314+
},
315+
// localhost does not work
316+
{
317+
address: "localhost:8080",
318+
want: false,
319+
},
320+
{
321+
address: "localhost",
322+
want: false,
323+
},
324+
}
325+
326+
for _, test := range tests {
327+
t.Run(test.address, func(t *testing.T) {
328+
if got := isLoopback(test.address); test.want != got {
329+
t.Errorf("expected isLoopback to return: %t, but got: %t", test.want, got)
330+
}
331+
})
332+
}
333+
}
334+
335+
func TestExemptIfHealthProbe(t *testing.T) {
336+
tests := []struct {
337+
path string
338+
exempt bool
339+
}{
340+
{
341+
path: "/apis/v1/foo/bar",
342+
exempt: false,
343+
},
344+
{
345+
path: "/readyz",
346+
exempt: true,
347+
},
348+
{
349+
path: "http://foo.bar///healthz?verbose=1",
350+
exempt: true,
351+
},
352+
{
353+
path: "/livez",
354+
exempt: true,
355+
},
356+
}
357+
358+
for _, test := range tests {
359+
t.Run(test.path, func(t *testing.T) {
360+
req, err := http.NewRequest(http.MethodGet, test.path, nil)
361+
if err != nil {
362+
t.Fatalf("failed to create new http request - %v", err)
363+
}
364+
if got := exemptIfHealthProbe(req); test.exempt != got {
365+
t.Errorf("expected exemptIfHealthProbe to return: %t, but got: %t", test.exempt, got)
366+
}
367+
})
368+
}
369+
}
370+
371+
type fakeLifecycleSignal struct {
372+
ch <-chan struct{}
373+
at *time.Time
374+
}
375+
376+
func (s fakeLifecycleSignal) Name() string { return "initiated" }
377+
func (s fakeLifecycleSignal) Signaled() <-chan struct{} { return s.ch }
378+
func (s fakeLifecycleSignal) SignaledAt() *time.Time { return s.at }
379+
380+
func newClosedChannel() <-chan struct{} {
381+
ch := make(chan struct{})
382+
close(ch)
383+
return ch
384+
}

Diff for: ‎staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go

+47-12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ package server
1818

1919
import (
2020
"sync"
21+
"sync/atomic"
22+
"time"
23+
24+
utilsclock "k8s.io/utils/clock"
2125
)
2226

2327
/*
@@ -100,6 +104,10 @@ type lifecycleSignal interface {
100104

101105
// Name returns the name of the signal, useful for logging.
102106
Name() string
107+
108+
// SignaledAt returns the time the event was signaled. If SignaledAt is
109+
// invoked before the event is signaled nil will be returned.
110+
SignaledAt() *time.Time
103111
}
104112

105113
// lifecycleSignals provides an abstraction of the events that
@@ -157,34 +165,53 @@ func (s lifecycleSignals) ShuttingDown() <-chan struct{} {
157165
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
158166
// to coordinate lifecycle of the apiserver
159167
func newLifecycleSignals() lifecycleSignals {
168+
clock := utilsclock.RealClock{}
160169
return lifecycleSignals{
161-
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"),
162-
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"),
163-
PreShutdownHooksStopped: newNamedChannelWrapper("PreShutdownHooksStopped"),
164-
NotAcceptingNewRequest: newNamedChannelWrapper("NotAcceptingNewRequest"),
165-
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
166-
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
167-
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),
168-
MuxAndDiscoveryComplete: newNamedChannelWrapper("MuxAndDiscoveryComplete"),
170+
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated", clock),
171+
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration", clock),
172+
PreShutdownHooksStopped: newNamedChannelWrapper("PreShutdownHooksStopped", clock),
173+
NotAcceptingNewRequest: newNamedChannelWrapper("NotAcceptingNewRequest", clock),
174+
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained", clock),
175+
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening", clock),
176+
HasBeenReady: newNamedChannelWrapper("HasBeenReady", clock),
177+
MuxAndDiscoveryComplete: newNamedChannelWrapper("MuxAndDiscoveryComplete", clock),
169178
}
170179
}
171180

172-
func newNamedChannelWrapper(name string) lifecycleSignal {
181+
func newNamedChannelWrapper(name string, clock utilsclock.PassiveClock) lifecycleSignal {
173182
return &namedChannelWrapper{
174-
name: name,
175-
once: sync.Once{},
176-
ch: make(chan struct{}),
183+
name: name,
184+
once: sync.Once{},
185+
ch: make(chan struct{}),
186+
clock: clock,
177187
}
178188
}
179189

180190
type namedChannelWrapper struct {
181191
name string
182192
once sync.Once
183193
ch chan struct{}
194+
195+
clock utilsclock.PassiveClock
196+
signaledAt atomic.Value
184197
}
185198

186199
func (e *namedChannelWrapper) Signal() {
187200
e.once.Do(func() {
201+
// set the signaledAt value first to support the expected use case:
202+
//
203+
// <-s.Signaled()
204+
// ..
205+
// at := s.SignaledAt()
206+
//
207+
// we guarantee that at will never be nil after the event is signaled,
208+
// it also implies that 'SignaledAt' if used independently outside of
209+
// the above use case, it may return a valid non-empty time (due to
210+
// the delay between setting signaledAt and closing the channel)
211+
// even when the event has not signaled yet.
212+
now := e.clock.Now()
213+
e.signaledAt.Store(&now)
214+
188215
close(e.ch)
189216
})
190217
}
@@ -196,3 +223,11 @@ func (e *namedChannelWrapper) Signaled() <-chan struct{} {
196223
func (e *namedChannelWrapper) Name() string {
197224
return e.name
198225
}
226+
227+
func (e *namedChannelWrapper) SignaledAt() *time.Time {
228+
value := e.signaledAt.Load()
229+
if value == nil {
230+
return nil
231+
}
232+
return value.(*time.Time)
233+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package server
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
clocktesting "k8s.io/utils/clock/testing"
24+
)
25+
26+
func TestLifecycleSignal(t *testing.T) {
27+
signalName := "mysignal"
28+
signaledAt := time.Now()
29+
clock := clocktesting.NewFakeClock(signaledAt)
30+
s := newNamedChannelWrapper(signalName, clock)
31+
32+
if s.Name() != signalName {
33+
t.Errorf("expected signal name to match: %q, but got: %q", signalName, s.Name())
34+
}
35+
if at := s.SignaledAt(); at != nil {
36+
t.Errorf("expected SignaledAt to return nil, but got: %v", *at)
37+
}
38+
select {
39+
case <-s.Signaled():
40+
t.Errorf("expected the lifecycle event to not be signaled initially")
41+
default:
42+
}
43+
44+
s.Signal()
45+
46+
if at := s.SignaledAt(); at == nil || !at.Equal(signaledAt) {
47+
t.Errorf("expected SignaledAt to return %v, but got: %v", signaledAt, at)
48+
}
49+
select {
50+
case <-s.Signaled():
51+
default:
52+
t.Errorf("expected the lifecycle event to be signaled")
53+
}
54+
}

0 commit comments

Comments
 (0)
Please sign in to comment.