Skip to content

Commit 711b48b

Browse files
vinceprisbueringer
andauthored
🌱 Add fsnotify watcher+polling (#3050)
* Add fsnotify watcher+polling Signed-off-by: Vince Prignano <[email protected]> * adjust tests --------- Signed-off-by: Vince Prignano <[email protected]> Co-authored-by: Stefan Bueringer <[email protected]>
1 parent 1ac370e commit 711b48b

File tree

5 files changed

+102
-12
lines changed

5 files changed

+102
-12
lines changed

examples/scratch-env/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
1515
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
1616
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
17+
github.com/fsnotify/fsnotify v1.7.0 // indirect
1718
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
1819
github.com/go-logr/logr v1.4.2 // indirect
1920
github.com/go-logr/zapr v1.3.0 // indirect

examples/scratch-env/go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8
1313
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
1414
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
1515
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
16+
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
17+
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
1618
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
1719
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
1820
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.0
44

55
require (
66
github.com/evanphx/json-patch/v5 v5.9.0
7+
github.com/fsnotify/fsnotify v1.7.0
78
github.com/go-logr/logr v1.4.2
89
github.com/go-logr/zapr v1.3.0
910
github.com/google/btree v1.1.3
@@ -42,7 +43,6 @@ require (
4243
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4344
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
4445
github.com/felixge/httpsnoop v1.0.4 // indirect
45-
github.com/fsnotify/fsnotify v1.7.0 // indirect
4646
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
4747
github.com/go-logr/stdr v1.2.2 // indirect
4848
github.com/go-openapi/jsonpointer v0.21.0 // indirect

pkg/certwatcher/certwatcher.go

+84-3
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@ import (
2020
"bytes"
2121
"context"
2222
"crypto/tls"
23+
"fmt"
2324
"os"
2425
"sync"
2526
"time"
2627

28+
"github.com/fsnotify/fsnotify"
29+
kerrors "k8s.io/apimachinery/pkg/util/errors"
30+
"k8s.io/apimachinery/pkg/util/sets"
31+
"k8s.io/apimachinery/pkg/util/wait"
2732
"sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics"
2833
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2934
)
@@ -40,6 +45,7 @@ type CertWatcher struct {
4045
sync.RWMutex
4146

4247
currentCert *tls.Certificate
48+
watcher *fsnotify.Watcher
4349
interval time.Duration
4450

4551
certPath string
@@ -53,13 +59,25 @@ type CertWatcher struct {
5359

5460
// New returns a new CertWatcher watching the given certificate and key.
5561
func New(certPath, keyPath string) (*CertWatcher, error) {
62+
var err error
63+
5664
cw := &CertWatcher{
5765
certPath: certPath,
5866
keyPath: keyPath,
5967
interval: defaultWatchInterval,
6068
}
6169

62-
return cw, cw.ReadCertificate()
70+
// Initial read of certificate and key.
71+
if err := cw.ReadCertificate(); err != nil {
72+
return nil, err
73+
}
74+
75+
cw.watcher, err = fsnotify.NewWatcher()
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
return cw, nil
6381
}
6482

6583
// WithWatchInterval sets the watch interval and returns the CertWatcher pointer
@@ -88,14 +106,35 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate,
88106

89107
// Start starts the watch on the certificate and key files.
90108
func (cw *CertWatcher) Start(ctx context.Context) error {
109+
files := sets.New(cw.certPath, cw.keyPath)
110+
111+
{
112+
var watchErr error
113+
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
114+
for _, f := range files.UnsortedList() {
115+
if err := cw.watcher.Add(f); err != nil {
116+
watchErr = err
117+
return false, nil //nolint:nilerr // We want to keep trying.
118+
}
119+
// We've added the watch, remove it from the set.
120+
files.Delete(f)
121+
}
122+
return true, nil
123+
}); err != nil {
124+
return fmt.Errorf("failed to add watches: %w", kerrors.NewAggregate([]error{err, watchErr}))
125+
}
126+
}
127+
128+
go cw.Watch()
129+
91130
ticker := time.NewTicker(cw.interval)
92131
defer ticker.Stop()
93132

94-
log.Info("Starting certificate watcher")
133+
log.Info("Starting certificate poll+watcher", "interval", cw.interval)
95134
for {
96135
select {
97136
case <-ctx.Done():
98-
return nil
137+
return cw.watcher.Close()
99138
case <-ticker.C:
100139
if err := cw.ReadCertificate(); err != nil {
101140
log.Error(err, "failed read certificate")
@@ -104,6 +143,28 @@ func (cw *CertWatcher) Start(ctx context.Context) error {
104143
}
105144
}
106145

146+
// Watch reads events from the watcher's channel and reacts to changes.
147+
func (cw *CertWatcher) Watch() {
148+
for {
149+
select {
150+
case event, ok := <-cw.watcher.Events:
151+
// Channel is closed.
152+
if !ok {
153+
return
154+
}
155+
156+
cw.handleEvent(event)
157+
case err, ok := <-cw.watcher.Errors:
158+
// Channel is closed.
159+
if !ok {
160+
return
161+
}
162+
163+
log.Error(err, "certificate watch error")
164+
}
165+
}
166+
}
167+
107168
// updateCachedCertificate checks if the new certificate differs from the cache,
108169
// updates it and returns the result if it was updated or not
109170
func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBlock []byte) bool {
@@ -159,3 +220,23 @@ func (cw *CertWatcher) ReadCertificate() error {
159220
}
160221
return nil
161222
}
223+
224+
func (cw *CertWatcher) handleEvent(event fsnotify.Event) {
225+
// Only care about events which may modify the contents of the file.
226+
switch {
227+
case event.Op.Has(fsnotify.Write):
228+
case event.Op.Has(fsnotify.Create):
229+
case event.Op.Has(fsnotify.Chmod), event.Op.Has(fsnotify.Remove):
230+
// If the file was removed or renamed, re-add the watch to the previous name
231+
if err := cw.watcher.Add(event.Name); err != nil {
232+
log.Error(err, "error re-watching file")
233+
}
234+
default:
235+
return
236+
}
237+
238+
log.V(1).Info("certificate event", "event", event)
239+
if err := cw.ReadCertificate(); err != nil {
240+
log.Error(err, "error re-reading certificate")
241+
}
242+
}

pkg/certwatcher/certwatcher_test.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,12 @@ var _ = Describe("CertWatcher", func() {
7676
Expect(err).ToNot(HaveOccurred())
7777
})
7878

79-
startWatcher := func() (done <-chan struct{}) {
79+
startWatcher := func(interval time.Duration) (done <-chan struct{}) {
8080
doneCh := make(chan struct{})
8181
go func() {
8282
defer GinkgoRecover()
8383
defer close(doneCh)
84-
Expect(watcher.WithWatchInterval(time.Second).Start(ctx)).To(Succeed())
84+
Expect(watcher.WithWatchInterval(interval).Start(ctx)).To(Succeed())
8585
}()
8686
// wait till we read first cert
8787
Eventually(func() error {
@@ -92,14 +92,16 @@ var _ = Describe("CertWatcher", func() {
9292
}
9393

9494
It("should read the initial cert/key", func() {
95-
doneCh := startWatcher()
95+
// This test verifies the initial read succeeded. So interval doesn't matter.
96+
doneCh := startWatcher(10 * time.Second)
9697

9798
ctxCancel()
9899
Eventually(doneCh, "4s").Should(BeClosed())
99100
})
100101

101102
It("should reload currentCert when changed", func() {
102-
doneCh := startWatcher()
103+
// This test verifies fsnotify detects the cert change. So interval doesn't matter.
104+
doneCh := startWatcher(10 * time.Second)
103105
called := atomic.Int64{}
104106
watcher.RegisterCallback(func(crt tls.Certificate) {
105107
called.Add(1)
@@ -123,7 +125,8 @@ var _ = Describe("CertWatcher", func() {
123125
})
124126

125127
It("should reload currentCert when changed with rename", func() {
126-
doneCh := startWatcher()
128+
// This test verifies fsnotify detects the cert change. So interval doesn't matter.
129+
doneCh := startWatcher(10 * time.Second)
127130
called := atomic.Int64{}
128131
watcher.RegisterCallback(func(crt tls.Certificate) {
129132
called.Add(1)
@@ -153,7 +156,8 @@ var _ = Describe("CertWatcher", func() {
153156
})
154157

155158
It("should reload currentCert after move out", func() {
156-
doneCh := startWatcher()
159+
// This test verifies poll works, so we'll use 1s as interval (fsnotify doesn't detect this change).
160+
doneCh := startWatcher(1 * time.Second)
157161
called := atomic.Int64{}
158162
watcher.RegisterCallback(func(crt tls.Certificate) {
159163
called.Add(1)
@@ -189,7 +193,8 @@ var _ = Describe("CertWatcher", func() {
189193
})
190194

191195
It("should get updated on successful certificate read", func() {
192-
doneCh := startWatcher()
196+
// This test verifies fsnotify, so interval doesn't matter.
197+
doneCh := startWatcher(10 * time.Second)
193198

194199
Eventually(func() error {
195200
readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal)
@@ -204,7 +209,8 @@ var _ = Describe("CertWatcher", func() {
204209
})
205210

206211
It("should get updated on read certificate errors", func() {
207-
doneCh := startWatcher()
212+
// This test works with fsnotify, so interval doesn't matter.
213+
doneCh := startWatcher(10 * time.Second)
208214

209215
Eventually(func() error {
210216
readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal)

0 commit comments

Comments
 (0)