Skip to content

Commit 3a8aba1

Browse files
committed
Cache devices and their symlinks in node driver, periodically noting
changes and printing the full list.
1 parent e0cc9d9 commit 3a8aba1

File tree

3 files changed

+183
-4
lines changed

3 files changed

+183
-4
lines changed

cmd/gce-pd-csi-driver/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
3535
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
3636
driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver"
37+
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
3738
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
3839
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
3940
)
@@ -280,9 +281,11 @@ func handle() {
280281
if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil {
281282
klog.Errorf("Data Cache setup failed: %v", err)
282283
}
283-
go driver.StartWatcher(*nodeName)
284+
go driver.StartWatcher(ctx, *nodeName)
284285
}
285286
}
287+
288+
go linkcache.NewListingCache(1*time.Minute, "/dev/disk/by-id/").Run(ctx)
286289
}
287290

288291
err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)

pkg/gce-pd-csi-driver/cache.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ func InitializeDataCacheNode(nodeId string) error {
615615
return nil
616616
}
617617

618-
func StartWatcher(nodeName string) {
618+
func StartWatcher(ctx context.Context, nodeName string) {
619619
dirToWatch := "/dev/"
620620
watcher, err := fsnotify.NewWatcher()
621621
if err != nil {
@@ -630,17 +630,20 @@ func StartWatcher(nodeName string) {
630630
}
631631
errorCh := make(chan error, 1)
632632
// Handle the error received from the watcher goroutine
633-
go watchDiskDetaches(watcher, nodeName, errorCh)
633+
go watchDiskDetaches(ctx, watcher, nodeName, errorCh)
634634

635635
select {
636636
case err := <-errorCh:
637637
klog.Errorf("watcher encountered an error: %v", err)
638638
}
639639
}
640640

641-
func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
641+
func watchDiskDetaches(ctx context.Context, watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error {
642642
for {
643643
select {
644+
case <-ctx.Done():
645+
klog.Infof("Context done, stopping watcher")
646+
return nil
644647
// watch for errors
645648
case err := <-watcher.Errors:
646649
errorCh <- fmt.Errorf("disk update event errored: %v", err)

pkg/linkcache/cache.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package linkcache
2+
3+
import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strings"
	"time"

	"k8s.io/klog/v2"
)
14+
15+
// partitionNameRegex matches names that end in "-part" followed by one or more
// digits. listAndUpdate skips directory entries whose name matches, so such
// entries are never added to the cache.
// NOTE(review): presumably these are the per-partition links next to the
// whole-disk link — confirm against a manually partitioned disk.
var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`)
16+
17+
// ListingCache polls the filesystem at the specified directory once per
18+
// periodand checks each non-directory entry for a symlink. The results are
19+
// cached. Changes to the cache are logged, as well as the full contents of the
20+
// cache. The cache's Run() method is expected to be called in a goroutine.
21+
// Its cancellation is controlled via the context argument.
22+
type ListingCache struct {
23+
period time.Duration
24+
dir string
25+
links *linkCache
26+
}
27+
28+
func NewListingCache(period time.Duration, dir string) *ListingCache {
29+
return &ListingCache{
30+
period: period,
31+
dir: dir,
32+
links: newLinkCache(),
33+
}
34+
}
35+
36+
// Run starts the cache's background loop. The filesystem is listed and the
37+
// cache updated according to the frequency specified by the period. It will run
38+
// until the context is cancelled.
39+
func (l *ListingCache) Run(ctx context.Context) {
40+
// Start the loop that runs every minute
41+
ticker := time.NewTicker(l.period)
42+
defer ticker.Stop()
43+
44+
// Initial list and update so we don't wait for the first tick.
45+
err := l.listAndUpdate()
46+
if err != nil {
47+
klog.Warningf("Error listing and updating symlinks: %v", err)
48+
}
49+
50+
for {
51+
select {
52+
case <-ctx.Done():
53+
klog.Infof("Context done, stopping watcher")
54+
return
55+
case <-ticker.C:
56+
err := l.listAndUpdate()
57+
if err != nil {
58+
klog.Warningf("Error listing and updating symlinks: %v", err)
59+
continue
60+
}
61+
62+
klog.Infof("periodic symlink cache read: %s", l.links.String())
63+
}
64+
}
65+
}
66+
67+
func (l *ListingCache) listAndUpdate() error {
68+
visited := make(map[string]struct{})
69+
70+
entries, err := os.ReadDir(l.dir)
71+
if err != nil {
72+
return fmt.Errorf("failed to read directory %s: %w", l.dir, err)
73+
}
74+
75+
var errs []error
76+
for _, entry := range entries {
77+
if entry.IsDir() {
78+
continue
79+
}
80+
81+
// TODO(juliankatz): To have certainty this works for all edge cases, we
82+
// need to test this with a manually partitioned disk.
83+
if partitionNameRegex.MatchString(entry.Name()) {
84+
continue
85+
}
86+
87+
diskByIdPath := filepath.Join(l.dir, entry.Name())
88+
89+
// Add the device to the map regardless of successful symlink eval.
90+
// Otherwise, a broken symlink will lead us to remove it from the cache.
91+
visited[diskByIdPath] = struct{}{}
92+
93+
realFSPath, err := filepath.EvalSymlinks(diskByIdPath)
94+
if err != nil {
95+
errs = append(errs, fmt.Errorf("failed to evaluate symlink for %s: %w", diskByIdPath, err))
96+
l.links.BrokenSymlink(diskByIdPath)
97+
continue
98+
}
99+
100+
l.links.AddOrUpdateDevice(diskByIdPath, realFSPath)
101+
}
102+
103+
for _, id := range l.links.DeviceIDs() {
104+
if _, found := visited[id]; !found {
105+
l.links.RemoveDevice(id)
106+
}
107+
}
108+
109+
if len(errs) > 0 {
110+
return fmt.Errorf("failed to evaluate symlinks for %d devices: %v", len(errs), errs)
111+
}
112+
return nil
113+
}
114+
115+
// linkCache is a structure that maintains a cache of symlinks between
116+
// /dev/disk/by-id and /dev/sd* paths. It provides methods to add/update,
117+
// retrieve, and remove device symlinks from the cache.
118+
type linkCache struct {
119+
devices map[string]linkCacheEntry
120+
}
121+
122+
// linkCacheEntry is the cached state of a single symlink.
type linkCacheEntry struct {
	// path is the target the symlink last resolved to successfully.
	path string
	// brokenSymlink is true when the symlink is known to be broken; path then
	// still holds the last known target.
	brokenSymlink bool
}
127+
128+
func newLinkCache() *linkCache {
129+
return &linkCache{
130+
devices: make(map[string]linkCacheEntry),
131+
}
132+
}
133+
134+
func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) {
135+
prevEntry, exists := d.devices[symlink]
136+
if !exists || prevEntry.path != realPath {
137+
klog.Infof("Symlink updated for link %s, previous value: %s, new value: %s", symlink, prevEntry.path, realPath)
138+
}
139+
d.devices[symlink] = linkCacheEntry{path: realPath, brokenSymlink: false}
140+
}
141+
142+
// BrokenSymlink marks a symlink as broken. If the symlink is not in the cache,
143+
// it is ignored.
144+
func (d *linkCache) BrokenSymlink(symlink string) {
145+
if entry, ok := d.devices[symlink]; ok {
146+
entry.brokenSymlink = true
147+
d.devices[symlink] = entry
148+
}
149+
}
150+
151+
func (d *linkCache) RemoveDevice(symlink string) {
152+
delete(d.devices, symlink)
153+
}
154+
155+
func (d *linkCache) DeviceIDs() []string {
156+
ids := make([]string, 0, len(d.devices))
157+
for id := range d.devices {
158+
ids = append(ids, id)
159+
}
160+
return ids
161+
}
162+
163+
func (d *linkCache) String() string {
164+
var sb strings.Builder
165+
for symlink, entry := range d.devices {
166+
if entry.brokenSymlink {
167+
sb.WriteString(fmt.Sprintf("%s -> broken symlink... last known value: %s; ", symlink, entry.path))
168+
} else {
169+
sb.WriteString(fmt.Sprintf("%s -> %s; ", symlink, entry.path))
170+
}
171+
}
172+
return strings.TrimSuffix(sb.String(), "; ")
173+
}

0 commit comments

Comments
 (0)