Skip to content

Commit e727239

Browse files
[release-0.19] 🐛 Refactor certificate watcher to use polling, instead of fsnotify (#3023)
* Reestablish watch for the certificate paths * Remove fsnotify and use cached read watcher * Simplify return * 🐛Fix certwatcher test to be backwards compatible --------- Co-authored-by: Maxim Muzafarov <[email protected]>
1 parent bfd1cf9 commit e727239

File tree

8 files changed

+101
-120
lines changed

8 files changed

+101
-120
lines changed

examples/scratch-env/go.mod

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ require (
1414
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
1515
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
1616
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
17-
github.com/fsnotify/fsnotify v1.7.0 // indirect
1817
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
1918
github.com/go-logr/logr v1.4.2 // indirect
2019
github.com/go-logr/zapr v1.3.0 // indirect

examples/scratch-env/go.sum

-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8
1313
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
1414
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
1515
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
16-
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
17-
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
1816
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
1917
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
2018
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ go 1.22.0
44

55
require (
66
github.com/evanphx/json-patch/v5 v5.9.0
7-
github.com/fsnotify/fsnotify v1.7.0
87
github.com/go-logr/logr v1.4.2
98
github.com/go-logr/zapr v1.3.0
109
github.com/google/go-cmp v0.6.0
@@ -40,6 +39,7 @@ require (
4039
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4140
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
4241
github.com/felixge/httpsnoop v1.0.4 // indirect
42+
github.com/fsnotify/fsnotify v1.7.0 // indirect
4343
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
4444
github.com/go-logr/stdr v1.2.2 // indirect
4545
github.com/go-openapi/jsonpointer v0.19.6 // indirect

pkg/certwatcher/certwatcher.go

+57-104
Original file line numberDiff line numberDiff line change
@@ -17,58 +17,55 @@ limitations under the License.
1717
package certwatcher
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"crypto/tls"
22-
"fmt"
23+
"os"
2324
"sync"
2425
"time"
2526

26-
"github.com/fsnotify/fsnotify"
27-
kerrors "k8s.io/apimachinery/pkg/util/errors"
28-
"k8s.io/apimachinery/pkg/util/sets"
29-
"k8s.io/apimachinery/pkg/util/wait"
3027
"sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics"
3128
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
3229
)
3330

3431
var log = logf.RuntimeLog.WithName("certwatcher")
3532

36-
// CertWatcher watches certificate and key files for changes. When either file
37-
// changes, it reads and parses both and calls an optional callback with the new
38-
// certificate.
33+
const defaultWatchInterval = 10 * time.Second
34+
35+
// CertWatcher watches certificate and key files for changes.
36+
// It always returns the cached version,
37+
// but periodically reads and parses certificate and key for changes
38+
// and calls an optional callback with the new certificate.
3939
type CertWatcher struct {
4040
sync.RWMutex
4141

4242
currentCert *tls.Certificate
43-
watcher *fsnotify.Watcher
43+
interval time.Duration
4444

4545
certPath string
4646
keyPath string
4747

48+
cachedKeyPEMBlock []byte
49+
4850
// callback is a function to be invoked when the certificate changes.
4951
callback func(tls.Certificate)
5052
}
5153

5254
// New returns a new CertWatcher watching the given certificate and key.
5355
func New(certPath, keyPath string) (*CertWatcher, error) {
54-
var err error
55-
5656
cw := &CertWatcher{
5757
certPath: certPath,
5858
keyPath: keyPath,
59+
interval: defaultWatchInterval,
5960
}
6061

61-
// Initial read of certificate and key.
62-
if err := cw.ReadCertificate(); err != nil {
63-
return nil, err
64-
}
65-
66-
cw.watcher, err = fsnotify.NewWatcher()
67-
if err != nil {
68-
return nil, err
69-
}
62+
return cw, cw.ReadCertificate()
63+
}
7064

71-
return cw, nil
65+
// WithWatchInterval sets the watch interval and returns the CertWatcher pointer
66+
func (cw *CertWatcher) WithWatchInterval(interval time.Duration) *CertWatcher {
67+
cw.interval = interval
68+
return cw
7269
}
7370

7471
// RegisterCallback registers a callback to be invoked when the certificate changes.
@@ -91,72 +88,64 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate,
9188

9289
// Start starts the watch on the certificate and key files.
9390
func (cw *CertWatcher) Start(ctx context.Context) error {
94-
files := sets.New(cw.certPath, cw.keyPath)
95-
96-
{
97-
var watchErr error
98-
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
99-
for _, f := range files.UnsortedList() {
100-
if err := cw.watcher.Add(f); err != nil {
101-
watchErr = err
102-
return false, nil //nolint:nilerr // We want to keep trying.
103-
}
104-
// We've added the watch, remove it from the set.
105-
files.Delete(f)
106-
}
107-
return true, nil
108-
}); err != nil {
109-
return fmt.Errorf("failed to add watches: %w", kerrors.NewAggregate([]error{err, watchErr}))
110-
}
111-
}
112-
113-
go cw.Watch()
91+
ticker := time.NewTicker(cw.interval)
92+
defer ticker.Stop()
11493

11594
log.Info("Starting certificate watcher")
116-
117-
// Block until the context is done.
118-
<-ctx.Done()
119-
120-
return cw.watcher.Close()
121-
}
122-
123-
// Watch reads events from the watcher's channel and reacts to changes.
124-
func (cw *CertWatcher) Watch() {
12595
for {
12696
select {
127-
case event, ok := <-cw.watcher.Events:
128-
// Channel is closed.
129-
if !ok {
130-
return
97+
case <-ctx.Done():
98+
return nil
99+
case <-ticker.C:
100+
if err := cw.ReadCertificate(); err != nil {
101+
log.Error(err, "failed read certificate")
131102
}
103+
}
104+
}
105+
}
132106

133-
cw.handleEvent(event)
134-
135-
case err, ok := <-cw.watcher.Errors:
136-
// Channel is closed.
137-
if !ok {
138-
return
139-
}
107+
// updateCachedCertificate checks if the new certificate differs from the cache,
108+
// updates it and returns the result if it was updated or not
109+
func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBlock []byte) bool {
110+
cw.Lock()
111+
defer cw.Unlock()
140112

141-
log.Error(err, "certificate watch error")
142-
}
113+
if cw.currentCert != nil &&
114+
bytes.Equal(cw.currentCert.Certificate[0], cert.Certificate[0]) &&
115+
bytes.Equal(cw.cachedKeyPEMBlock, keyPEMBlock) {
116+
log.V(7).Info("certificate already cached")
117+
return false
143118
}
119+
cw.currentCert = cert
120+
cw.cachedKeyPEMBlock = keyPEMBlock
121+
return true
144122
}
145123

146124
// ReadCertificate reads the certificate and key files from disk, parses them,
147-
// and updates the current certificate on the watcher. If a callback is set, it
125+
// and updates the current certificate on the watcher if updated. If a callback is set, it
148126
// is invoked with the new certificate.
149127
func (cw *CertWatcher) ReadCertificate() error {
150128
metrics.ReadCertificateTotal.Inc()
151-
cert, err := tls.LoadX509KeyPair(cw.certPath, cw.keyPath)
129+
certPEMBlock, err := os.ReadFile(cw.certPath)
130+
if err != nil {
131+
metrics.ReadCertificateErrors.Inc()
132+
return err
133+
}
134+
keyPEMBlock, err := os.ReadFile(cw.keyPath)
152135
if err != nil {
153136
metrics.ReadCertificateErrors.Inc()
154137
return err
155138
}
156139

157-
cw.Lock()
158-
cw.currentCert = &cert
159-
cw.Unlock()
140+
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
141+
if err != nil {
142+
metrics.ReadCertificateErrors.Inc()
143+
return err
144+
}
145+
146+
if !cw.updateCachedCertificate(&cert, keyPEMBlock) {
147+
return nil
148+
}
160149

161150
log.Info("Updated current TLS certificate")
162151

@@ -170,39 +159,3 @@ func (cw *CertWatcher) ReadCertificate() error {
170159
}
171160
return nil
172161
}
173-
174-
func (cw *CertWatcher) handleEvent(event fsnotify.Event) {
175-
// Only care about events which may modify the contents of the file.
176-
if !(isWrite(event) || isRemove(event) || isCreate(event) || isChmod(event)) {
177-
return
178-
}
179-
180-
log.V(1).Info("certificate event", "event", event)
181-
182-
// If the file was removed or renamed, re-add the watch to the previous name
183-
if isRemove(event) || isChmod(event) {
184-
if err := cw.watcher.Add(event.Name); err != nil {
185-
log.Error(err, "error re-watching file")
186-
}
187-
}
188-
189-
if err := cw.ReadCertificate(); err != nil {
190-
log.Error(err, "error re-reading certificate")
191-
}
192-
}
193-
194-
func isWrite(event fsnotify.Event) bool {
195-
return event.Op.Has(fsnotify.Write)
196-
}
197-
198-
func isCreate(event fsnotify.Event) bool {
199-
return event.Op.Has(fsnotify.Create)
200-
}
201-
202-
func isRemove(event fsnotify.Event) bool {
203-
return event.Op.Has(fsnotify.Remove)
204-
}
205-
206-
func isChmod(event fsnotify.Event) bool {
207-
return event.Op.Has(fsnotify.Chmod)
208-
}

pkg/certwatcher/certwatcher_suite_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
. "github.com/onsi/ginkgo/v2"
2424
. "github.com/onsi/gomega"
25+
2526
logf "sigs.k8s.io/controller-runtime/pkg/log"
2627
"sigs.k8s.io/controller-runtime/pkg/log/zap"
2728
)

pkg/certwatcher/certwatcher_test.go

+40-11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package certwatcher_test
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"crypto/rand"
2223
"crypto/rsa"
@@ -34,6 +35,7 @@ import (
3435
. "github.com/onsi/ginkgo/v2"
3536
. "github.com/onsi/gomega"
3637
"github.com/prometheus/client_golang/prometheus/testutil"
38+
3739
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
3840
"sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics"
3941
)
@@ -80,7 +82,7 @@ var _ = Describe("CertWatcher", func() {
8082
go func() {
8183
defer GinkgoRecover()
8284
defer close(doneCh)
83-
Expect(watcher.Start(ctx)).To(Succeed())
85+
Expect(watcher.WithWatchInterval(time.Second).Start(ctx)).To(Succeed())
8486
}()
8587
// wait till we read first cert
8688
Eventually(func() error {
@@ -113,7 +115,7 @@ var _ = Describe("CertWatcher", func() {
113115
Eventually(func() bool {
114116
secondcert, _ := watcher.GetCertificate(nil)
115117
first := firstcert.PrivateKey.(*rsa.PrivateKey)
116-
return first.Equal(secondcert.PrivateKey)
118+
return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0])
117119
}).ShouldNot(BeTrue())
118120

119121
ctxCancel()
@@ -143,14 +145,41 @@ var _ = Describe("CertWatcher", func() {
143145
Eventually(func() bool {
144146
secondcert, _ := watcher.GetCertificate(nil)
145147
first := firstcert.PrivateKey.(*rsa.PrivateKey)
146-
return first.Equal(secondcert.PrivateKey)
148+
return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0])
147149
}).ShouldNot(BeTrue())
148150

149151
ctxCancel()
150152
Eventually(doneCh, "4s").Should(BeClosed())
151153
Expect(called.Load()).To(BeNumerically(">=", 1))
152154
})
153155

156+
It("should reload currentCert after move out", func() {
157+
doneCh := startWatcher()
158+
called := atomic.Int64{}
159+
watcher.RegisterCallback(func(crt tls.Certificate) {
160+
called.Add(1)
161+
Expect(crt.Certificate).ToNot(BeEmpty())
162+
})
163+
164+
firstcert, _ := watcher.GetCertificate(nil)
165+
166+
Expect(os.Rename(certPath, certPath+".old")).To(Succeed())
167+
Expect(os.Rename(keyPath, keyPath+".old")).To(Succeed())
168+
169+
err := writeCerts(certPath, keyPath, "192.168.0.3")
170+
Expect(err).ToNot(HaveOccurred())
171+
172+
Eventually(func() bool {
173+
secondcert, _ := watcher.GetCertificate(nil)
174+
first := firstcert.PrivateKey.(*rsa.PrivateKey)
175+
return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0])
176+
}, "10s", "1s").ShouldNot(BeTrue())
177+
178+
ctxCancel()
179+
Eventually(doneCh, "4s").Should(BeClosed())
180+
Expect(called.Load()).To(BeNumerically(">=", 1))
181+
})
182+
154183
Context("prometheus metric read_certificate_total", func() {
155184
var readCertificateTotalBefore float64
156185
var readCertificateErrorsBefore float64
@@ -165,8 +194,8 @@ var _ = Describe("CertWatcher", func() {
165194

166195
Eventually(func() error {
167196
readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal)
168-
if readCertificateTotalAfter != readCertificateTotalBefore+1.0 {
169-
return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter)
197+
if readCertificateTotalAfter < readCertificateTotalBefore+1.0 {
198+
return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter)
170199
}
171200
return nil
172201
}, "4s").Should(Succeed())
@@ -180,8 +209,8 @@ var _ = Describe("CertWatcher", func() {
180209

181210
Eventually(func() error {
182211
readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal)
183-
if readCertificateTotalAfter != readCertificateTotalBefore+1.0 {
184-
return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter)
212+
if readCertificateTotalAfter < readCertificateTotalBefore+1.0 {
213+
return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter)
185214
}
186215
readCertificateTotalBefore = readCertificateTotalAfter
187216
return nil
@@ -192,15 +221,15 @@ var _ = Describe("CertWatcher", func() {
192221
// Note, we are checking two errors here, because os.Remove generates two fsnotify events: Chmod + Remove
193222
Eventually(func() error {
194223
readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal)
195-
if readCertificateTotalAfter != readCertificateTotalBefore+2.0 {
196-
return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter)
224+
if readCertificateTotalAfter < readCertificateTotalBefore+2.0 {
225+
return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter)
197226
}
198227
return nil
199228
}, "4s").Should(Succeed())
200229
Eventually(func() error {
201230
readCertificateErrorsAfter := testutil.ToFloat64(metrics.ReadCertificateErrors)
202-
if readCertificateErrorsAfter != readCertificateErrorsBefore+2.0 {
203-
return fmt.Errorf("metric read certificate errors expected: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter)
231+
if readCertificateErrorsAfter < readCertificateErrorsBefore+2.0 {
232+
return fmt.Errorf("metric read certificate errors expected at least: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter)
204233
}
205234
return nil
206235
}, "4s").Should(Succeed())

pkg/certwatcher/example_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func Example() {
3939
panic(err)
4040
}
4141

42-
// Start goroutine with certwatcher running fsnotify against supplied certdir
42+
// Start goroutine with certwatcher running against supplied cert
4343
go func() {
4444
if err := watcher.Start(ctx); err != nil {
4545
panic(err)

0 commit comments

Comments
 (0)