Skip to content

Commit 85e9768

Browse files
sttts authored and bertinatto committed
UPSTREAM: <carry>: create termination events
UPSTREAM: <carry>: apiserver: log new connections during termination

UPSTREAM: <carry>: apiserver: create LateConnections events on events in the last 20% of graceful termination time

UPSTREAM: <carry>: apiserver: log source in LateConnections event

UPSTREAM: <carry>: apiserver: skip local IPs and probes for LateConnections

UPSTREAM: <carry>: only create valid LateConnections/GracefulTermination events

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: apiserver: create hasBeenReadyCh channel

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: fix termination event(s) validation failures

UPSTREAM: <carry>: during the rebase collapse to create termination event

It makes recording termination events a non-blocking operation. Previously, closing delayedStopCh might have been delayed while the event was being persisted to storage. delayedStopCh is important because it signals the HTTP server to start the shutdown procedure. It also sets a hard timeout of 3 seconds for the storage layer, since we are bypassing the API layer.

UPSTREAM: <carry>: rename termination events to use lifecycleSignals

OpenShift-Rebase-Source: 15b2d2e

UPSTREAM: <carry>: extend termination events

- We tie the shutdown events to the UID of the first one (shutdown initiated); this gives us a more deterministic way to compute the shutdown duration from these events.
- Move code snippets from the upstream file to an OpenShift-specific patch file; this reduces the chance of code conflicts.
1 parent 8c3be76 commit 85e9768

File tree

5 files changed

+431
-2
lines changed

5 files changed

+431
-2
lines changed

Diff for: pkg/controlplane/apiserver/config.go

+8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/kubernetes/openshift-kube-apiserver/admission/admissionenablement"
2929
"k8s.io/kubernetes/openshift-kube-apiserver/enablement"
3030
"k8s.io/kubernetes/openshift-kube-apiserver/openshiftkubeapiserver"
31+
eventstorage "k8s.io/kubernetes/pkg/registry/core/event/storage"
3132

3233
"k8s.io/apimachinery/pkg/api/meta"
3334
"k8s.io/apimachinery/pkg/runtime"
@@ -295,6 +296,13 @@ func CreateConfig(
295296
opts.Metrics.Apply()
296297
serviceaccount.RegisterMetrics()
297298

299+
var eventStorage *eventstorage.REST
300+
eventStorage, err := eventstorage.NewREST(genericConfig.RESTOptionsGetter, uint64(opts.EventTTL.Seconds()))
301+
if err != nil {
302+
return nil, nil, err
303+
}
304+
genericConfig.EventSink = eventRegistrySink{eventStorage}
305+
298306
config := &Config{
299307
Generic: genericConfig,
300308
Extra: Extra{

Diff for: pkg/controlplane/apiserver/patch_config.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apiserver
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
corev1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apiserver/pkg/endpoints/request"
27+
genericapiserver "k8s.io/apiserver/pkg/server"
28+
"k8s.io/kubernetes/pkg/apis/core"
29+
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
30+
eventstorage "k8s.io/kubernetes/pkg/registry/core/event/storage"
31+
)
32+
33+
// eventRegistrySink wraps an event registry so that it can be used as a direct event sink, without going through the API.
34+
type eventRegistrySink struct {
35+
*eventstorage.REST
36+
}
37+
38+
var _ genericapiserver.EventSink = eventRegistrySink{}
39+
40+
func (s eventRegistrySink) Create(v1event *corev1.Event) (*corev1.Event, error) {
41+
ctx := request.WithNamespace(request.WithRequestInfo(request.NewContext(), &request.RequestInfo{APIVersion: "v1"}), v1event.Namespace)
42+
// since we are bypassing the API set a hard timeout for the storage layer
43+
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
44+
defer cancel()
45+
46+
var event core.Event
47+
if err := v1.Convert_v1_Event_To_core_Event(v1event, &event, nil); err != nil {
48+
return nil, err
49+
}
50+
51+
obj, err := s.REST.Create(ctx, &event, nil, &metav1.CreateOptions{})
52+
if err != nil {
53+
return nil, err
54+
}
55+
ret, ok := obj.(*core.Event)
56+
if !ok {
57+
return nil, fmt.Errorf("expected corev1.Event, got %T", obj)
58+
}
59+
60+
var v1ret corev1.Event
61+
if err := v1.Convert_core_Event_To_v1_Event(ret, &v1ret, nil); err != nil {
62+
return nil, err
63+
}
64+
65+
return &v1ret, nil
66+
}

Diff for: staging/src/k8s.io/apiserver/pkg/server/config.go

+39
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ import (
7272
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
7373
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
7474
"k8s.io/client-go/informers"
75+
"k8s.io/client-go/kubernetes"
76+
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
7577
restclient "k8s.io/client-go/rest"
7678
"k8s.io/component-base/featuregate"
7779
"k8s.io/component-base/logs"
@@ -278,6 +280,9 @@ type Config struct {
278280
// rejected with a 429 status code and a 'Retry-After' response.
279281
ShutdownSendRetryAfter bool
280282

283+
// EventSink receives events about the life cycle of the API server, e.g. readiness, serving, signals and termination.
284+
EventSink EventSink
285+
281286
//===========================================================================
282287
// values below here are targets for removal
283288
//===========================================================================
@@ -714,6 +719,10 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
714719
c.DiscoveryAddresses = discovery.DefaultAddresses{DefaultAddress: c.ExternalAddress}
715720
}
716721

722+
if c.EventSink == nil {
723+
c.EventSink = nullEventSink{}
724+
}
725+
717726
AuthorizeClientBearerToken(c.LoopbackClientConfig, &c.Authentication, &c.Authorization)
718727

719728
if c.RequestInfoResolver == nil {
@@ -741,6 +750,22 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
741750
// Complete fills in any fields not set that are required to have valid data and can be derived
742751
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
743752
func (c *RecommendedConfig) Complete() CompletedConfig {
753+
if c.ClientConfig != nil {
754+
ref, err := eventReference()
755+
if err != nil {
756+
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
757+
c.EventSink = nullEventSink{}
758+
} else {
759+
ns := ref.Namespace
760+
if len(ns) == 0 {
761+
ns = "default"
762+
}
763+
c.EventSink = clientEventSink{
764+
&v1.EventSinkImpl{Interface: kubernetes.NewForConfigOrDie(c.ClientConfig).CoreV1().Events(ns)},
765+
}
766+
}
767+
}
768+
744769
return c.Config.Complete(c.SharedInformerFactory)
745770
}
746771

@@ -843,7 +868,19 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
843868
FeatureGate: c.FeatureGate,
844869

845870
muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
871+
872+
OpenShiftGenericAPIServerPatch: OpenShiftGenericAPIServerPatch{
873+
eventSink: c.EventSink,
874+
},
875+
}
876+
877+
ref, err := eventReference()
878+
if err != nil {
879+
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
880+
s.OpenShiftGenericAPIServerPatch.eventSink = nullEventSink{}
846881
}
882+
s.RegisterDestroyFunc(c.EventSink.Destroy)
883+
s.eventRef = ref
847884

848885
if c.FeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
849886
manager := c.AggregatedDiscoveryGroupManager
@@ -1048,6 +1085,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
10481085
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
10491086
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
10501087
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
1088+
handler = WithNonReadyRequestLogging(handler, c.lifecycleSignals.HasBeenReady)
1089+
handler = WithLateConnectionFilter(handler)
10511090
if c.ShutdownWatchTerminationGracePeriod > 0 {
10521091
handler = genericfilters.WithWatchTerminationDuringShutdown(handler, c.lifecycleSignals, c.WatchRequestWaitGroup)
10531092
}

Diff for: staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go

+34-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"golang.org/x/time/rate"
3232
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
33+
corev1 "k8s.io/api/core/v1"
3334
"k8s.io/apimachinery/pkg/api/meta"
3435
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3536
"k8s.io/apimachinery/pkg/runtime"
@@ -285,6 +286,9 @@ type GenericAPIServer struct {
285286
// This grace period is orthogonal to other grace periods, and
286287
// it is not overridden by any other grace period.
287288
ShutdownWatchTerminationGracePeriod time.Duration
289+
290+
// OpenShift patch
291+
OpenShiftGenericAPIServerPatch
288292
}
289293

290294
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@@ -537,7 +541,10 @@ func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
537541

538542
go func() {
539543
defer delayedStopCh.Signal()
540-
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
544+
defer func() {
545+
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
546+
s.Eventf(corev1.EventTypeNormal, delayedStopCh.Name(), "The minimal shutdown duration of %v finished", s.ShutdownDelayDuration)
547+
}()
541548

542549
<-stopCh
543550

@@ -546,10 +553,28 @@ func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
546553
// and stop sending traffic to this server.
547554
shutdownInitiatedCh.Signal()
548555
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
556+
s.Eventf(corev1.EventTypeNormal, shutdownInitiatedCh.Name(), "Received signal to terminate, becoming unready, but keeping serving")
549557

550558
time.Sleep(s.ShutdownDelayDuration)
551559
}()
552560

561+
lateStopCh := make(chan struct{})
562+
if s.ShutdownDelayDuration > 0 {
563+
go func() {
564+
defer close(lateStopCh)
565+
566+
<-stopCh
567+
568+
time.Sleep(s.ShutdownDelayDuration * 8 / 10)
569+
}()
570+
}
571+
572+
s.SecureServingInfo.Listener = &terminationLoggingListener{
573+
Listener: s.SecureServingInfo.Listener,
574+
lateStopCh: lateStopCh,
575+
}
576+
unexpectedRequestsEventf.Store(s.Eventf)
577+
553578
// close socket after delayed stopCh
554579
shutdownTimeout := s.ShutdownTimeout
555580
if s.ShutdownSendRetryAfter {
@@ -598,13 +623,17 @@ func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
598623
<-listenerStoppedCh
599624
httpServerStoppedListeningCh.Signal()
600625
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
626+
s.Eventf(corev1.EventTypeNormal, httpServerStoppedListeningCh.Name(), "HTTP Server has stopped listening")
601627
}()
602628

603629
// we don't accept new request as soon as both ShutdownDelayDuration has
604630
// elapsed and preshutdown hooks have completed.
605631
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
606632
go func() {
607-
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
633+
defer func() {
634+
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
635+
s.Eventf(corev1.EventTypeNormal, drainedCh.Name(), "All non long-running request(s) in-flight have drained")
636+
}()
608637
defer notAcceptingNewRequestCh.Signal()
609638

610639
// wait for the delayed stopCh before closing the handler chain
@@ -691,6 +720,7 @@ func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
691720
defer func() {
692721
preShutdownHooksHasStoppedCh.Signal()
693722
klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
723+
s.Eventf(corev1.EventTypeNormal, "TerminationPreShutdownHooksFinished", "All pre-shutdown hooks have been finished")
694724
}()
695725
err = s.RunPreShutdownHooks()
696726
}()
@@ -711,6 +741,8 @@ func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
711741
<-stoppedCh
712742

713743
klog.V(1).Info("[graceful-termination] apiserver is exiting")
744+
s.Eventf(corev1.EventTypeNormal, "TerminationGracefulTerminationFinished", "All pending requests processed")
745+
714746
return nil
715747
}
716748

0 commit comments

Comments
 (0)