Skip to content

Commit 5f14610

Browse files
tnozickabertinatto
authored andcommitted
UPSTREAM: <carry>: Release lock on KCM and KS termination
UPSTREAM: <carry>: Force releasing the lock on exit for KS squash with UPSTREAM: <carry>: Release lock on KCM and KS termination OpenShift-Rebase-Source: fc91252 UPSTREAM: <carry>: Release lock on KCM and KS termination
1 parent 8d01c1c commit 5f14610

File tree

4 files changed

+105
-12
lines changed

4 files changed

+105
-12
lines changed

cmd/kube-controller-manager/app/controllermanager.go

+36-11
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"k8s.io/apimachinery/pkg/util/sets"
3939
"k8s.io/apimachinery/pkg/util/uuid"
4040
"k8s.io/apimachinery/pkg/util/wait"
41+
"k8s.io/apiserver/pkg/server"
4142
"k8s.io/apiserver/pkg/server/healthz"
4243
"k8s.io/apiserver/pkg/server/mux"
4344
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -155,7 +156,9 @@ controller, and serviceaccounts controller.`,
155156
// add feature enablement metrics
156157
fg := s.ComponentGlobalsRegistry.FeatureGateFor(basecompatibility.DefaultKubeComponent)
157158
fg.(featuregate.MutableFeatureGate).AddMetrics()
158-
return Run(context.Background(), c.Complete())
159+
160+
stopCh := server.SetupSignalHandler()
161+
return Run(context.Background(), c.Complete(), stopCh)
159162
},
160163
Args: func(cmd *cobra.Command, args []string) error {
161164
for _, arg := range args {
@@ -192,9 +195,9 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
192195
}
193196

194197
// Run runs the KubeControllerManagerOptions.
195-
func Run(ctx context.Context, c *config.CompletedConfig) error {
198+
func Run(ctx context.Context, c *config.CompletedConfig, stopCh2 <-chan struct{}) error {
196199
logger := klog.FromContext(ctx)
197-
stopCh := ctx.Done()
200+
stopCh := mergeCh(ctx.Done(), stopCh2)
198201

199202
// To help debugging, immediately log version
200203
logger.Info("Starting", "version", utilversion.Get())
@@ -360,10 +363,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
360363
run(ctx, controllerDescriptors)
361364
},
362365
OnStoppedLeading: func() {
363-
logger.Error(nil, "leaderelection lost")
364-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
366+
select {
367+
case <-stopCh:
368+
// We were asked to terminate. Exit 0.
369+
klog.Info("Requested to terminate. Exiting.")
370+
os.Exit(0)
371+
default:
372+
// We lost the lock.
373+
logger.Error(nil, "leaderelection lost")
374+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
375+
}
365376
},
366-
})
377+
}, stopCh)
367378

368379
// If Leader Migration is enabled, proceed to attempt the migration lock.
369380
if leaderMigrator != nil {
@@ -387,10 +398,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
387398
run(ctx, controllerDescriptors)
388399
},
389400
OnStoppedLeading: func() {
390-
logger.Error(nil, "migration leaderelection lost")
391-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
401+
select {
402+
case <-stopCh:
403+
// We were asked to terminate. Exit 0.
404+
klog.Info("Requested to terminate. Exiting.")
405+
os.Exit(0)
406+
default:
407+
// We lost the lock.
408+
logger.Error(nil, "migration leaderelection lost")
409+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
410+
}
392411
},
393-
})
412+
}, stopCh)
394413
}
395414

396415
<-stopCh
@@ -900,7 +919,7 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde
900919

901920
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
902921
// TODO: extract this function into staging/controller-manager
903-
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
922+
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks, stopCh <-chan struct{}) {
904923
logger := klog.FromContext(ctx)
905924
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
906925
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
@@ -916,7 +935,13 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
916935
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
917936
}
918937

919-
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
938+
leCtx, cancel := context.WithCancel(ctx)
939+
defer cancel()
940+
go func() {
941+
<-stopCh
942+
cancel()
943+
}()
944+
leaderelection.RunOrDie(leCtx, leaderelection.LeaderElectionConfig{
920945
Lock: rl,
921946
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
922947
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,

cmd/kube-controller-manager/app/patch.go

+14
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,17 @@ func (rt *rejectIfNotReadyHeaderRT) RoundTrip(r *http.Request) (*http.Response,
164164
}
165165
return rt.baseRT.RoundTrip(r)
166166
}
167+
168+
// mergeCh takes two stop channels and return a single one that
169+
// closes as soon as one of the inputs closes or receives data.
170+
func mergeCh(stopCh1, stopCh2 <-chan struct{}) <-chan struct{} {
171+
merged := make(chan struct{})
172+
go func() {
173+
defer close(merged)
174+
select {
175+
case <-stopCh1:
176+
case <-stopCh2:
177+
}
178+
}()
179+
return merged
180+
}

cmd/kube-controller-manager/app/patch_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,56 @@ type fakeRTFunc func(r *http.Request) (*http.Response, error)
7272
func (rt fakeRTFunc) RoundTrip(r *http.Request) (*http.Response, error) {
7373
return rt(r)
7474
}
75+
76+
func TestMergeCh(t *testing.T) {
77+
testCases := []struct {
78+
name string
79+
chan1 chan struct{}
80+
chan2 chan struct{}
81+
closeFn func(chan struct{}, chan struct{})
82+
}{
83+
{
84+
name: "chan1 gets closed",
85+
chan1: make(chan struct{}),
86+
chan2: make(chan struct{}),
87+
closeFn: func(a, b chan struct{}) {
88+
close(a)
89+
},
90+
},
91+
{
92+
name: "chan2 gets closed",
93+
chan1: make(chan struct{}),
94+
chan2: make(chan struct{}),
95+
closeFn: func(a, b chan struct{}) {
96+
close(b)
97+
},
98+
},
99+
{
100+
name: "both channels get closed",
101+
chan1: make(chan struct{}),
102+
chan2: make(chan struct{}),
103+
closeFn: func(a, b chan struct{}) {
104+
close(a)
105+
close(b)
106+
},
107+
},
108+
{
109+
name: "channel receives data and returned channel is closed",
110+
chan1: make(chan struct{}),
111+
chan2: make(chan struct{}),
112+
closeFn: func(a, b chan struct{}) {
113+
a <- struct{}{}
114+
},
115+
},
116+
}
117+
118+
for _, tc := range testCases {
119+
t.Run(tc.name, func(t *testing.T) {
120+
go tc.closeFn(tc.chan1, tc.chan2)
121+
merged := mergeCh(tc.chan1, tc.chan2)
122+
if _, ok := <-merged; ok {
123+
t.Fatalf("expected closed channel, got data")
124+
}
125+
})
126+
}
127+
}

cmd/kube-controller-manager/app/testing/testserver.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ func StartTestServer(ctx context.Context, customFlags []string) (result TestServ
123123
go func(ctx context.Context) {
124124
defer close(errCh)
125125

126-
if err := app.Run(ctx, config.Complete()); err != nil {
126+
stopCh := make(chan struct{})
127+
if err := app.Run(ctx, config.Complete(), stopCh); err != nil {
127128
errCh <- err
128129
}
129130
}(ctx)

0 commit comments

Comments
 (0)