Skip to content

Commit 5023849

Browse files
committed
Add state to snapshot create and configurable retry logic
Signed-off-by: Grant Griffiths <[email protected]>
1 parent 850184d commit 5023849

File tree

8 files changed

+132
-36
lines changed

8 files changed

+132
-36
lines changed

cmd/csi-snapshotter/csi-snapshotter

43.8 MB
Binary file not shown.

cmd/csi-snapshotter/main.go

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ var (
6363
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
6464
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
6565
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
66+
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed snapshot creation. It doubles with each failure, up to retry-interval-max.")
67+
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed snapshot creation.")
68+
failedSnapshotThreshold = flag.Int("failed-snapshot-threshold", 15, "The maximum number of retries on snapshot failures. Set 0 to retry indefinitely. Default is 15.")
6669
resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.")
6770
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
6871
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.")
@@ -191,6 +194,9 @@ func main() {
191194
coreFactory.Core().V1().PersistentVolumeClaims(),
192195
*createSnapshotContentRetryCount,
193196
*createSnapshotContentInterval,
197+
*retryIntervalStart,
198+
*retryIntervalMax,
199+
*failedSnapshotThreshold,
194200
snapShotter,
195201
*csiTimeout,
196202
*resyncPeriod,

pkg/controller/csi_handler.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
// Handler is responsible for handling VolumeSnapshot events from informer.
3232
type Handler interface {
33-
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error)
33+
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error)
3434
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
3535
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (bool, time.Time, int64, error)
3636
}
@@ -58,19 +58,20 @@ func NewCSIHandler(
5858
}
5959
}
6060

61-
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
61+
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) {
6262

6363
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
6464
defer cancel()
6565

6666
snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength)
6767
if err != nil {
68-
return "", "", time.Time{}, 0, false, err
68+
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, err
6969
}
7070
newParameters, err := removePrefixedParameters(parameters)
7171
if err != nil {
72-
return "", "", time.Time{}, 0, false, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
72+
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
7373
}
74+
7475
return handler.snapshotter.CreateSnapshot(ctx, snapshotName, volume, newParameters, snapshotterCredentials)
7576
}
7677

pkg/controller/framework_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme"
3636
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
3737
storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1alpha1"
38+
"github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter"
3839
"k8s.io/api/core/v1"
3940
storagev1 "k8s.io/api/storage/v1"
4041
storagev1beta1 "k8s.io/api/storage/v1beta1"
@@ -772,6 +773,9 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
772773
coreFactory.Core().V1().PersistentVolumeClaims(),
773774
3,
774775
5*time.Millisecond,
776+
5*time.Millisecond,
777+
10*time.Second,
778+
5,
775779
fakeSnapshot,
776780
5*time.Millisecond,
777781
60*time.Second,
@@ -1329,10 +1333,10 @@ type fakeSnapshotter struct {
13291333
t *testing.T
13301334
}
13311335

1332-
func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
1336+
func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) {
13331337
if f.createCallCounter >= len(f.createCalls) {
13341338
f.t.Errorf("Unexpected CSI Create Snapshot call: snapshotName=%s, volume=%v, index: %d, calls: %+v", snapshotName, volume.Name, f.createCallCounter, f.createCalls)
1335-
return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call")
1339+
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call")
13361340
}
13371341
call := f.createCalls[f.createCallCounter]
13381342
f.createCallCounter++
@@ -1359,10 +1363,10 @@ func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName strin
13591363
}
13601364

13611365
if err != nil {
1362-
return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call")
1366+
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call")
13631367
}
13641368

1365-
return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, call.err
1369+
return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, snapshotter.SnapshottingFinished, call.err
13661370
}
13671371

13681372
func (f *fakeSnapshotter) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) error {

pkg/controller/snapshot_controller.go

+29-16
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
25+
"github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter"
2526
"k8s.io/api/core/v1"
2627
storagev1 "k8s.io/api/storage/v1"
2728
storage "k8s.io/api/storage/v1beta1"
@@ -364,8 +365,8 @@ func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot
364365
klog.V(5).Infof("createSnapshot[%s]: started", snapshotKey(snapshot))
365366
opName := fmt.Sprintf("create-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
366367
ctrl.scheduleOperation(opName, func() error {
367-
snapshotObj, err := ctrl.createSnapshotOperation(snapshot)
368-
if err != nil {
368+
snapshotObj, state, err := ctrl.createSnapshotOperation(snapshot)
369+
if err != nil && state == snapshotter.SnapshottingFinished {
369370
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
370371
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)
371372
return err
@@ -375,7 +376,19 @@ func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot
375376
// We will get an "snapshot update" event soon, this is not a big error
376377
klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
377378
}
378-
return nil
379+
if state == snapshotter.SnapshottingFinished {
380+
// Snapshotting finished, remove obj from the queue.
381+
ctrl.snapshotQueue.Done(snapshotObj)
382+
return nil
383+
384+
} else if state == snapshotter.SnapshottingInBackground {
385+
klog.V(4).Infof("createSnapshot [%s]: Temporary error received, adding Snapshot back in queue: %v", snapshotKey(snapshotObj), updateErr)
386+
// Snapshotting still in progress.
387+
return nil
388+
} else {
389+
// State is SnapshottingNoChange. Don't change the snapshot queue.
390+
return nil
391+
}
379392
})
380393
return nil
381394
}
@@ -584,7 +597,7 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
584597
if err != nil {
585598
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
586599
}
587-
driverName, snapshotID, creationTime, size, readyToUse, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
600+
driverName, snapshotID, creationTime, size, readyToUse, _, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
588601
if err != nil {
589602
klog.Errorf("checkandUpdateBoundSnapshotStatusOperation: failed to call create snapshot to check whether the snapshot is ready to use %q", err)
590603
return nil, err
@@ -611,30 +624,30 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
611624
// 2. Update VolumeSnapshot status with creationtimestamp information
612625
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
613626
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
614-
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
627+
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, snapshotter.SnapshottingState, error) {
615628
klog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", snapshotKey(snapshot))
616629

617630
if snapshot.Status.Error != nil && !isControllerUpdateFailError(snapshot.Status.Error) {
618631
klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", snapshot.Status.Error.Message)
619-
return snapshot, nil
632+
return snapshot, snapshotter.SnapshottingFinished, nil
620633
}
621634

622635
// If PVC is not being deleted and finalizer is not added yet, a finalizer should be added.
623636
klog.V(5).Infof("createSnapshotOperation: Check if PVC is not being deleted and add Finalizer for source of snapshot [%s] if needed", snapshot.Name)
624637
err := ctrl.ensureSnapshotSourceFinalizer(snapshot)
625638
if err != nil {
626639
klog.Errorf("createSnapshotOperation failed to add finalizer for source of snapshot %s", err)
627-
return nil, err
640+
return nil, snapshotter.SnapshottingFinished, err
628641
}
629642

630643
class, volume, contentName, snapshotterCredentials, err := ctrl.getCreateSnapshotInput(snapshot)
631644
if err != nil {
632-
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
645+
return nil, snapshotter.SnapshottingFinished, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
633646
}
634647

635-
driverName, snapshotID, creationTime, size, readyToUse, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
648+
driverName, snapshotID, creationTime, size, readyToUse, state, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
636649
if err != nil {
637-
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
650+
return nil, state, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
638651
}
639652

640653
klog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)
@@ -651,16 +664,16 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
651664
}
652665

653666
if err != nil {
654-
return nil, err
667+
return nil, snapshotter.SnapshottingFinished, err
655668
}
656669
// Create VolumeSnapshotContent in the database
657670
volumeRef, err := ref.GetReference(scheme.Scheme, volume)
658671
if err != nil {
659-
return nil, err
672+
return nil, snapshotter.SnapshottingFinished, err
660673
}
661674
snapshotRef, err := ref.GetReference(scheme.Scheme, snapshot)
662675
if err != nil {
663-
return nil, err
676+
return nil, snapshotter.SnapshottingFinished, err
664677
}
665678

666679
if class.DeletionPolicy == nil {
@@ -713,15 +726,15 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
713726
strerr := fmt.Sprintf("Error creating volume snapshot content object for snapshot %s: %v.", snapshotKey(snapshot), err)
714727
klog.Error(strerr)
715728
ctrl.eventRecorder.Event(newSnapshot, v1.EventTypeWarning, "CreateSnapshotContentFailed", strerr)
716-
return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error())
729+
return nil, snapshotter.SnapshottingInBackground, newControllerUpdateError(snapshotKey(snapshot), err.Error())
717730
}
718731

719732
// save succeeded, bind and update status for snapshot.
720733
result, err := ctrl.bindandUpdateVolumeSnapshot(snapshotContent, newSnapshot)
721734
if err != nil {
722-
return nil, err
735+
return nil, snapshotter.SnapshottingFinished, err
723736
}
724-
return result, nil
737+
return result, snapshotter.SnapshottingFinished, nil
725738
}
726739

727740
// Delete a snapshot

pkg/controller/snapshot_controller_base.go

+18-4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type csiSnapshotController struct {
6868

6969
createSnapshotContentRetryCount int
7070
createSnapshotContentInterval time.Duration
71+
retryIntervalStart time.Duration
72+
retryIntervalMax time.Duration
73+
failedSnapshotThreshold int
7174
resyncPeriod time.Duration
7275
}
7376

@@ -82,6 +85,9 @@ func NewCSISnapshotController(
8285
pvcInformer coreinformers.PersistentVolumeClaimInformer,
8386
createSnapshotContentRetryCount int,
8487
createSnapshotContentInterval time.Duration,
88+
retryIntervalStart time.Duration,
89+
retryIntervalMax time.Duration,
90+
failedSnapshotThreshold int,
8591
snapshotter snapshotter.Snapshotter,
8692
timeout time.Duration,
8793
resyncPeriod time.Duration,
@@ -103,10 +109,13 @@ func NewCSISnapshotController(
103109
runningOperations: goroutinemap.NewGoRoutineMap(true),
104110
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
105111
createSnapshotContentInterval: createSnapshotContentInterval,
112+
retryIntervalStart: retryIntervalStart,
113+
retryIntervalMax: retryIntervalMax,
114+
failedSnapshotThreshold: failedSnapshotThreshold,
106115
resyncPeriod: resyncPeriod,
107116
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
108117
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
109-
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
118+
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(retryIntervalStart, retryIntervalMax), "csi-snapshotter-snapshot"),
110119
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
111120
}
112121

@@ -205,7 +214,6 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
205214
if quit {
206215
return true
207216
}
208-
defer ctrl.snapshotQueue.Done(keyObj)
209217
key := keyObj.(string)
210218
klog.V(5).Infof("snapshotWorker[%s]", key)
211219

@@ -393,12 +401,18 @@ func (ctrl *csiSnapshotController) updateSnapshot(snapshot *crdv1.VolumeSnapshot
393401
}
394402
err = ctrl.syncSnapshot(snapshot)
395403
if err != nil {
404+
sKey := snapshotKey(snapshot)
405+
if ctrl.failedSnapshotThreshold == 0 {
406+
ctrl.snapshotQueue.AddRateLimited(sKey)
407+
} else if ctrl.snapshotQueue.NumRequeues(sKey) < ctrl.failedSnapshotThreshold {
408+
ctrl.snapshotQueue.AddRateLimited(sKey)
409+
}
396410
if errors.IsConflict(err) {
397411
// Version conflict error happens quite often and the controller
398412
// recovers from it easily.
399-
klog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(snapshot), err)
413+
klog.V(3).Infof("could not sync claim %q: %+v", sKey, err)
400414
} else {
401-
klog.Errorf("could not sync volume %q: %+v", snapshotKey(snapshot), err)
415+
klog.Errorf("could not sync volume %q: %+v", sKey, err)
402416
}
403417
}
404418
}

0 commit comments

Comments
 (0)