Skip to content

Commit 636cc0b

Browse files
tnozickabertinatto
authored andcommitted
UPSTREAM: <carry>: Release lock on KCM and KS termination
UPSTREAM: <carry>: Force releasing the lock on exit for KS squash with UPSTREAM: <carry>: Release lock on KCM and KS termination OpenShift-Rebase-Source: fc91252 UPSTREAM: <carry>: Release lock on KCM and KS termination
1 parent f27a81d commit 636cc0b

File tree

4 files changed

+105
-12
lines changed

4 files changed

+105
-12
lines changed

cmd/kube-controller-manager/app/controllermanager.go

+36-11
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"k8s.io/apimachinery/pkg/util/sets"
3939
"k8s.io/apimachinery/pkg/util/uuid"
4040
"k8s.io/apimachinery/pkg/util/wait"
41+
"k8s.io/apiserver/pkg/server"
4142
"k8s.io/apiserver/pkg/server/healthz"
4243
"k8s.io/apiserver/pkg/server/mux"
4344
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -152,7 +153,9 @@ controller, and serviceaccounts controller.`,
152153
// add feature enablement metrics
153154
fg := s.ComponentGlobalsRegistry.FeatureGateFor(featuregate.DefaultKubeComponent)
154155
fg.(featuregate.MutableFeatureGate).AddMetrics()
155-
return Run(context.Background(), c.Complete())
156+
157+
stopCh := server.SetupSignalHandler()
158+
return Run(context.Background(), c.Complete(), stopCh)
156159
},
157160
Args: func(cmd *cobra.Command, args []string) error {
158161
for _, arg := range args {
@@ -189,9 +192,9 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
189192
}
190193

191194
// Run runs the KubeControllerManagerOptions.
192-
func Run(ctx context.Context, c *config.CompletedConfig) error {
195+
func Run(ctx context.Context, c *config.CompletedConfig, stopCh2 <-chan struct{}) error {
193196
logger := klog.FromContext(ctx)
194-
stopCh := ctx.Done()
197+
stopCh := mergeCh(ctx.Done(), stopCh2)
195198

196199
// To help debugging, immediately log version
197200
logger.Info("Starting", "version", utilversion.Get())
@@ -348,10 +351,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
348351
run(ctx, controllerDescriptors)
349352
},
350353
OnStoppedLeading: func() {
351-
logger.Error(nil, "leaderelection lost")
352-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
354+
select {
355+
case <-stopCh:
356+
// We were asked to terminate. Exit 0.
357+
klog.Info("Requested to terminate. Exiting.")
358+
os.Exit(0)
359+
default:
360+
// We lost the lock.
361+
logger.Error(nil, "leaderelection lost")
362+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
363+
}
353364
},
354-
})
365+
}, stopCh)
355366

356367
// If Leader Migration is enabled, proceed to attempt the migration lock.
357368
if leaderMigrator != nil {
@@ -375,10 +386,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
375386
run(ctx, controllerDescriptors)
376387
},
377388
OnStoppedLeading: func() {
378-
logger.Error(nil, "migration leaderelection lost")
379-
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
389+
select {
390+
case <-stopCh:
391+
// We were asked to terminate. Exit 0.
392+
klog.Info("Requested to terminate. Exiting.")
393+
os.Exit(0)
394+
default:
395+
// We lost the lock.
396+
logger.Error(nil, "migration leaderelection lost")
397+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
398+
}
380399
},
381-
})
400+
}, stopCh)
382401
}
383402

384403
<-stopCh
@@ -886,7 +905,7 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde
886905

887906
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
888907
// TODO: extract this function into staging/controller-manager
889-
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
908+
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks, stopCh <-chan struct{}) {
890909
logger := klog.FromContext(ctx)
891910
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
892911
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
@@ -902,7 +921,13 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
902921
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
903922
}
904923

905-
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
924+
leCtx, cancel := context.WithCancel(ctx)
925+
defer cancel()
926+
go func() {
927+
<-stopCh
928+
cancel()
929+
}()
930+
leaderelection.RunOrDie(leCtx, leaderelection.LeaderElectionConfig{
906931
Lock: rl,
907932
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
908933
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,

cmd/kube-controller-manager/app/patch.go

+14
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,17 @@ func (rt *rejectIfNotReadyHeaderRT) RoundTrip(r *http.Request) (*http.Response,
164164
}
165165
return rt.baseRT.RoundTrip(r)
166166
}
167+
168+
// mergeCh takes two stop channels and return a single one that
169+
// closes as soon as one of the inputs closes or receives data.
170+
func mergeCh(stopCh1, stopCh2 <-chan struct{}) <-chan struct{} {
171+
merged := make(chan struct{})
172+
go func() {
173+
defer close(merged)
174+
select {
175+
case <-stopCh1:
176+
case <-stopCh2:
177+
}
178+
}()
179+
return merged
180+
}

cmd/kube-controller-manager/app/patch_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,56 @@ type fakeRTFunc func(r *http.Request) (*http.Response, error)
7272
func (rt fakeRTFunc) RoundTrip(r *http.Request) (*http.Response, error) {
7373
return rt(r)
7474
}
75+
76+
func TestMergeCh(t *testing.T) {
77+
testCases := []struct {
78+
name string
79+
chan1 chan struct{}
80+
chan2 chan struct{}
81+
closeFn func(chan struct{}, chan struct{})
82+
}{
83+
{
84+
name: "chan1 gets closed",
85+
chan1: make(chan struct{}),
86+
chan2: make(chan struct{}),
87+
closeFn: func(a, b chan struct{}) {
88+
close(a)
89+
},
90+
},
91+
{
92+
name: "chan2 gets closed",
93+
chan1: make(chan struct{}),
94+
chan2: make(chan struct{}),
95+
closeFn: func(a, b chan struct{}) {
96+
close(b)
97+
},
98+
},
99+
{
100+
name: "both channels get closed",
101+
chan1: make(chan struct{}),
102+
chan2: make(chan struct{}),
103+
closeFn: func(a, b chan struct{}) {
104+
close(a)
105+
close(b)
106+
},
107+
},
108+
{
109+
name: "channel receives data and returned channel is closed",
110+
chan1: make(chan struct{}),
111+
chan2: make(chan struct{}),
112+
closeFn: func(a, b chan struct{}) {
113+
a <- struct{}{}
114+
},
115+
},
116+
}
117+
118+
for _, tc := range testCases {
119+
t.Run(tc.name, func(t *testing.T) {
120+
go tc.closeFn(tc.chan1, tc.chan2)
121+
merged := mergeCh(tc.chan1, tc.chan2)
122+
if _, ok := <-merged; ok {
123+
t.Fatalf("expected closed channel, got data")
124+
}
125+
})
126+
}
127+
}

cmd/kube-controller-manager/app/testing/testserver.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ func StartTestServer(ctx context.Context, customFlags []string) (result TestServ
122122
go func(ctx context.Context) {
123123
defer close(errCh)
124124

125-
if err := app.Run(ctx, config.Complete()); err != nil {
125+
stopCh := make(chan struct{})
126+
if err := app.Run(ctx, config.Complete(), stopCh); err != nil {
126127
errCh <- err
127128
}
128129
}(ctx)

0 commit comments

Comments
 (0)