Commit 8869aeb

dump and restore internal state
This replaces the previous approach of trying to reconstruct state from observations with a simpler dump/restore of the internal state as a JSON file in the driver's data directory. The advantage is that *all* volume and snapshot attributes get restored, not just those that can be deduced from mount points. No attempt is made to restore state properly after a node reboot.
1 parent e4d72e3 commit 8869aeb

24 files changed, +4553 -441 lines
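The mechanism described in the commit message is simple: serialize all volumes and snapshots into a single JSON file under the data directory, and read it back on startup. The following standalone Go sketch illustrates that general pattern; the DriverState type, its fields, and the state.json file name are illustrative assumptions, not the commit's actual definitions.

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// Hypothetical records; the real state.Volume and state.Snapshot carry
// many more attributes, all of which survive a restart via the dump.
type Volume struct {
	VolID   string
	VolName string
	VolSize int64
}

type Snapshot struct {
	Id    string
	VolID string
}

// DriverState is an assumed container for everything worth persisting.
type DriverState struct {
	Volumes   []Volume
	Snapshots []Snapshot
}

// dump rewrites the whole state file after a change.
func dump(path string, s DriverState) error {
	data, err := json.Marshal(s)
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0600)
}

// restore loads the previous state; a missing file means a fresh start.
func restore(path string) (DriverState, error) {
	var s DriverState
	data, err := os.ReadFile(path)
	if os.IsNotExist(err) {
		return s, nil
	}
	if err != nil {
		return s, err
	}
	return s, json.Unmarshal(data, &s)
}

func main() {
	const statefile = "/csi-data-dir/state.json" // assumed location
	s, _ := restore(statefile)
	s.Volumes = append(s.Volumes, Volume{VolID: "vol-1", VolName: "pvc-1", VolSize: 1 << 20})
	if err := dump(statefile, s); err != nil {
		fmt.Println("persisting state failed:", err)
	}
}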

cmd/hostpathplugin/main.go (+1)
@@ -46,6 +46,7 @@ func main() {
 
 	flag.StringVar(&cfg.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
 	flag.StringVar(&cfg.DriverName, "drivername", "hostpath.csi.k8s.io", "name of the driver")
+	flag.StringVar(&cfg.StateDir, "statedir", "/csi-data-dir", "directory for storing state information across driver restarts, volumes and snapshots")
 	flag.StringVar(&cfg.NodeID, "nodeid", "", "node id")
 	flag.BoolVar(&cfg.Ephemeral, "ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
 	flag.Int64Var(&cfg.MaxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
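The new option is an ordinary flag alongside the existing ones. A minimal standalone sketch of how it parses, with a plain variable standing in for the driver's Config struct, which this hunk does not show:

package main

import (
	"flag"
	"fmt"
)

func main() {
	var stateDir string
	// Mirrors the flag added above. The default /csi-data-dir is typically
	// a hostPath volume mounted into the plugin pod, so the state file can
	// survive container restarts (a deployment detail assumed here, not
	// part of this diff).
	flag.StringVar(&stateDir, "statedir", "/csi-data-dir",
		"directory for storing state information across driver restarts, volumes and snapshots")
	flag.Parse()
	fmt.Println("keeping state under", stateDir)
}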

go.mod (+1 -1)
@@ -9,7 +9,7 @@ require (
 	github.com/golang/protobuf v1.4.3
 	github.com/kubernetes-csi/csi-lib-utils v0.9.0
 	github.com/pborman/uuid v1.2.1
-	github.com/stretchr/testify v1.6.1
+	github.com/stretchr/testify v1.7.0
 	golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
 	golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect
 	google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4 // indirect

go.sum (+2)
@@ -507,6 +507,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
 github.com/thecodeteam/goscaleio v0.1.0/go.mod h1:68sdkZAsK8bvEwBlbQnlLS+xU+hvLYM/iQ8KXej1AwM=

pkg/hostpath/controllerserver.go (+48 -53)
@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"math"
 	"os"
-	"path/filepath"
 	"sort"
 	"strconv"
 
@@ -35,17 +34,12 @@ import (
 
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	utilexec "k8s.io/utils/exec"
-)
 
-const (
-	deviceID = "deviceID"
+	"github.com/kubernetes-csi/csi-driver-host-path/pkg/state"
 )
 
-type accessType int
-
 const (
-	mountAccess accessType = iota
-	blockAccess
+	deviceID = "deviceID"
 )
 
 func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, finalErr error) {
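The accessType definition removed here reappears as state.AccessType, used below as state.MountAccess and state.BlockAccess, so the new pkg/state package presumably contains something close to this sketch (inferred from usage, not copied from the commit):

package state

// AccessType replaces the package-local accessType removed above;
// the constant order mirrors the old iota.
type AccessType int

const (
	MountAccess AccessType = iota
	BlockAccess
)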
@@ -84,13 +78,13 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
 		return nil, status.Error(codes.InvalidArgument, "cannot have both block and mount access type")
 	}
 
-	var requestedAccessType accessType
+	var requestedAccessType state.AccessType
 
 	if accessTypeBlock {
-		requestedAccessType = blockAccess
+		requestedAccessType = state.BlockAccess
 	} else {
 		// Default to mount.
-		requestedAccessType = mountAccess
+		requestedAccessType = state.MountAccess
 	}
 
 	// Lock before acting on global state. A production-quality
@@ -106,7 +100,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
 
 	// Need to check for already existing volume name, and if found
 	// check for the requested capacity and already allocated capacity
-	if exVol, err := hp.getVolumeByName(req.GetName()); err == nil {
+	if exVol, err := hp.state.GetVolumeByName(req.GetName()); err == nil {
 		// Since err is nil, it means the volume with the same name already exists
 		// need to check if the size of existing volume is the same as in new
 		// request
@@ -149,7 +143,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
 	glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath)
 
 	if req.GetVolumeContentSource() != nil {
-		path := getVolumePath(volumeID)
+		path := hp.getVolumePath(volumeID)
 		volumeSource := req.VolumeContentSource
 		switch volumeSource.Type.(type) {
 		case *csi.VolumeContentSource_Snapshot:
@@ -203,7 +197,7 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque
 	defer hp.mutex.Unlock()
 
 	volId := req.GetVolumeId()
-	vol, err := hp.getVolumeByID(volId)
+	vol, err := hp.state.GetVolumeByID(volId)
 	if err != nil {
 		// Volume not found: might have already deleted
 		return &csi.DeleteVolumeResponse{}, nil
@@ -243,7 +237,7 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
 	hp.mutex.Lock()
 	defer hp.mutex.Unlock()
 
-	if _, err := hp.getVolumeByID(req.GetVolumeId()); err != nil {
+	if _, err := hp.state.GetVolumeByID(req.GetVolumeId()); err != nil {
 		return nil, err
 	}
 
@@ -287,7 +281,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
 	hp.mutex.Lock()
 	defer hp.mutex.Unlock()
 
-	vol, err := hp.getVolumeByID(req.VolumeId)
+	vol, err := hp.state.GetVolumeByID(req.VolumeId)
 	if err != nil {
 		return nil, status.Error(codes.NotFound, err.Error())
 	}
@@ -311,8 +305,8 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
 
 	vol.IsAttached = true
 	vol.ReadOnlyAttach = req.GetReadonly()
-	if err := hp.updateVolume(vol.VolID, vol); err != nil {
-		return nil, status.Errorf(codes.Internal, "failed to update volume %s: %v", vol.VolID, err)
+	if err := hp.state.UpdateVolume(vol); err != nil {
+		return nil, err
 	}
 
 	return &csi.ControllerPublishVolumeResponse{
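The error handling gets shorter because state.UpdateVolume presumably persists the change itself and returns an error that can be passed through as-is. A sketch of what such a method could look like under the JSON-dump approach from the commit message; the State type, its fields, and the rollback behavior are all assumptions:

package state

import (
	"encoding/json"
	"os"
)

type Volume struct {
	VolID string
	// ...remaining attributes elided...
}

// State is a hypothetical in-memory view backed by a JSON dump file.
type State struct {
	statefilePath string
	volumes       map[string]Volume
}

// UpdateVolume inserts or replaces a record, then rewrites the dump so
// the mutation survives a driver restart; on write failure the
// in-memory change is rolled back.
func (s *State) UpdateVolume(vol Volume) error {
	old, existed := s.volumes[vol.VolID]
	s.volumes[vol.VolID] = vol
	if err := s.dump(); err != nil {
		if existed {
			s.volumes[vol.VolID] = old
		} else {
			delete(s.volumes, vol.VolID)
		}
		return err
	}
	return nil
}

// dump persists the current volume map as JSON.
func (s *State) dump() error {
	data, err := json.Marshal(s.volumes)
	if err != nil {
		return err
	}
	return os.WriteFile(s.statefilePath, data, 0600)
}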
@@ -337,7 +331,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
 	hp.mutex.Lock()
 	defer hp.mutex.Unlock()
 
-	vol, err := hp.getVolumeByID(req.VolumeId)
+	vol, err := hp.state.GetVolumeByID(req.VolumeId)
 	if err != nil {
 		// Not an error: a non-existent volume is not published.
 		// See also https://github.com/kubernetes-csi/external-attacher/pull/165
@@ -351,7 +345,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
 	}
 
 	vol.IsAttached = false
-	if err := hp.updateVolume(vol.VolID, vol); err != nil {
+	if err := hp.state.UpdateVolume(vol); err != nil {
 		return nil, status.Errorf(codes.Internal, "could not update volume %s: %v", vol.VolID, err)
 	}
 
@@ -399,15 +393,20 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest
 
 	var (
 		startIdx, volumesLength, maxLength int64
-		hpVolume                           hostPathVolume
+		hpVolume                           state.Volume
 	)
 
 	// Lock before acting on global state. A production-quality
 	// driver might use more fine-grained locking.
 	hp.mutex.Lock()
 	defer hp.mutex.Unlock()
 
-	volumeIds := hp.getSortedVolumeIDs()
+	// Sort by volume ID.
+	volumes := hp.state.GetVolumes()
+	sort.Slice(volumes, func(i, j int) bool {
+		return volumes[i].VolID < volumes[j].VolID
+	})
+
 	if req.StartingToken == "" {
 		req.StartingToken = "1"
 	}
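sort.Slice on the volume structs replaces the old getSortedVolumeIDs helper, which sorted a key slice and indexed back into a map. A self-contained illustration with a trimmed-down stand-in for state.Volume; since volume IDs are unique, the non-stable sort.Slice is enough for a deterministic order across paginated ListVolumes calls:

package main

import (
	"fmt"
	"sort"
)

type Volume struct{ VolID string }

func main() {
	volumes := []Volume{{"vol-3"}, {"vol-1"}, {"vol-2"}}
	// Deterministic order by ID, as in the hunk above.
	sort.Slice(volumes, func(i, j int) bool {
		return volumes[i].VolID < volumes[j].VolID
	})
	fmt.Println(volumes) // [{vol-1} {vol-2} {vol-3}]
}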
@@ -417,16 +416,16 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest
 		return nil, status.Error(codes.Aborted, "The type of startingToken should be integer")
 	}
 
-	volumesLength = int64(len(volumeIds))
+	volumesLength = int64(len(volumes))
 	maxLength = int64(req.MaxEntries)
 
 	if maxLength > volumesLength || maxLength <= 0 {
 		maxLength = volumesLength
 	}
 
 	for index := startIdx - 1; index < volumesLength && index < maxLength; index++ {
-		hpVolume = hp.volumes[volumeIds[index]]
-		healthy, msg := hp.doHealthCheckInControllerSide(volumeIds[index])
+		hpVolume = volumes[index]
+		healthy, msg := hp.doHealthCheckInControllerSide(hpVolume.VolID)
 		glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy)
 		volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{
 			Volume: &csi.Volume{
@@ -453,7 +452,7 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller
 	hp.mutex.Lock()
 	defer hp.mutex.Unlock()
 
-	volume, err := hp.getVolumeByID(req.GetVolumeId())
+	volume, err := hp.state.GetVolumeByID(req.GetVolumeId())
 	if err != nil {
 		return nil, err
 	}
@@ -475,11 +474,6 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller
 	}, nil
 }
 
-// getSnapshotPath returns the full path to where the snapshot is stored
-func getSnapshotPath(snapshotID string) string {
-	return filepath.Join(dataRoot, fmt.Sprintf("%s%s", snapshotID, snapshotExt))
-}
-
 // CreateSnapshot uses tar command to create snapshot for hostpath volume. The tar command can quickly create
 // archives of entire directories. The host image must have "tar" binaries in /bin, /usr/sbin, or /usr/bin.
 func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
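The deleted package-level helper evidently becomes a method (later hunks call hp.getSnapshotPath), presumably so the path derives from the configured state directory rather than the old dataRoot global. A plausible reconstruction; the stateDir field and the ".snap" extension are assumptions:

package main

import (
	"fmt"
	"path/filepath"
)

const snapshotExt = ".snap" // assumed, mirroring the old snapshotExt constant

type hostPath struct {
	stateDir string // hypothetical stand-in for the driver's config
}

// getSnapshotPath returns the full path to where the snapshot archive
// is stored, now rooted at the configured state directory.
func (hp *hostPath) getSnapshotPath(snapshotID string) string {
	return filepath.Join(hp.stateDir, fmt.Sprintf("%s%s", snapshotID, snapshotExt))
}

func main() {
	hp := &hostPath{stateDir: "/csi-data-dir"}
	fmt.Println(hp.getSnapshotPath("f81d16c2")) // /csi-data-dir/f81d16c2.snap
}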
@@ -503,7 +497,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
 
 	// Need to check for already existing snapshot name, and if found check for the
 	// requested sourceVolumeId and sourceVolumeId of snapshot that has been created.
-	if exSnap, err := hp.getSnapshotByName(req.GetName()); err == nil {
+	if exSnap, err := hp.state.GetSnapshotByName(req.GetName()); err == nil {
 		// Since err is nil, it means the snapshot with the same name already exists need
 		// to check if the sourceVolumeId of existing snapshot is the same as in new request.
 		if exSnap.VolID == req.GetSourceVolumeId() {
@@ -522,18 +516,18 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
 	}
 
 	volumeID := req.GetSourceVolumeId()
-	hostPathVolume, err := hp.getVolumeByID(volumeID)
+	hostPathVolume, err := hp.state.GetVolumeByID(volumeID)
 	if err != nil {
 		return nil, err
 	}
 
 	snapshotID := uuid.NewUUID().String()
 	creationTime := ptypes.TimestampNow()
 	volPath := hostPathVolume.VolPath
-	file := getSnapshotPath(snapshotID)
+	file := hp.getSnapshotPath(snapshotID)
 
 	var cmd []string
-	if hostPathVolume.VolAccessType == blockAccess {
+	if hostPathVolume.VolAccessType == state.BlockAccess {
 		glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume")
 		cmd = []string{"cp", volPath, file}
 	} else {
@@ -547,7 +541,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
 	}
 
 	glog.V(4).Infof("create volume snapshot %s", file)
-	snapshot := hostPathSnapshot{}
+	snapshot := state.Snapshot{}
 	snapshot.Name = req.GetName()
 	snapshot.Id = snapshotID
 	snapshot.VolID = volumeID
@@ -556,8 +550,9 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
 	snapshot.SizeBytes = hostPathVolume.VolSize
 	snapshot.ReadyToUse = true
 
-	hp.snapshots[snapshotID] = snapshot
-
+	if err := hp.state.UpdateSnapshot(snapshot); err != nil {
+		return nil, err
+	}
 	return &csi.CreateSnapshotResponse{
 		Snapshot: &csi.Snapshot{
 			SnapshotId:     snapshot.Id,
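From the assignments in CreateSnapshot and the CSI response below it, the shape of state.Snapshot can be roughly inferred as in this sketch; the Path field and the exact types are assumptions (ptypes.TimestampNow() returns a *timestamp.Timestamp):

package state

import "github.com/golang/protobuf/ptypes/timestamp"

// Snapshot sketches the persisted record inferred from CreateSnapshot;
// the real struct in pkg/state may differ in fields and tags.
type Snapshot struct {
	Name         string
	Id           string
	VolID        string
	Path         string // location of the tar/cp archive (assumed)
	CreationTime *timestamp.Timestamp
	SizeBytes    int64
	ReadyToUse   bool
}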
@@ -587,9 +582,11 @@ func (hp *hostPath) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotR
 	defer hp.mutex.Unlock()
 
 	glog.V(4).Infof("deleting snapshot %s", snapshotID)
-	path := getSnapshotPath(snapshotID)
+	path := hp.getSnapshotPath(snapshotID)
 	os.RemoveAll(path)
-	delete(hp.snapshots, snapshotID)
+	if err := hp.state.DeleteSnapshot(snapshotID); err != nil {
+		return nil, err
+	}
 	return &csi.DeleteSnapshotResponse{}, nil
 }
 
@@ -607,14 +604,14 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq
 	// case 1: SnapshotId is not empty, return snapshots that match the snapshot id.
 	if len(req.GetSnapshotId()) != 0 {
 		snapshotID := req.SnapshotId
-		if snapshot, ok := hp.snapshots[snapshotID]; ok {
+		if snapshot, err := hp.state.GetSnapshotByID(snapshotID); err == nil {
 			return convertSnapshot(snapshot), nil
 		}
 	}
 
 	// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id.
 	if len(req.GetSourceVolumeId()) != 0 {
-		for _, snapshot := range hp.snapshots {
+		for _, snapshot := range hp.state.GetSnapshots() {
 			if snapshot.VolID == req.SourceVolumeId {
 				return convertSnapshot(snapshot), nil
 			}
@@ -623,14 +620,12 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq
 
 	var snapshots []csi.Snapshot
 	// case 3: no parameter is set, so we return all the snapshots.
-	sortedKeys := make([]string, 0)
-	for k := range hp.snapshots {
-		sortedKeys = append(sortedKeys, k)
-	}
-	sort.Strings(sortedKeys)
+	hpSnapshots := hp.state.GetSnapshots()
+	sort.Slice(hpSnapshots, func(i, j int) bool {
+		return hpSnapshots[i].Id < hpSnapshots[j].Id
+	})
 
-	for _, key := range sortedKeys {
-		snap := hp.snapshots[key]
+	for _, snap := range hpSnapshots {
 		snapshot := csi.Snapshot{
 			SnapshotId:     snap.Id,
 			SourceVolumeId: snap.VolID,
@@ -725,15 +720,15 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
 	hp.mutex.Lock()
 	defer hp.mutex.Unlock()
 
-	exVol, err := hp.getVolumeByID(volID)
+	exVol, err := hp.state.GetVolumeByID(volID)
 	if err != nil {
 		return nil, err
 	}
 
 	if exVol.VolSize < capacity {
 		exVol.VolSize = capacity
-		if err := hp.updateVolume(volID, exVol); err != nil {
-			return nil, fmt.Errorf("could not update volume %s: %w", volID, err)
+		if err := hp.state.UpdateVolume(exVol); err != nil {
+			return nil, err
 		}
 	}
 
@@ -743,7 +738,7 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
 	}, nil
 }
 
-func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse {
+func convertSnapshot(snap state.Snapshot) *csi.ListSnapshotsResponse {
 	entries := []*csi.ListSnapshotsResponse_Entry{
 		{
 			Snapshot: &csi.Snapshot{
