Skip to content

Commit 3232abc

Browse files
authored
Refactor node drain (#11074)
1 parent 703cc70 commit 3232abc

File tree

12 files changed

+2350
-217
lines changed

12 files changed

+2350
-217
lines changed

controllers/alias.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package controllers
1818

1919
import (
2020
"context"
21-
"time"
2221

2322
ctrl "sigs.k8s.io/controller-runtime"
2423
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -65,18 +64,14 @@ type MachineReconciler struct {
6564

6665
// WatchFilterValue is the label value used to filter events prior to reconciliation.
6766
WatchFilterValue string
68-
69-
// NodeDrainClientTimeout timeout of the client used for draining nodes.
70-
NodeDrainClientTimeout time.Duration
7167
}
7268

7369
func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
7470
return (&machinecontroller.Reconciler{
75-
Client: r.Client,
76-
APIReader: r.APIReader,
77-
Tracker: r.Tracker,
78-
WatchFilterValue: r.WatchFilterValue,
79-
NodeDrainClientTimeout: r.NodeDrainClientTimeout,
71+
Client: r.Client,
72+
APIReader: r.APIReader,
73+
Tracker: r.Tracker,
74+
WatchFilterValue: r.WatchFilterValue,
8075
}).SetupWithManager(ctx, mgr, options)
8176
}
8277

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/valyala/fastjson v1.6.4
3434
go.etcd.io/etcd/api/v3 v3.5.15
3535
go.etcd.io/etcd/client/v3 v3.5.15
36+
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
3637
golang.org/x/oauth2 v0.22.0
3738
golang.org/x/text v0.17.0
3839
gomodules.xyz/jsonpatch/v2 v2.4.0
@@ -156,7 +157,6 @@ require (
156157
go.uber.org/zap v1.27.0 // indirect
157158
go4.org v0.0.0-20201209231011-d4a079459e60 // indirect
158159
golang.org/x/crypto v0.26.0 // indirect
159-
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
160160
golang.org/x/net v0.28.0 // indirect
161161
golang.org/x/sync v0.8.0 // indirect
162162
golang.org/x/sys v0.23.0 // indirect
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package drain
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/apimachinery/pkg/types"
23+
"k8s.io/client-go/tools/cache"
24+
)
25+
26+
const (
27+
// ttl is the duration for which we keep entries in the cache.
28+
ttl = 10 * time.Minute
29+
30+
// expirationInterval is the interval in which we will remove expired entries
31+
// from the cache.
32+
expirationInterval = 10 * time.Hour
33+
)
34+
35+
// CacheEntry is an entry of the drain cache. It stores at which time a Machine was drained the last time.
36+
type CacheEntry struct {
37+
Machine types.NamespacedName
38+
LastDrain time.Time
39+
}
40+
41+
// Cache caches the time when the last drain was done for a Machine.
42+
// Specifically we only use it to ensure we only retry drains
43+
// at a specific interval and not more often.
44+
type Cache interface {
45+
// Add adds the given entry to the Cache.
46+
// Note: entries expire after the ttl.
47+
Add(entry CacheEntry)
48+
49+
// Has checks if the given key (still) exists in the Cache.
50+
// Note: entries expire after the ttl.
51+
Has(machineName types.NamespacedName) (CacheEntry, bool)
52+
}
53+
54+
// NewCache creates a new cache.
55+
func NewCache() Cache {
56+
r := &drainCache{
57+
Store: cache.NewTTLStore(func(obj interface{}) (string, error) {
58+
// We only add CacheEntries to the cache, so it's safe to cast to CacheEntry.
59+
return obj.(CacheEntry).Machine.String(), nil
60+
}, ttl),
61+
}
62+
go func() {
63+
for {
64+
// Call list to clear the cache of expired items.
65+
// We have to do this periodically as the cache itself only expires
66+
// items lazily. If we don't do this the cache grows indefinitely.
67+
r.List()
68+
69+
time.Sleep(expirationInterval)
70+
}
71+
}()
72+
return r
73+
}
74+
75+
type drainCache struct {
76+
cache.Store
77+
}
78+
79+
// Add adds the given entry to the Cache.
80+
// Note: entries expire after the ttl.
81+
func (r *drainCache) Add(entry CacheEntry) {
82+
// Note: We can ignore the error here because by only allowing CacheEntries
83+
// and providing the corresponding keyFunc ourselves we can guarantee that
84+
// the error never occurs.
85+
_ = r.Store.Add(entry)
86+
}
87+
88+
// Has checks if the given key (still) exists in the Cache.
89+
// Note: entries expire after the ttl.
90+
func (r *drainCache) Has(machineName types.NamespacedName) (CacheEntry, bool) {
91+
// Note: We can ignore the error here because GetByKey never returns an error.
92+
item, exists, _ := r.Store.GetByKey(machineName.String())
93+
if exists {
94+
return item.(CacheEntry), true
95+
}
96+
return CacheEntry{}, false
97+
}

0 commit comments

Comments
 (0)